Skip to content

Commit

Permalink
Prototype for initial function implementation.
Browse files Browse the repository at this point in the history
Code is currently pretty ugly and there is a lot of overlap between group by and project ops.

Refactoring is needed.
  • Loading branch information
dasatomic committed May 20, 2021
1 parent 4277d7d commit 5e7144a
Show file tree
Hide file tree
Showing 5 changed files with 432 additions and 32 deletions.
41 changes: 25 additions & 16 deletions QueryProcessing/PhyOpProject.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,49 @@
using MetadataManager;
using PageManager;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace QueryProcessing
{
/// <summary>
/// Immutable pair of delegates consumed by <see cref="PhyOpProject"/>.
/// When iterating, the operator calls <see cref="Projector"/> on the source row and then
/// passes both the source row and the projected row to <see cref="Computer"/>.
/// </summary>
public class PhyOpProjectComputeFunctors
{
    /// <summary>
    /// Stores the two delegates as-is; no validation is performed.
    /// </summary>
    /// <param name="projector">Delegate that turns a source row into the output row.</param>
    /// <param name="computer">Delegate invoked with (source row, output row) after projection.</param>
    public PhyOpProjectComputeFunctors(
        Func<RowHolder, RowHolder> projector,
        Action<RowHolder, RowHolder> computer)
        => (Projector, Computer) = (projector, computer);

    /// <summary>Builds the output row from a source row.</summary>
    public Func<RowHolder, RowHolder> Projector { get; }

    /// <summary>Receives the source row and the row produced by <see cref="Projector"/>.</summary>
    public Action<RowHolder, RowHolder> Computer { get; }
}

public class PhyOpProject : IPhysicalOperator<RowHolder>
{
private IPhysicalOperator<RowHolder> source;
private int[] columnChooser;
private PhyOpProjectComputeFunctors functors;
private MetadataColumn[] outputMd;
bool isStar = false;
int? topRows;

public PhyOpProject(IPhysicalOperator<RowHolder> source, int[] columnChooser, int? topRows)
public PhyOpProject(IPhysicalOperator<RowHolder> source, PhyOpProjectComputeFunctors functors, MetadataColumn[] outputMd, int? topRows)
{
this.source = source;
this.columnChooser = columnChooser;
this.functors = functors;
this.topRows = topRows;
this.isStar = false;
this.outputMd = outputMd;
}

/// <summary>
/// Creates a "star" projection: the operator is flagged as <c>isStar</c> and its output
/// metadata is taken directly from the source operator's output columns.
/// </summary>
/// <param name="source">Upstream operator that supplies the rows.</param>
/// <param name="topRows">Optional row count, stored unchanged (Iterate decrements it per row).</param>
public PhyOpProject(IPhysicalOperator<RowHolder> source, int? topRows)
{
    isStar = true;
    outputMd = source.GetOutputColumns();
    this.source = source;
    this.topRows = topRows;
}

public async IAsyncEnumerable<RowHolder> Iterate(ITransaction tran)
Expand All @@ -38,7 +56,9 @@ await foreach (RowHolder row in this.source.Iterate(tran))
}
else
{
yield return row.Project(this.columnChooser);
RowHolder project = this.functors.Projector(row);
this.functors.Computer(row, project);
yield return project;
}

if (--this.topRows == 0)
Expand All @@ -53,17 +73,6 @@ public async Task Invoke()
await Task.FromResult(0);
}

public MetadataColumn[] GetOutputColumns()
{
if (isStar)
{
return source.GetOutputColumns();
}
else
{
MetadataColumn[] sourceColumns = source.GetOutputColumns();
return columnChooser.Select(cc => sourceColumns[cc]).ToArray();
}
}
/// <summary>
/// Returns the output column metadata that was fixed when this operator was constructed.
/// </summary>
public MetadataColumn[] GetOutputColumns()
{
    return this.outputMd;
}
}
}
213 changes: 197 additions & 16 deletions QueryProcessing/ProjectOpBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,199 @@

