Skip to content

Commit

Permalink
More information in chunk provider.
Browse files Browse the repository at this point in the history
  • Loading branch information
dasatomic committed Jun 15, 2021
1 parent cac365b commit 66d3c03
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 25 deletions.
27 changes: 27 additions & 0 deletions QueryProcessing/Builders/AstToOpTreeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,41 @@ namespace QueryProcessing
/// </summary>
public class RowProvider
{
    // Lookup from column name to the ColumnId reported by the column metadata.
    // The id is handed to RowHolder as the field position when reading values.
    private Dictionary<string, int> columnPositions = new Dictionary<string, int>();

    /// <summary>
    /// Wraps a row stream together with the metadata describing its columns
    /// and indexes the columns by name.
    /// </summary>
    /// <param name="enumerator">Asynchronous stream of rows.</param>
    /// <param name="columnInfo">Metadata for the columns of every row in the stream.</param>
    public RowProvider(IAsyncEnumerable<RowHolder> enumerator, MetadataColumn[] columnInfo)
    {
        this.Enumerator = enumerator;
        this.ColumnInfo = columnInfo;

        for (int idx = 0; idx < columnInfo.Length; idx++)
        {
            MetadataColumn column = columnInfo[idx];

            // Add (not the indexer) is intentional: a repeated column name throws.
            this.columnPositions.Add(column.ColumnName, column.ColumnId);
        }
    }

    /// <summary>Row stream supplied at construction.</summary>
    public IAsyncEnumerable<RowHolder> Enumerator { get; }

    /// <summary>Column metadata supplied at construction.</summary>
    public MetadataColumn[] ColumnInfo { get; }

    /// <summary>
    /// Reads an unmanaged-typed field from a row by column name.
    /// </summary>
    /// <param name="rh">Row to read from.</param>
    /// <param name="columnName">Name of the column to read.</param>
    /// <returns>The field value as <typeparamref name="T"/>.</returns>
    /// <exception cref="InvalidColumnNameException">The column name is not known to this provider.</exception>
    public T GetValue<T>(RowHolder rh, string columnName) where T : unmanaged
    {
        return rh.GetField<T>(this.ResolvePosition(columnName));
    }

    /// <summary>
    /// Reads a string field from a row by column name.
    /// </summary>
    /// <param name="rh">Row to read from.</param>
    /// <param name="columnName">Name of the column to read.</param>
    /// <returns>A new string built from the row's string field.</returns>
    /// <exception cref="InvalidColumnNameException">The column name is not known to this provider.</exception>
    public string GetValue(RowHolder rh, string columnName)
    {
        int position = this.ResolvePosition(columnName);
        return new string(rh.GetStringField(position));
    }

    // Translates a column name into its registered position, or throws if it is unknown.
    private int ResolvePosition(string columnName)
    {
        if (!this.columnPositions.TryGetValue(columnName, out int position))
        {
            throw new InvalidColumnNameException();
        }

        return position;
    }
}

public class AstToOpTreeBuilder
Expand Down
63 changes: 48 additions & 15 deletions QueryProcessing/PhyOperators/PhyOpScan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
using MetadataManager;
using PageManager;
using System;
using System.Linq;
using System.Collections.Generic;
using System.Threading.Tasks;
using QueryProcessing.Exceptions;
using static QueryProcessing.SourceProvidersSignatures;

