From 66d3c03a183713b07c6aa01403cb73d5c2243733 Mon Sep 17 00:00:00 2001 From: dasatomic Date: Tue, 15 Jun 2021 13:30:06 +0200 Subject: [PATCH] More information in chunk provider. --- .../Builders/AstToOpTreeBuilder.cs | 27 ++++++++ QueryProcessing/PhyOperators/PhyOpScan.cs | 63 ++++++++++++++----- QueryProcessing/SourceProvidersSignatures.cs | 10 ++- VideoProcessing/FfProbeOutputSerializer.cs | 3 + VideoProcessing/SourceRegistration.cs | 24 +++++-- tests/E2EQueryExecutionTests/BaseTestSetup.cs | 3 +- .../FileSystemProviderTests.cs | 11 +++- .../FFMpegVideoSplitTests.cs | 18 +++++- 8 files changed, 134 insertions(+), 25 deletions(-) diff --git a/QueryProcessing/Builders/AstToOpTreeBuilder.cs b/QueryProcessing/Builders/AstToOpTreeBuilder.cs index f5a3b7d..b239737 100644 --- a/QueryProcessing/Builders/AstToOpTreeBuilder.cs +++ b/QueryProcessing/Builders/AstToOpTreeBuilder.cs @@ -14,14 +14,41 @@ namespace QueryProcessing /// public class RowProvider { + private Dictionary columnPositions = new Dictionary(); + public RowProvider(IAsyncEnumerable enumerator, MetadataColumn[] columnInfo) { this.Enumerator = enumerator; this.ColumnInfo = columnInfo; + + foreach (MetadataColumn ci in columnInfo) + { + columnPositions.Add(ci.ColumnName, ci.ColumnId); + } } public IAsyncEnumerable Enumerator { get; } public MetadataColumn[] ColumnInfo { get; } + + public T GetValue(RowHolder rh, string columnName) where T : unmanaged + { + if (columnPositions.TryGetValue(columnName, out int position)) + { + return rh.GetField(position); + } + + throw new InvalidColumnNameException(); + } + + public string GetValue(RowHolder rh, string columnName) + { + if (columnPositions.TryGetValue(columnName, out int position)) + { + return new string(rh.GetStringField(position)); + } + + throw new InvalidColumnNameException(); + } } public class AstToOpTreeBuilder diff --git a/QueryProcessing/PhyOperators/PhyOpScan.cs b/QueryProcessing/PhyOperators/PhyOpScan.cs index c0f676d..adbe923 100644 --- a/QueryProcessing/PhyOperators/PhyOpScan.cs +++ b/QueryProcessing/PhyOperators/PhyOpScan.cs @@ -2,9 +2,11 @@ using MetadataManager; using PageManager; using System; +using System.Linq; using System.Collections.Generic; using System.Threading.Tasks; using QueryProcessing.Exceptions; +using static QueryProcessing.SourceProvidersSignatures; namespace QueryProcessing { @@ -29,17 +31,43 @@ public class PhyOpVideoChunker : IPhysicalOperator { private RowProvider rowProvider; private TimeSpan chunkLength; - private SourceProvidersSignatures.VideoChunkerProvider videoChunkProvider; - const string FilePathField = "FilePath"; + private VideoChunkerProvider videoChunkProvider; + private const string FilePathField = "FilePath"; + private MetadataColumn[] outputColumns; + + private MetadataColumn[] extensionColumns = new[] + { + new MetadataColumn(0, 0, "chunk_path", new ColumnInfo(ColumnType.String, 256)), // Chunk path + new MetadataColumn(1, 0, "NbStreams", new ColumnInfo(ColumnType.Int)), // NbStreams + new MetadataColumn(2, 0, "NbPrograms", new ColumnInfo(ColumnType.Int)), // NbPrograms, + new MetadataColumn(3, 0, "StartTimeInSeconds", new ColumnInfo(ColumnType.Double)), // StartTimeInSeconds, + new MetadataColumn(4, 0, "DurationInSeconds", new ColumnInfo(ColumnType.Double)), // DurationInSeconds, + new MetadataColumn(5, 0, "FormatName", new ColumnInfo(ColumnType.String, 256)), // Format name, + new MetadataColumn(6, 0, "BitRate", new ColumnInfo(ColumnType.Int)), // BitRate, + }; public PhyOpVideoChunker(RowProvider rowProvider, TimeSpan chunkLength, SourceProvidersSignatures.VideoChunkerProvider videoChunkerCallback) { this.rowProvider = rowProvider; this.chunkLength = chunkLength; this.videoChunkProvider = videoChunkerCallback; + + this.outputColumns = new MetadataColumn[rowProvider.ColumnInfo.Length + this.extensionColumns.Length]; + rowProvider.ColumnInfo.CopyTo(outputColumns, 0); + + int extensionPosition = 0; + for (int i = rowProvider.ColumnInfo.Length; i < outputColumns.Length; i++) + { + outputColumns[i] = new MetadataColumn( + extensionColumns[extensionPosition].ColumnId + rowProvider.ColumnInfo.Length, + extensionColumns[extensionPosition].TableId, + extensionColumns[extensionPosition].ColumnName, + extensionColumns[extensionPosition].ColumnType); + extensionPosition++; + } } - public MetadataColumn[] GetOutputColumns() => rowProvider.ColumnInfo; + public MetadataColumn[] GetOutputColumns() => this.outputColumns; public async IAsyncEnumerable Iterate(ITransaction tran) { @@ -64,36 +92,41 @@ public async IAsyncEnumerable Iterate(ITransaction tran) throw new FilePathColumnDoesntExist(); } - // Need to build projection of all the columns plus append the chunk_name to end. - ProjectExtendInfo.MappingType[] mappingTypes = new ProjectExtendInfo.MappingType[rowProvider.ColumnInfo.Length + 1]; + ProjectExtendInfo.MappingType[] mappingTypes = new ProjectExtendInfo.MappingType[rowProvider.ColumnInfo.Length + extensionColumns.Length]; - for (int i = 0; i < mappingTypes.Length - 1; i++) + for (int i = 0; i < rowProvider.ColumnInfo.Length; i++) { mappingTypes[i] = ProjectExtendInfo.MappingType.Projection; } - mappingTypes[mappingTypes.Length - 1] = ProjectExtendInfo.MappingType.Extension; + for (int i = rowProvider.ColumnInfo.Length; i < mappingTypes.Length; i++) + { + mappingTypes[i] = ProjectExtendInfo.MappingType.Extension; + } - int[] projectSourcePositions = new int[mappingTypes.Length - 1]; + int[] projectSourcePositions = new int[rowProvider.ColumnInfo.Length]; for (int i = 0; i < projectSourcePositions.Length; i++) { projectSourcePositions[i] = i; } - ColumnInfo[] extendedColumnInfo = new[] { new ColumnInfo(ColumnType.String, 256) }; - ProjectExtendInfo extendInfo = new ProjectExtendInfo(mappingTypes, projectSourcePositions, extendedColumnInfo); - - int chunkNamePosition = mappingTypes.Length - 1; + ProjectExtendInfo extendInfo = new ProjectExtendInfo(mappingTypes, projectSourcePositions, extensionColumns.Select(ec => ec.ColumnType).ToArray()); await foreach (RowHolder row in rowProvider.Enumerator) { string filePath = new string(row.GetStringField(filePathColumnId)); - SourceProvidersSignatures.VideoChunkerResult videoChunkerResult = await this.videoChunkProvider(filePath, this.chunkLength, tran); - foreach (string chunkPath in videoChunkerResult.ChunkPaths) + VideoChunkerResult[] videoChunkerResult = await this.videoChunkProvider(filePath, this.chunkLength, tran); + foreach (VideoChunkerResult videoChunk in videoChunkerResult) { RowHolder expended = row.ProjectAndExtend(extendInfo); - expended.SetField(chunkNamePosition, chunkPath.ToCharArray()); + expended.SetField(rowProvider.ColumnInfo.Length + 0, videoChunk.ChunkPath.ToCharArray()); + expended.SetField(rowProvider.ColumnInfo.Length + 1, videoChunk.NbStreams); + expended.SetField(rowProvider.ColumnInfo.Length + 2, videoChunk.NbPrograms); + expended.SetField(rowProvider.ColumnInfo.Length + 3, videoChunk.StartTimeInSeconds); + expended.SetField(rowProvider.ColumnInfo.Length + 4, videoChunk.DurationInSeconds); + expended.SetField(rowProvider.ColumnInfo.Length + 5, videoChunk.FormatName.ToCharArray()); + expended.SetField(rowProvider.ColumnInfo.Length + 6, videoChunk.BitRate); yield return expended; } diff --git a/QueryProcessing/SourceProvidersSignatures.cs b/QueryProcessing/SourceProvidersSignatures.cs index 312f449..af3f213 100644 --- a/QueryProcessing/SourceProvidersSignatures.cs +++ b/QueryProcessing/SourceProvidersSignatures.cs @@ -8,9 +8,15 @@ public static class SourceProvidersSignatures { public struct VideoChunkerResult { - public string[] ChunkPaths; + public string ChunkPath; + public int NbStreams; + public int NbPrograms; + public double StartTimeInSeconds; + public double DurationInSeconds; + public string FormatName; + public int BitRate; } - public delegate Task VideoChunkerProvider(string videoPath, TimeSpan timespan, ITransaction tran); + public delegate Task VideoChunkerProvider(string videoPath, TimeSpan timespan, ITransaction tran); } } diff --git a/VideoProcessing/FfProbeOutputSerializer.cs b/VideoProcessing/FfProbeOutputSerializer.cs index b261cce..9effcd8 100644 --- a/VideoProcessing/FfProbeOutputSerializer.cs +++ b/VideoProcessing/FfProbeOutputSerializer.cs @@ -48,6 +48,9 @@ public class FormatSerializer [JsonProperty("duration")] public double DurationInSeconds; + [JsonProperty("start_time")] + public double StartTimeInSeconds; + [JsonProperty("format_name")] public string FormatName; diff --git a/VideoProcessing/SourceRegistration.cs b/VideoProcessing/SourceRegistration.cs index 2efb237..38541a0 100644 --- a/VideoProcessing/SourceRegistration.cs +++ b/VideoProcessing/SourceRegistration.cs @@ -1,22 +1,36 @@ using PageManager; -using QueryProcessing; using System; using System.Threading; +using static QueryProcessing.SourceProvidersSignatures; namespace VideoProcessing { public static class SourceRegistration { - public static SourceProvidersSignatures.VideoChunkerProvider VideoChunkerCallback(FfmpegVideoChunker videoChunker) + public static VideoChunkerProvider VideoChunkerCallback(FfmpegVideoChunker videoChunker, FfmpegProbeWrapper videoProbe) { return async (string path, TimeSpan chunk, ITransaction tran) => { string[] chunkPaths = await videoChunker.Execute(path, chunk, tran, CancellationToken.None); - return new SourceProvidersSignatures.VideoChunkerResult + VideoChunkerResult[] chunkerResult = new VideoChunkerResult[chunkPaths.Length]; + + int i = 0; + foreach (string videoChunk in chunkPaths) { - ChunkPaths = chunkPaths, - }; + FfProbeOutputSerializer probeOutput = await videoProbe.Execute(videoChunk, CancellationToken.None); + chunkerResult[i].ChunkPath = videoChunk; + chunkerResult[i].NbStreams= probeOutput.Format.NbStreams; + chunkerResult[i].NbPrograms= probeOutput.Format.NbPrograms; + chunkerResult[i].StartTimeInSeconds = probeOutput.Format.StartTimeInSeconds; + chunkerResult[i].DurationInSeconds = probeOutput.Format.DurationInSeconds; + chunkerResult[i].FormatName = probeOutput.Format.FormatName; + chunkerResult[i].BitRate = probeOutput.Format.BitRate; + + i++; + } + + return chunkerResult; }; } } diff --git a/tests/E2EQueryExecutionTests/BaseTestSetup.cs b/tests/E2EQueryExecutionTests/BaseTestSetup.cs index d76f5b2..7c8ba43 100644 --- a/tests/E2EQueryExecutionTests/BaseTestSetup.cs +++ b/tests/E2EQueryExecutionTests/BaseTestSetup.cs @@ -41,7 +41,8 @@ await using (ITransaction tran = this.logManager.CreateTransaction(pageManager, metadataManager = new MetadataManager.MetadataManager(pageManager, stringHeap, pageManager, logManager); var videoChunker = new FfmpegVideoChunker(GetTempFolderPath(), TestGlobals.TestFileLogger); - var videoChunkerCallback = SourceRegistration.VideoChunkerCallback(videoChunker); + var videoProbe = new FfmpegProbeWrapper(TestGlobals.TestFileLogger); + var videoChunkerCallback = SourceRegistration.VideoChunkerCallback(videoChunker, videoProbe); AstToOpTreeBuilder treeBuilder = new AstToOpTreeBuilder(metadataManager, videoChunkerCallback); this.queryEntryGate = new QueryEntryGate( diff --git a/tests/E2EQueryExecutionTests/FileSystemProviderTests.cs b/tests/E2EQueryExecutionTests/FileSystemProviderTests.cs index b29db5a..d7a11d7 100644 --- a/tests/E2EQueryExecutionTests/FileSystemProviderTests.cs +++ b/tests/E2EQueryExecutionTests/FileSystemProviderTests.cs @@ -1,5 +1,6 @@ using NUnit.Framework; using PageManager; +using QueryProcessing; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -111,7 +112,15 @@ public async Task VideoChunkerTest() RowHolder[] result = await this.queryEntryGate.Execute(query, tran).ToArrayAsync(); await tran.Commit(); - Assert.AreEqual(5, result.Length); + RowProvider rowProvider = await this.queryEntryGate.BuildExecutionTree(query, tran); + + RowHolder[] rows = await rowProvider.Enumerator.ToArrayAsync(); + Assert.AreEqual(5, rows.Length); + + foreach (RowHolder row in rows) + { + Assert.AreEqual(rowProvider.GetValue(row, "FormatName"), "matroska,webm"); + } } } } diff --git a/tests/VideoProcessingTests/FFMpegVideoSplitTests.cs b/tests/VideoProcessingTests/FFMpegVideoSplitTests.cs index f987f96..1ec40d0 100644 --- a/tests/VideoProcessingTests/FFMpegVideoSplitTests.cs +++ b/tests/VideoProcessingTests/FFMpegVideoSplitTests.cs @@ -28,7 +28,7 @@ private static string GetTempFolderPath() } [Test] - public async Task FFmpegVideoChunkerTests() + public async Task FFmpegVideoChunkerTest() { var videoChunker = new FfmpegVideoChunker(GetTempFolderPath(), new NoOpLogging()); @@ -36,5 +36,21 @@ public async Task FFmpegVideoChunkerTests() Assert.AreEqual(5, chunkPaths.Length); } + + [Test] + public async Task TaskFFmpegChunkPlusProbeTest() + { + + var videoChunker = new FfmpegVideoChunker(GetTempFolderPath(), new NoOpLogging()); + var probe = new FfmpegProbeWrapper(new NoOpLogging()); + + string[] chunkPaths = await videoChunker.Execute(GetExampleVideoPath(), TimeSpan.FromSeconds(10), new DummyTran(), CancellationToken.None); + + Assert.AreEqual(5, chunkPaths.Length); + foreach (string chunkPath in chunkPaths) + { + var probeOutput = await probe.Execute(chunkPath, CancellationToken.None); + } + } } }