namespace QueryProcessing
{
/// <summary>
/// Immutable holder for the delegates and column metadata describing a projection
/// that may also contain computed (function) columns.
/// </summary>
class ProjectOrApplyFuncFunctors
{
    /// <summary>
    /// Stores the supplied delegates and metadata as-is; no validation is performed.
    /// </summary>
    public ProjectOrApplyFuncFunctors(
        Func<RowHolder, RowHolder> projectAndExtend,
        Action<RowHolder> applyCompute,
        MetadataColumn[] projectColumnInfo)
        => (ProjectAndExtend, ApplyCompute, ProjectColumnInfo) =
            (projectAndExtend, applyCompute, projectColumnInfo);

    /// <summary>
    /// Projects a row holder into a new one. Copied fields are copied over; fields that
    /// still need some execution (e.g. a function call) are left empty, initialized with
    /// their default value.
    /// </summary>
    public Func<RowHolder, RowHolder> ProjectAndExtend { get; }

    /// <summary>
    /// Runs the computations on an already initialized row holder
    /// (i.e. one returned by <see cref="ProjectAndExtend"/>).
    /// </summary>
    public Action<RowHolder> ApplyCompute { get; }

    /// <summary>Column metadata associated with the projection.</summary>
    public MetadataColumn[] ProjectColumnInfo { get; }
}

class ProjectOpBuilder : IStatementTreeBuilder
{
/// <summary>
/// Builds the output schema for a select list: one <see cref="MetadataColumn"/> per
/// entry of <paramref name="columns"/>, in the same order.
/// </summary>
/// <param name="columns">Select-list entries (plain projections or function calls).</param>
/// <param name="source">Operator whose output columns are used to resolve projected ids.</param>
/// <returns>Array of the same length as <paramref name="columns"/>.</returns>
/// <exception cref="Exception">
/// A projection is not a plain id, ADD does not have exactly two arguments,
/// or the entry is neither a projection nor a function.
/// </exception>
/// <exception cref="NotImplementedException">The function type is not ADD.</exception>
private MetadataColumn[] GetOutputSchema(Sql.columnSelect[] columns, IPhysicalOperator<RowHolder> source)
{
    // The source schema does not depend on the column being processed; fetch it once.
    MetadataColumn[] sourceColumns = source.GetOutputColumns();
    MetadataColumn[] result = new MetadataColumn[columns.Length];

    for (int pos = 0; pos < columns.Length; pos++)
    {
        Sql.columnSelect column = columns[pos];

        if (column.IsProjection)
        {
            var projection = (Sql.columnSelect.Projection)column;
            if (!projection.Item.IsId)
            {
                throw new Exception("Projection on non id is not supported");
            }

            string projectionId = ((Sql.value.Id)projection.Item).Item;
            result[pos] = QueryProcessingAccessors.GetMetadataColumn(projectionId, sourceColumns);
        }
        else if (column.IsFunc)
        {
            var func = ((Sql.columnSelect.Func)column).Item;
            Sql.FuncType funcType = func.Item1;
            // TODO: Map func type to the right function.
            // This should go to new class.
            Sql.scalarArgs args = func.Item2;

            if (!funcType.IsAdd)
            {
                // TODO:
                throw new NotImplementedException();
            }

            if (!args.IsArgs2)
            {
                throw new Exception("Sum requires 2 arguments");
            }

            // TODO: Some rules should be applied here to determine output type.
            // TODO: For now always return int.
            // TODO is keeping 0, 0 here ok?
            result[pos] = new MetadataColumn(0, 0, "ADD_Result", new ColumnInfo(ColumnType.Int));
        }
        else
        {
            // Previously such an entry silently left its slot at the default value;
            // fail the same way BuildProjectExtendInfo does for the same input.
            throw new Exception("Invalid type in select");
        }
    }

    return result;
}

/// <summary>
/// Produces, for every select-list entry, the tuple later handed to
/// <c>RowHolder.ProjectAndExtend</c>: a projected column yields (source column id, null),
/// a function column yields (null, column info of the result).
/// </summary>
/// <param name="columns">Select-list entries.</param>
/// <param name="source">Operator whose output columns are used to resolve projected ids.</param>
/// <returns>One tuple per entry of <paramref name="columns"/>, in order.</returns>
private (int?, ColumnInfo?)[] BuildProjectExtendInfo(Sql.columnSelect[] columns, IPhysicalOperator<RowHolder> source)
{
    // Describes a single select entry; throws for anything that is not supported yet.
    (int?, ColumnInfo?) DescribeEntry(Sql.columnSelect entry)
    {
        if (entry.IsProjection)
        {
            var asProjection = (Sql.columnSelect.Projection)entry;
            if (!asProjection.Item.IsId)
            {
                throw new Exception("Projection on non id is not supported");
            }

            string id = ((Sql.value.Id)asProjection.Item).Item;
            MetadataColumn resolved = QueryProcessingAccessors.GetMetadataColumn(id, source.GetOutputColumns());
            return (resolved.ColumnId, null);
        }

        if (!entry.IsFunc)
        {
            throw new Exception("Invalid type in select");
        }

        var call = ((Sql.columnSelect.Func)entry).Item;
        Sql.FuncType kind = call.Item1;
        // TODO: Map func type to the right function.
        // This should go to new class.
        Sql.scalarArgs callArgs = call.Item2;

        if (!kind.IsAdd)
        {
            // TODO:
            throw new NotImplementedException();
        }

        if (!callArgs.IsArgs2)
        {
            throw new Exception("Sum requires 2 arguments");
        }

        // TODO: Some rules should be applied here to determine output type.
        // TODO: For now always return int.
        return (null, new ColumnInfo(ColumnType.Int));
    }

    return columns
        .Select<Sql.columnSelect, (int?, ColumnInfo?)>(DescribeEntry)
        .ToArray();
}

/// <summary>
/// Builds a single action that, for a given (input row, output row) pair, evaluates every
/// function column of the select list and stores each result in the output row at the
/// position the function column occupies in the select list.
/// </summary>
/// <param name="selects">Select-list entries; non-function entries only advance the position.</param>
/// <param name="sourceColumns">Columns of the input row, used to resolve function arguments.</param>
/// <returns>Action that runs all per-column computations in select-list order.</returns>
/// <exception cref="Exception">A function argument is not a plain id.</exception>
/// <exception cref="NotImplementedException">The function type is not ADD.</exception>
private Action<RowHolder, RowHolder> ExecuteComputeOnRowHolder(IEnumerable<Sql.columnSelect> selects, MetadataColumn[] sourceColumns)
{
    int outputPosition = 0;
    List<Action<RowHolder /* input */, RowHolder /* output */>> listOfActions = new List<Action<RowHolder, RowHolder>>();

    foreach (var select in selects)
    {
        // Take a per-iteration copy and advance the running position for EVERY entry.
        // Lambdas capture variables, not values: capturing the shared counter made every
        // compute write to whatever the counter held after the loop, and the counter was
        // also never advanced past function columns.
        int targetPosition = outputPosition++;

        if (!select.IsFunc)
        {
            continue;
        }

        var func = ((Sql.columnSelect.Func)select).Item;
        Sql.FuncType funcType = func.Item1;
        // TODO: Map func type to the right function.
        // This should go to new class.
        Sql.scalarArgs args = func.Item2;

        if (!funcType.IsAdd)
        {
            // Same behavior as the schema builders in this class; silently skipping would
            // leave the output field at its default value.
            throw new NotImplementedException();
        }

        Sql.scalarArgs.Args2 argsExtracted = (Sql.scalarArgs.Args2)args;
        Sql.value arg1 = argsExtracted.Item.Item1;
        Sql.value arg2 = argsExtracted.Item.Item2;

        if (!arg1.IsId || !arg2.IsId)
        {
            // TODO:
            throw new Exception("Only support for ids as function arguments");
        }

        Sql.value.Id arg1Id = (Sql.value.Id)(arg1);
        Sql.value.Id arg2Id = (Sql.value.Id)(arg2);

        MetadataColumn mc1 = QueryProcessingAccessors.GetMetadataColumn(arg1Id.Item, sourceColumns);
        MetadataColumn mc2 = QueryProcessingAccessors.GetMetadataColumn(arg2Id.Item, sourceColumns);

        // Built once per function column rather than once per row; ApplyFuncInPlace only reads it.
        MetadataColumn[] argumentColumns = new MetadataColumn[] { mc1, mc2 };

        listOfActions.Add((RowHolder inputRh, RowHolder outputRh) =>
        {
            QueryProcessingAccessors.ApplyFuncInPlace(
                argumentColumns,
                targetPosition,
                inputRh,
                outputRh,
                funcType
            );
        });
    }

    // Execute all the actions.
    return ((RowHolder input, RowHolder output) => listOfActions.ForEach(act => act(input, output)));
}

public Task<IPhysicalOperator<RowHolder>> BuildStatement(Sql.sqlStatement statement, ITransaction tran, IPhysicalOperator<RowHolder> source, InputStringNormalizer inputStringNormalizer)
{
Sql.columnSelect[] columns = new Sql.columnSelect[0];
Expand Down Expand Up @@ -59,24 +250,14 @@ public Task<IPhysicalOperator<RowHolder>> BuildStatement(Sql.sqlStatement statem
// Project Op.
List<MetadataColumn> columnMapping = new List<MetadataColumn>();

foreach (Sql.columnSelect selectColumn in columns)
{
if (selectColumn.IsProjection)
{
// TODO: What about func here?
var projection = ((Sql.columnSelect.Projection)selectColumn);
if (!projection.Item.IsId)
{
throw new Exception("Projection on non id is not supported");
}
(int?, ColumnInfo?)[] extendInfo = this.BuildProjectExtendInfo(columns, source);
Func<RowHolder, RowHolder> projector = (rowHolder) => rowHolder.ProjectAndExtend(extendInfo);
Action<RowHolder, RowHolder> computes = ExecuteComputeOnRowHolder(columns, source.GetOutputColumns());
MetadataColumn[] outputSchema = this.GetOutputSchema(columns, source);

string projectionId = ((Sql.value.Id)projection.Item).Item;
MetadataColumn mc = QueryProcessingAccessors.GetMetadataColumn(projectionId, source.GetOutputColumns());
columnMapping.Add(mc);
}
}
PhyOpProjectComputeFunctors functors = new PhyOpProjectComputeFunctors(projector, computes);

IPhysicalOperator<RowHolder> projectOp = new PhyOpProject(source, columnMapping.Select(mc => mc.ColumnId).ToArray(), topRows);
IPhysicalOperator<RowHolder> projectOp = new PhyOpProject(source, functors, outputSchema, topRows);
return Task.FromResult(projectOp);
}
}
Expand Down
34 changes: 34 additions & 0 deletions QueryProcessing/QueryProcessingAccessors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,40 @@ public static IComparable MetadataColumnRowsetHolderFetcher(MetadataColumn mc, R
}
}

/// <summary>
/// Evaluates a scalar function over fields of the input row holder and writes the result
/// into a single field of the output row holder. Only ADD over two int columns is handled.
/// </summary>
/// <param name="sourcesMds">Metadata of the argument columns, ordered by scalar args: the first argument maps to sourcesMds[0] and so on.</param>
/// <param name="outputPosition">Target position in the output row holder.</param>
/// <param name="inputRowHolder">Row holder the argument values are read from.</param>
/// <param name="outputRowHolder">Row holder the result is written to.</param>
/// <param name="funcType">Function to apply.</param>
/// <exception cref="NotImplementedException"><paramref name="funcType"/> is not ADD.</exception>
/// <exception cref="Exception">One of the two argument columns is not of type int.</exception>
public static void ApplyFuncInPlace(MetadataColumn[] sourcesMds, int outputPosition, RowHolder inputRowHolder, RowHolder outputRowHolder, Sql.FuncType funcType)
{
    if (!funcType.IsAdd)
    {
        throw new NotImplementedException();
    }

    MetadataColumn lhsMd = sourcesMds[0];
    MetadataColumn rhsMd = sourcesMds[1];

    bool bothAreInts =
        lhsMd.ColumnType.ColumnType == ColumnType.Int &&
        rhsMd.ColumnType.ColumnType == ColumnType.Int;

    if (!bothAreInts)
    {
        // TODO:
        throw new Exception("Only support for ints in sum");
    }

    int lhs = inputRowHolder.GetField<int>(lhsMd.ColumnId);
    int rhs = inputRowHolder.GetField<int>(rhsMd.ColumnId);

    // NOTE(review): plain int addition; overflow behavior follows the project's checked/unchecked setting.
    outputRowHolder.SetField<int>(outputPosition, lhs + rhs);
}

// TODO: This is just bad.
// It is very hard to keep all type -> agg mappings.
// Needs refactoring.
Expand Down
Loading

0 comments on commit 5e7144a

Please sign in to comment.