namespace QueryProcessing
{
Expand All @@ -29,17 +31,43 @@ public class PhyOpVideoChunker : IPhysicalOperator<RowHolder>
{
private RowProvider rowProvider;
private TimeSpan chunkLength;
private SourceProvidersSignatures.VideoChunkerProvider videoChunkProvider;
const string FilePathField = "FilePath";
private VideoChunkerProvider videoChunkProvider;
private const string FilePathField = "FilePath";
private MetadataColumn[] outputColumns;

// Columns this operator appends after the input columns.
// Constructor arguments are (ColumnId, TableId, ColumnName, ColumnType).
// The ColumnIds below are relative to the first appended column; the constructor
// shifts them by the number of input columns when building the output schema.
private MetadataColumn[] extensionColumns = new[]
{
new MetadataColumn(0, 0, "chunk_path", new ColumnInfo(ColumnType.String, 256)), // Chunk path (string, length 256)
new MetadataColumn(1, 0, "NbStreams", new ColumnInfo(ColumnType.Int)), // NbStreams
new MetadataColumn(2, 0, "NbPrograms", new ColumnInfo(ColumnType.Int)), // NbPrograms
new MetadataColumn(3, 0, "StartTimeInSeconds", new ColumnInfo(ColumnType.Double)), // StartTimeInSeconds
new MetadataColumn(4, 0, "DurationInSeconds", new ColumnInfo(ColumnType.Double)), // DurationInSeconds
new MetadataColumn(5, 0, "FormatName", new ColumnInfo(ColumnType.String, 256)), // Format name (string, length 256)
new MetadataColumn(6, 0, "BitRate", new ColumnInfo(ColumnType.Int)), // BitRate
};

/// <summary>
/// Creates the chunker operator and precomputes its output schema:
/// all input columns followed by the extension columns.
/// </summary>
/// <param name="rowProvider">Source of input rows and their column metadata.</param>
/// <param name="chunkLength">Chunk length handed to the chunk provider.</param>
/// <param name="videoChunkerCallback">Callback that produces the chunks for a video path.</param>
public PhyOpVideoChunker(RowProvider rowProvider, TimeSpan chunkLength, SourceProvidersSignatures.VideoChunkerProvider videoChunkerCallback)
{
    this.rowProvider = rowProvider;
    this.chunkLength = chunkLength;
    this.videoChunkProvider = videoChunkerCallback;

    MetadataColumn[] inputColumns = rowProvider.ColumnInfo;
    int inputCount = inputColumns.Length;

    this.outputColumns = new MetadataColumn[inputCount + this.extensionColumns.Length];
    inputColumns.CopyTo(this.outputColumns, 0);

    // Append the extension columns, shifting each relative ColumnId past the input columns.
    for (int offset = 0; offset < this.extensionColumns.Length; offset++)
    {
        MetadataColumn template = this.extensionColumns[offset];
        this.outputColumns[inputCount + offset] = new MetadataColumn(
            template.ColumnId + inputCount,
            template.TableId,
            template.ColumnName,
            template.ColumnType);
    }
}

public MetadataColumn[] GetOutputColumns() => rowProvider.ColumnInfo;
public MetadataColumn[] GetOutputColumns() => this.outputColumns;

public async IAsyncEnumerable<RowHolder> Iterate(ITransaction tran)
{
Expand All @@ -64,36 +92,41 @@ public async IAsyncEnumerable<RowHolder> Iterate(ITransaction tran)
throw new FilePathColumnDoesntExist();
}

// Need to build projection of all the columns plus append the chunk_name to end.
ProjectExtendInfo.MappingType[] mappingTypes = new ProjectExtendInfo.MappingType[rowProvider.ColumnInfo.Length + 1];
ProjectExtendInfo.MappingType[] mappingTypes = new ProjectExtendInfo.MappingType[rowProvider.ColumnInfo.Length + extensionColumns.Length];

for (int i = 0; i < mappingTypes.Length - 1; i++)
for (int i = 0; i < rowProvider.ColumnInfo.Length; i++)
{
mappingTypes[i] = ProjectExtendInfo.MappingType.Projection;
}

mappingTypes[mappingTypes.Length - 1] = ProjectExtendInfo.MappingType.Extension;
for (int i = rowProvider.ColumnInfo.Length; i < mappingTypes.Length; i++)
{
mappingTypes[i] = ProjectExtendInfo.MappingType.Extension;
}

int[] projectSourcePositions = new int[mappingTypes.Length - 1];
int[] projectSourcePositions = new int[rowProvider.ColumnInfo.Length];
for (int i = 0; i < projectSourcePositions.Length; i++)
{
projectSourcePositions[i] = i;
}

ColumnInfo[] extendedColumnInfo = new[] { new ColumnInfo(ColumnType.String, 256) };
ProjectExtendInfo extendInfo = new ProjectExtendInfo(mappingTypes, projectSourcePositions, extendedColumnInfo);

int chunkNamePosition = mappingTypes.Length - 1;
ProjectExtendInfo extendInfo = new ProjectExtendInfo(mappingTypes, projectSourcePositions, extensionColumns.Select(ec => ec.ColumnType).ToArray());

await foreach (RowHolder row in rowProvider.Enumerator)
{
string filePath = new string(row.GetStringField(filePathColumnId));

SourceProvidersSignatures.VideoChunkerResult videoChunkerResult = await this.videoChunkProvider(filePath, this.chunkLength, tran);
foreach (string chunkPath in videoChunkerResult.ChunkPaths)
VideoChunkerResult[] videoChunkerResult = await this.videoChunkProvider(filePath, this.chunkLength, tran);
foreach (VideoChunkerResult videoChunk in videoChunkerResult)
{
RowHolder expended = row.ProjectAndExtend(extendInfo);
expended.SetField(chunkNamePosition, chunkPath.ToCharArray());
expended.SetField(rowProvider.ColumnInfo.Length + 0, videoChunk.ChunkPath.ToCharArray());
expended.SetField(rowProvider.ColumnInfo.Length + 1, videoChunk.NbStreams);
expended.SetField(rowProvider.ColumnInfo.Length + 2, videoChunk.NbPrograms);
expended.SetField(rowProvider.ColumnInfo.Length + 3, videoChunk.StartTimeInSeconds);
expended.SetField(rowProvider.ColumnInfo.Length + 4, videoChunk.DurationInSeconds);
expended.SetField(rowProvider.ColumnInfo.Length + 5, videoChunk.FormatName.ToCharArray());
expended.SetField(rowProvider.ColumnInfo.Length + 6, videoChunk.BitRate);

yield return expended;
}
Expand Down
10 changes: 8 additions & 2 deletions QueryProcessing/SourceProvidersSignatures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@ public static class SourceProvidersSignatures
{
public struct VideoChunkerResult
{
public string[] ChunkPaths;
public string ChunkPath;
public int NbStreams;
public int NbPrograms;
public double StartTimeInSeconds;
public double DurationInSeconds;
public string FormatName;
public int BitRate;
}

public delegate Task<VideoChunkerResult> VideoChunkerProvider(string videoPath, TimeSpan timespan, ITransaction tran);
public delegate Task<VideoChunkerResult[]> VideoChunkerProvider(string videoPath, TimeSpan timespan, ITransaction tran);
}
}
3 changes: 3 additions & 0 deletions VideoProcessing/FfProbeOutputSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public class FormatSerializer
[JsonProperty("duration")]
public double DurationInSeconds;

[JsonProperty("start_time")]
public double StartTimeInSeconds;

[JsonProperty("format_name")]
public string FormatName;

Expand Down
24 changes: 19 additions & 5 deletions VideoProcessing/SourceRegistration.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
using PageManager;
using QueryProcessing;
using System;
using System.Threading;
using static QueryProcessing.SourceProvidersSignatures;

namespace VideoProcessing
{
public static class SourceRegistration
{
public static SourceProvidersSignatures.VideoChunkerProvider VideoChunkerCallback(FfmpegVideoChunker videoChunker)
public static VideoChunkerProvider VideoChunkerCallback(FfmpegVideoChunker videoChunker, FfmpegProbeWrapper videoProbe)
{
return async (string path, TimeSpan chunk, ITransaction tran) =>
{
string[] chunkPaths = await videoChunker.Execute(path, chunk, tran, CancellationToken.None);
return new SourceProvidersSignatures.VideoChunkerResult
VideoChunkerResult[] chunkerResult = new VideoChunkerResult[chunkPaths.Length];
int i = 0;
foreach (string videoChunk in chunkPaths)
{
ChunkPaths = chunkPaths,
};
FfProbeOutputSerializer probeOutput = await videoProbe.Execute(videoChunk, CancellationToken.None);
chunkerResult[i].ChunkPath = videoChunk;
chunkerResult[i].NbStreams= probeOutput.Format.NbStreams;
chunkerResult[i].NbPrograms= probeOutput.Format.NbPrograms;
chunkerResult[i].StartTimeInSeconds = probeOutput.Format.StartTimeInSeconds;
chunkerResult[i].DurationInSeconds = probeOutput.Format.DurationInSeconds;
chunkerResult[i].FormatName = probeOutput.Format.FormatName;
chunkerResult[i].BitRate = probeOutput.Format.BitRate;
i++;
}
return chunkerResult;
};
}
}
Expand Down
3 changes: 2 additions & 1 deletion tests/E2EQueryExecutionTests/BaseTestSetup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ await using (ITransaction tran = this.logManager.CreateTransaction(pageManager,
metadataManager = new MetadataManager.MetadataManager(pageManager, stringHeap, pageManager, logManager);

var videoChunker = new FfmpegVideoChunker(GetTempFolderPath(), TestGlobals.TestFileLogger);
var videoChunkerCallback = SourceRegistration.VideoChunkerCallback(videoChunker);
var videoProbe = new FfmpegProbeWrapper(TestGlobals.TestFileLogger);
var videoChunkerCallback = SourceRegistration.VideoChunkerCallback(videoChunker, videoProbe);
AstToOpTreeBuilder treeBuilder = new AstToOpTreeBuilder(metadataManager, videoChunkerCallback);

this.queryEntryGate = new QueryEntryGate(
Expand Down
11 changes: 10 additions & 1 deletion tests/E2EQueryExecutionTests/FileSystemProviderTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using NUnit.Framework;
using PageManager;
using QueryProcessing;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -111,7 +112,15 @@ public async Task VideoChunkerTest()
RowHolder[] result = await this.queryEntryGate.Execute(query, tran).ToArrayAsync();
await tran.Commit();

Assert.AreEqual(5, result.Length);
RowProvider rowProvider = await this.queryEntryGate.BuildExecutionTree(query, tran);

RowHolder[] rows = await rowProvider.Enumerator.ToArrayAsync();
Assert.AreEqual(5, rows.Length);

foreach (RowHolder row in rows)
{
Assert.AreEqual(rowProvider.GetValue(row, "FormatName"), "matroska,webm");
}
}
}
}
18 changes: 17 additions & 1 deletion tests/VideoProcessingTests/FFMpegVideoSplitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,29 @@ private static string GetTempFolderPath()
}

[Test]
public async Task FFmpegVideoChunkerTests()
public async Task FFmpegVideoChunkerTest()
{
var videoChunker = new FfmpegVideoChunker(GetTempFolderPath(), new NoOpLogging());

string[] chunkPaths = await videoChunker.Execute(GetExampleVideoPath(), TimeSpan.FromSeconds(10), new DummyTran(), CancellationToken.None);

Assert.AreEqual(5, chunkPaths.Length);
}

[Test]
public async Task TaskFFmpegChunkPlusProbeTest()
{
    // Arrange: a chunker writing into a temp folder and a probe wrapper, both with no-op logging.
    var videoChunker = new FfmpegVideoChunker(GetTempFolderPath(), new NoOpLogging());
    var probe = new FfmpegProbeWrapper(new NoOpLogging());

    // Act: split the example video into 10 second chunks.
    string[] chunkPaths = await videoChunker.Execute(GetExampleVideoPath(), TimeSpan.FromSeconds(10), new DummyTran(), CancellationToken.None);

    // Assert: expected chunk count, and every chunk can be probed.
    Assert.AreEqual(5, chunkPaths.Length);
    foreach (string chunkPath in chunkPaths)
    {
        var probeOutput = await probe.Execute(chunkPath, CancellationToken.None);

        // Previously the probe result was discarded, so the test only proved "does not throw".
        Assert.IsNotNull(probeOutput, "Probe returned no output for chunk {0}", chunkPath);
        Assert.IsNotNull(probeOutput.Format, "Probe output has no format section for chunk {0}", chunkPath);
    }
}
}
}

0 comments on commit 66d3c03

Please sign in to comment.