diff --git a/QueryProcessing/PhyOpProject.cs b/QueryProcessing/PhyOpProject.cs index ff6d7fd..8f305f8 100644 --- a/QueryProcessing/PhyOpProject.cs +++ b/QueryProcessing/PhyOpProject.cs @@ -1,24 +1,41 @@ using MetadataManager; using PageManager; +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace QueryProcessing { + public class PhyOpProjectComputeFunctors + { + public Func Projector { get; } + public Action Computer { get; } + + public PhyOpProjectComputeFunctors( + Func projector, + Action computer) + { + this.Projector = projector; + this.Computer = computer; + } + } + public class PhyOpProject : IPhysicalOperator { private IPhysicalOperator source; - private int[] columnChooser; + private PhyOpProjectComputeFunctors functors; + private MetadataColumn[] outputMd; bool isStar = false; int? topRows; - public PhyOpProject(IPhysicalOperator source, int[] columnChooser, int? topRows) + public PhyOpProject(IPhysicalOperator source, PhyOpProjectComputeFunctors functors, MetadataColumn[] outputMd, int? topRows) { this.source = source; - this.columnChooser = columnChooser; + this.functors = functors; this.topRows = topRows; this.isStar = false; + this.outputMd = outputMd; } public PhyOpProject(IPhysicalOperator source, int? topRows) @@ -26,6 +43,7 @@ public PhyOpProject(IPhysicalOperator source, int? topRows) this.source = source; this.topRows = topRows; this.isStar = true; + this.outputMd = source.GetOutputColumns(); } public async IAsyncEnumerable Iterate(ITransaction tran) @@ -38,7 +56,9 @@ await foreach (RowHolder row in this.source.Iterate(tran)) } else { - yield return row.Project(this.columnChooser); + RowHolder project = this.functors.Projector(row); + this.functors.Computer(row, project); + yield return project; } if (--this.topRows == 0) @@ -53,17 +73,6 @@ public async Task Invoke() await Task.FromResult(0); } - public MetadataColumn[] GetOutputColumns() - { - if (isStar) - { - return source.GetOutputColumns(); - } - else - { - MetadataColumn[] sourceColumns = source.GetOutputColumns(); - return columnChooser.Select(cc => sourceColumns[cc]).ToArray(); - } - } + public MetadataColumn[] GetOutputColumns() => outputMd; } } diff --git a/QueryProcessing/ProjectOpBuilder.cs b/QueryProcessing/ProjectOpBuilder.cs index 119eed7..ad27a9f 100644 --- a/QueryProcessing/ProjectOpBuilder.cs +++ b/QueryProcessing/ProjectOpBuilder.cs @@ -9,8 +9,199 @@ namespace QueryProcessing { + class ProjectOrApplyFuncFunctors + { + /// + /// Functor that projects rowholder into new one. + /// Fields that are to be copied are copied. + /// Fields on which some execution is to be done + /// (e.g. func call) are kept empty and initialized + /// with default value. + /// + public Func ProjectAndExtend { get; } + + /// + /// Applies computations on initialized RowHolder + /// (i.e. one passed from ProjectAndExtend). + /// + public Action ApplyCompute { get; } + + public MetadataColumn[] ProjectColumnInfo { get; } + + public ProjectOrApplyFuncFunctors( + Func projectAndExtend, + Action applyCompute, + MetadataColumn[] projectColumnInfo) + { + this.ProjectAndExtend = projectAndExtend; + this.ApplyCompute = applyCompute; + this.ProjectColumnInfo = projectColumnInfo; + } + } + class ProjectOpBuilder : IStatementTreeBuilder { + private MetadataColumn[] GetOutputSchema(Sql.columnSelect[] columns, IPhysicalOperator source) + { + MetadataColumn[] result = new MetadataColumn[columns.Length]; + int pos = 0; + foreach (Sql.columnSelect column in columns) + { + if (column.IsProjection) + { + var projection = ((Sql.columnSelect.Projection)column); + if (!projection.Item.IsId) + { + throw new Exception("Projection on non id is not supported"); + } + + string projectionId = ((Sql.value.Id)projection.Item).Item; + result[pos] = QueryProcessingAccessors.GetMetadataColumn(projectionId, source.GetOutputColumns()); + } + else if (column.IsFunc) + { + var func = ((Sql.columnSelect.Func)column).Item; + Sql.FuncType funcType = func.Item1; + // TODO: Map func type to the right function. + // This should go to new class. + Sql.scalarArgs args = func.Item2; + + if (funcType.IsAdd) + { + if (!args.IsArgs2) + { + throw new Exception("Sum requires 2 arguments"); + } + + var args2 = ((Sql.scalarArgs.Args2)args).Item; + Sql.value argOne = args2.Item1; + Sql.value argTwo = args2.Item2; + // TODO: Some rules should be applied here to determine output type. + // TODO: For now always return int. + // TODO is keeping 0, 0 here ok? + result[pos] = new MetadataColumn(0, 0, "ADD_Result", new ColumnInfo(ColumnType.Int)); + } + else + { + // TODO: + throw new NotImplementedException(); + } + } + + pos++; + } + + return result; + } + + private (int?, ColumnInfo?)[] BuildProjectExtendInfo(Sql.columnSelect[] columns, IPhysicalOperator source) + { + (int?, ColumnInfo?)[] extendInfo = columns.Select(c => + { + if (c.IsProjection) + { + var projection = ((Sql.columnSelect.Projection)c); + if (!projection.Item.IsId) + { + throw new Exception("Projection on non id is not supported"); + } + + string projectionId = ((Sql.value.Id)projection.Item).Item; + MetadataColumn mc = QueryProcessingAccessors.GetMetadataColumn(projectionId, source.GetOutputColumns()); + return (mc.ColumnId, null); + } + else if (c.IsFunc) + { + var func = ((Sql.columnSelect.Func)c).Item; + + Sql.FuncType funcType = func.Item1; + // TODO: Map func type to the right function. + // This should go to new class. + Sql.scalarArgs args = func.Item2; + + if (funcType.IsAdd) + { + if (!args.IsArgs2) + { + throw new Exception("Sum requires 2 arguments"); + } + + var args2 = ((Sql.scalarArgs.Args2)args).Item; + Sql.value argOne = args2.Item1; + Sql.value argTwo = args2.Item2; + // TODO: Some rules should be applied here to determine output type. + // TODO: For now always return int. + return (null, new ColumnInfo(ColumnType.Int)); + } + else + { + // TODO: + throw new NotImplementedException(); + } + } + else + { + throw new Exception("Invalid type in select"); + } + }).ToArray(); + + return extendInfo; + } + + private Action ExecuteComputeOnRowHolder(IEnumerable selects, MetadataColumn[] sourceColumns) + { + int outputPosition = 0; + List> listOfActions = new List>(); + + foreach (var select in selects) + { + if (!select.IsFunc) + { + outputPosition++; + continue; + } + + var func = ((Sql.columnSelect.Func)select).Item; + Sql.FuncType funcType = func.Item1; + // TODO: Map func type to the right function. + // This should go to new class. + Sql.scalarArgs args = func.Item2; + + if (funcType.IsAdd) + { + Sql.scalarArgs.Args2 argsExtracted = (Sql.scalarArgs.Args2)args; + Sql.value arg1 = argsExtracted.Item.Item1; + Sql.value arg2 = argsExtracted.Item.Item2; + + if (!arg1.IsId || !arg2.IsId) + { + // TODO: + throw new Exception("Only support for ids as function arguments"); + } + + Sql.value.Id arg1Id = (Sql.value.Id)(arg1); + Sql.value.Id arg2Id = (Sql.value.Id)(arg2); + + MetadataColumn mc1 = QueryProcessingAccessors.GetMetadataColumn(arg1Id.Item, sourceColumns); + MetadataColumn mc2 = QueryProcessingAccessors.GetMetadataColumn(arg2Id.Item, sourceColumns); + + listOfActions.Add((RowHolder inputRh, RowHolder outputRh) => + { + QueryProcessingAccessors.ApplyFuncInPlace( + new MetadataColumn[] { mc1, mc2 }, + outputPosition, + inputRh, + outputRh, + funcType + ); + }); + } + } + + // Execute all the actions. + return ((RowHolder input, RowHolder output) => listOfActions.ForEach(act => act(input, output))); + } + public Task> BuildStatement(Sql.sqlStatement statement, ITransaction tran, IPhysicalOperator source, InputStringNormalizer inputStringNormalizer) { Sql.columnSelect[] columns = new Sql.columnSelect[0]; @@ -59,24 +250,14 @@ public Task> BuildStatement(Sql.sqlStatement statem // Project Op. List columnMapping = new List(); - foreach (Sql.columnSelect selectColumn in columns) - { - if (selectColumn.IsProjection) - { - // TODO: What about func here? - var projection = ((Sql.columnSelect.Projection)selectColumn); - if (!projection.Item.IsId) - { - throw new Exception("Projection on non id is not supported"); - } + (int?, ColumnInfo?)[] extendInfo = this.BuildProjectExtendInfo(columns, source); + Func projector = (rowHolder) => rowHolder.ProjectAndExtend(extendInfo); + Action computes = ExecuteComputeOnRowHolder(columns, source.GetOutputColumns()); + MetadataColumn[] outputSchema = this.GetOutputSchema(columns, source); - string projectionId = ((Sql.value.Id)projection.Item).Item; - MetadataColumn mc = QueryProcessingAccessors.GetMetadataColumn(projectionId, source.GetOutputColumns()); - columnMapping.Add(mc); - } - } + PhyOpProjectComputeFunctors functors = new PhyOpProjectComputeFunctors(projector, computes); - IPhysicalOperator projectOp = new PhyOpProject(source, columnMapping.Select(mc => mc.ColumnId).ToArray(), topRows); + IPhysicalOperator projectOp = new PhyOpProject(source, functors, outputSchema, topRows); return Task.FromResult(projectOp); } } diff --git a/QueryProcessing/QueryProcessingAccessors.cs b/QueryProcessing/QueryProcessingAccessors.cs index ff538ef..424521f 100644 --- a/QueryProcessing/QueryProcessingAccessors.cs +++ b/QueryProcessing/QueryProcessingAccessors.cs @@ -104,6 +104,40 @@ public static IComparable MetadataColumnRowsetHolderFetcher(MetadataColumn mc, R } } + /// + /// Applies given function to given row holder. + /// + /// List of source metadata columns, ordered by scalar args. First arg maps to sourcesMds[0] and so on. Null if arg is not an id. + /// Target positions in output rowholder. + /// Input rowholder + /// Func type. + /// Arguments + /// Output rowholder. + public static void ApplyFuncInPlace(MetadataColumn[] sourcesMds, int outputPosition, RowHolder inputRowHolder, RowHolder outputRowHolder, Sql.FuncType funcType) + { + if (funcType.IsAdd) + { + MetadataColumn columnOneMd = sourcesMds[0]; + MetadataColumn columnTwoMd = sourcesMds[1]; + + if (columnOneMd.ColumnType.ColumnType != ColumnType.Int || columnTwoMd.ColumnType.ColumnType != ColumnType.Int) + { + // TODO: + throw new Exception("Only support for ints in sum"); + } + + int argOneExtracted = inputRowHolder.GetField(columnOneMd.ColumnId); + int argTwoExtracted = inputRowHolder.GetField(columnTwoMd.ColumnId); + + int res = argOneExtracted + argTwoExtracted; + outputRowHolder.SetField(outputPosition, res); + } + else + { + throw new NotImplementedException(); + } + } + // TODO: This is just bad. // It is very hard to keep all type -> agg mappings. // Needs refactoring. diff --git a/tests/E2EQueryExecutionTests/SimpleE2ETests.cs b/tests/E2EQueryExecutionTests/SimpleE2ETests.cs index 8ec9a4e..12f0482 100644 --- a/tests/E2EQueryExecutionTests/SimpleE2ETests.cs +++ b/tests/E2EQueryExecutionTests/SimpleE2ETests.cs @@ -185,6 +185,60 @@ await using (ITransaction tran = this.logManager.CreateTransaction(pageManager, } } + [Test] + public async Task SelectWithFunc() + { + await using (ITransaction tran = this.logManager.CreateTransaction(pageManager, "CREATE_TABLE")) + { + string createTableQuery = "CREATE TABLE T1 (TYPE_INT a, TYPE_DOUBLE b, TYPE_STRING(10) c)"; + await this.queryEntryGate.Execute(createTableQuery, tran).ToArrayAsync(); + await tran.Commit(); + } + + await using (ITransaction tran = this.logManager.CreateTransaction(pageManager, "INSERT")) + { + for (int i = 0; i < 20; i++) + { + string insertQuery = $"INSERT INTO T1 VALUES ({i}, 1.1, 'mystring')"; + await this.queryEntryGate.Execute(insertQuery, tran).ToArrayAsync(); + } + + await tran.Commit(); + } + + await using (ITransaction tran = this.logManager.CreateTransaction(pageManager, "GET_ROWS")) + { + string query = @"SELECT ADD(a, a) FROM T1"; + RowHolder[] result = await this.queryEntryGate.Execute(query, tran).ToArrayAsync(); + Assert.AreEqual(20, result.Length); + + int i = 0; + foreach (var row in result) + { + Assert.AreEqual(i * 2, row.GetField(0)); + i++; + } + + await tran.Commit(); + } + + await using (ITransaction tran = this.logManager.CreateTransaction(pageManager, "GET_ROWS")) + { + string query = @"SELECT TOP 5 ADD(a, a) FROM T1"; + RowHolder[] result = await this.queryEntryGate.Execute(query, tran).ToArrayAsync(); + Assert.AreEqual(5, result.Length); + + int i = 0; + foreach (var row in result) + { + Assert.AreEqual(i * 2, row.GetField(0)); + i++; + } + + await tran.Commit(); + } + } + [Test] public async Task E2EWithRollback() { diff --git a/tests/QueryProcessingTests/PhyOpProjectTests.cs b/tests/QueryProcessingTests/PhyOpProjectTests.cs new file mode 100644 index 0000000..c2e83ad --- /dev/null +++ b/tests/QueryProcessingTests/PhyOpProjectTests.cs @@ -0,0 +1,122 @@ +using DataStructures; +using LogManager; +using MetadataManager; +using NUnit.Framework; +using PageManager; +using QueryProcessing; +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using Test.Common; + +namespace QueryProcessingTests +{ + public class PhyOpProjectTests + { + private PhyOpScan scan; + private ITransaction tran; + + [SetUp] + public async Task Setup() + { + ILogManager logManager; + MetadataManager.MetadataManager metadataManager; + IPageManager allocator; + MetadataTable table; + ColumnInfo[] columnInfos; + + allocator = new PageManager.PageManager(4096, TestGlobals.DefaultEviction, TestGlobals.DefaultPersistedStream); + logManager = new LogManager.LogManager(new BinaryWriter(new MemoryStream())); + ITransaction setupTran = logManager.CreateTransaction(allocator); + StringHeapCollection stringHeap = new StringHeapCollection(allocator, setupTran); + metadataManager = new MetadataManager.MetadataManager(allocator, stringHeap, allocator, logManager); + var tm = metadataManager.GetTableManager(); + + tran = logManager.CreateTransaction(allocator); + columnInfos = new[] { new ColumnInfo(ColumnType.Int), new ColumnInfo(ColumnType.String, 1), new ColumnInfo(ColumnType.Double) }; + int id = await tm.CreateObject(new TableCreateDefinition() + { + TableName = "Table", + ColumnNames = new[] { "a", "b", "c" }, + ColumnTypes = columnInfos, + }, tran); + + await tran.Commit(); + + tran = logManager.CreateTransaction(allocator); + table = await tm.GetById(id, tran); + await tran.Commit(); + + List source = new List(); + for (int i = 0; i < 5; i++) + { + var rhf = new RowHolder(new[] { new ColumnInfo(ColumnType.Int), new ColumnInfo(ColumnType.String, 1), new ColumnInfo(ColumnType.Double) }); + rhf.SetField(0, i); + rhf.SetField(1, i.ToString().ToCharArray()); + rhf.SetField(2, i + 0.1); + source.Add(rhf); + } + + PhyOpStaticRowProvider opStatic = new PhyOpStaticRowProvider(source); + + tran = logManager.CreateTransaction(allocator); + PhyOpTableInsert op = new PhyOpTableInsert(table.Collection, opStatic); + await op.Iterate(tran).AllResultsAsync(); + await tran.Commit(); + + tran = logManager.CreateTransaction(allocator); + PageListCollection pcl = new PageListCollection(allocator, columnInfos, table.RootPage); + scan = new PhyOpScan(pcl, tran, table.Columns, "Table"); + } + + [Test] + public async Task ValidateSimpleProject() + { + PhyOpProjectComputeFunctors functors = new PhyOpProjectComputeFunctors( + projector: (rhf) => rhf.Project(new int[] { 0 }), + computer: (r1, r2) => { } + ); + + PhyOpProject project = new PhyOpProject(this.scan, functors, new MetadataColumn[] { this.scan.GetOutputColumns()[0] }, null); + + List result = new List(); + await foreach (var row in project.Iterate(this.tran)) + { + result.Add(row); + } + + Assert.AreEqual(5, result.Count); + for (int i = 0; i < 5; i++) + { + Assert.AreEqual(i, result[i].GetField(0)); + } + } + + [Test] + public async Task ValidateFuncProject() + { + PhyOpProjectComputeFunctors functors = new PhyOpProjectComputeFunctors( + projector: (rhf) => rhf.Project(new int[] { 0 }), + computer: (r1, r2) => + { + int res = r1.GetField(0) + r2.GetField(0); + r2.SetField(0, res); + } + ); + + PhyOpProject project = new PhyOpProject(this.scan, functors, new MetadataColumn[] { this.scan.GetOutputColumns()[0] }, null); + + List result = new List(); + await foreach (var row in project.Iterate(this.tran)) + { + result.Add(row); + } + + Assert.AreEqual(5, result.Count); + for (int i = 0; i < 5; i++) + { + Assert.AreEqual(i * 2, result[i].GetField(0)); + } + } + } +}