Skip to content

Commit

Permalink
BugFix: JoinOperation errors are not propagated to the main EtlProces…
Browse files Browse the repository at this point in the history
…s with GetAllErrors method. Add AbstractJoinOperation class.
  • Loading branch information
Cortembert, Rudy authored and Cortembert, Rudy committed Feb 24, 2015
1 parent 84bf757 commit 498b52f
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 189 deletions.
120 changes: 16 additions & 104 deletions Rhino.Etl.Core/Operations/JoinOperation.cs
@@ -1,22 +1,19 @@
namespace Rhino.Etl.Core.Operations
{
using Enumerables;
using System;
using System.Collections.Generic;
using Enumerables;

/// <summary>
/// Perform a join between two sources. The left part of the join is optional and if not specified it will use the current pipeline as input.
/// </summary>
public abstract class JoinOperation : AbstractOperation
public abstract class JoinOperation : AbstractJoinOperation
{
private readonly PartialProcessOperation left = new PartialProcessOperation();
private readonly PartialProcessOperation right = new PartialProcessOperation();
private JoinType jointype;
private string[] leftColumns;
private string[] rightColumns;
private Dictionary<Row, object> rightRowsWereMatched = new Dictionary<Row, object>();
private Dictionary<ObjectArrayKeys, List<Row>> rightRowsByJoinKey = new Dictionary<ObjectArrayKeys, List<Row>>();
private bool leftRegistered = false;

/// <summary>
/// Sets the right part of the join
Expand All @@ -28,7 +25,6 @@ public JoinOperation Right(IOperation value)
return this;
}


/// <summary>
/// Sets the left part of the join
/// </summary>
Expand All @@ -49,6 +45,10 @@ public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
{
PrepareForJoin();

SetupJoinConditions();
Guard.Against(leftColumns == null, "You must setup the left columns");
Guard.Against(rightColumns == null, "You must setup the right columns");

IEnumerable<Row> rightEnumerable = GetRightEnumerable();

IEnumerable<Row> execute = left.Execute(leftRegistered ? null : rows);
Expand Down Expand Up @@ -86,19 +86,6 @@ public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
}
}

private void PrepareForJoin()
{
Initialize();

Guard.Against(left == null, "Left branch of a join cannot be null");
Guard.Against(right == null, "Right branch of a join cannot be null");

SetupJoinConditions();

Guard.Against(leftColumns == null, "You must setup the left columns");
Guard.Against(rightColumns == null, "You must setup the right columns");
}

private IEnumerable<Row> GetRightEnumerable()
{
IEnumerable<Row> rightEnumerable = new CachingEnumerable<Row>(
Expand All @@ -117,42 +104,6 @@ private IEnumerable<Row> GetRightEnumerable()
return rightEnumerable;
}

/// <summary>
/// Called when a row on the right side was filtered by
/// the join condition, allow a derived class to perform
/// logic associated to that, such as logging
/// </summary>
protected virtual void RightOrphanRow(Row row)
{

}

/// <summary>
/// Called when a row on the left side was filtered by
/// the join condition, allow a derived class to perform
/// logic associated to that, such as logging
/// </summary>
/// <param name="row">The row.</param>
protected virtual void LeftOrphanRow(Row row)
{

}

/// <summary>
/// Merges the two rows into a single row
/// </summary>
/// <param name="leftRow">The left row.</param>
/// <param name="rightRow">The right row.</param>
/// <returns></returns>
protected abstract Row MergeRows(Row leftRow, Row rightRow);

/// <summary>
/// Initializes this instance.
/// </summary>
protected virtual void Initialize()
{
}

/// <summary>
/// Setups the join conditions.
/// </summary>
Expand All @@ -176,7 +127,6 @@ protected JoinBuilder LeftJoin
get { return new JoinBuilder(this, JoinType.Left); }
}


/// <summary>
/// Create a right outer join
/// </summary>
Expand All @@ -186,7 +136,6 @@ protected JoinBuilder RightJoin
get { return new JoinBuilder(this, JoinType.Right); }
}


/// <summary>
/// Create a full outer join
/// </summary>
Expand All @@ -196,43 +145,6 @@ protected JoinBuilder FullOuterJoin
get { return new JoinBuilder(this, JoinType.Full); }
}


/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public override void Dispose()
{
left.Dispose();
right.Dispose();
}


/// <summary>
/// Initializes this instance
/// </summary>
/// <param name="pipelineExecuter">The current pipeline executer.</param>
public override void PrepareForExecution(IPipelineExecuter pipelineExecuter)
{
left.PrepareForExecution(pipelineExecuter);
right.PrepareForExecution(pipelineExecuter);
}

/// <summary>
/// Gets all errors that occured when running this operation
/// </summary>
/// <returns></returns>
public override IEnumerable<Exception> GetAllErrors()
{
foreach (Exception error in left.GetAllErrors())
{
yield return error;
}
foreach (Exception error in right.GetAllErrors())
{
yield return error;
}
}

/// <summary>
/// Fluent interface to create joins
/// </summary>
Expand Down Expand Up @@ -277,36 +189,36 @@ public JoinBuilder Right(params string[] columns)
/// <summary>
/// Occurs when a row is processed.
/// </summary>
public override event Action<IOperation, Row> OnRowProcessed
public override event Action<IOperation, Row> OnRowProcessed
{
add
{
foreach (IOperation operation in new[] { left, right })
operation.OnRowProcessed += value;
base.OnRowProcessed += value;
foreach (IOperation operation in new[] { left, right })
operation.OnRowProcessed += value;
base.OnRowProcessed += value;
}
remove
{
foreach (IOperation operation in new[] { left, right })
operation.OnRowProcessed -= value;
base.OnRowProcessed -= value;
foreach (IOperation operation in new[] { left, right })
operation.OnRowProcessed -= value;
base.OnRowProcessed -= value;
}
}

/// <summary>
/// Occurs when all the rows has finished processing.
/// </summary>
public override event Action<IOperation> OnFinishedProcessing
public override event Action<IOperation> OnFinishedProcessing
{
add
{
foreach (IOperation operation in new[] { left, right })
foreach (IOperation operation in new[] { left, right })
operation.OnFinishedProcessing += value;
base.OnFinishedProcessing += value;
}
remove
{
foreach (IOperation operation in new[] { left, right })
foreach (IOperation operation in new[] { left, right })
operation.OnFinishedProcessing -= value;
base.OnFinishedProcessing -= value;
}
Expand Down
93 changes: 8 additions & 85 deletions Rhino.Etl.Core/Operations/NestedLoopsJoinOperation.cs
@@ -1,20 +1,18 @@
namespace Rhino.Etl.Core.Operations
{
using Enumerables;
using System;
using System.Collections.Generic;
using Enumerables;

/// <summary>
/// Perform a join between two sources. The left part of the join is optional and if not specified it will use the current pipeline as input.
/// </summary>
public abstract class NestedLoopsJoinOperation : AbstractOperation
public abstract class NestedLoopsJoinOperation : AbstractJoinOperation
{
private readonly PartialProcessOperation left = new PartialProcessOperation();
private readonly PartialProcessOperation right = new PartialProcessOperation();
private static readonly string IsEmptyRowMarker = Guid.NewGuid().ToString();
private bool leftRegistered = false;

private Row currentRightRow, currentLeftRow;

/// <summary>
/// Sets the right part of the join
/// </summary>
Expand Down Expand Up @@ -43,10 +41,7 @@ public NestedLoopsJoinOperation Left(IOperation value)
/// <returns></returns>
public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
{
Initialize();

Guard.Against(left == null, "Left branch of a join cannot be null");
Guard.Against(right == null, "Right branch of a join cannot be null");
PrepareForJoin();

Dictionary<Row, object> matchedRightRows = new Dictionary<Row, object>();
CachingEnumerable<Row> rightEnumerable = new CachingEnumerable<Row>(
Expand Down Expand Up @@ -93,35 +88,6 @@ public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
}
}

/// <summary>
/// Called when a row on the right side was filtered by
/// the join condition, allow a derived class to perform
/// logic associated to that, such as logging
/// </summary>
protected virtual void RightOrphanRow(Row row)
{

}

/// <summary>
/// Called when a row on the left side was filtered by
/// the join condition, allow a derived class to perform
/// logic associated to that, such as logging
/// </summary>
/// <param name="row">The row.</param>
protected virtual void LeftOrphanRow(Row row)
{

}

/// <summary>
/// Merges the two rows into a single row
/// </summary>
/// <param name="leftRow">The left row.</param>
/// <param name="rightRow">The right row.</param>
/// <returns></returns>
protected abstract Row MergeRows(Row leftRow, Row rightRow);

/// <summary>
/// Check if the two rows match to the join condition.
/// </summary>
Expand All @@ -130,49 +96,6 @@ protected virtual void LeftOrphanRow(Row row)
/// <returns></returns>
protected abstract bool MatchJoinCondition(Row leftRow, Row rightRow);

/// <summary>
/// Initializes this instance.
/// </summary>
protected virtual void Initialize()
{
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public override void Dispose()
{
left.Dispose();
right.Dispose();
}


/// <summary>
/// Initializes this instance
/// </summary>
/// <param name="pipelineExecuter">The current pipeline executer.</param>
public override void PrepareForExecution(IPipelineExecuter pipelineExecuter)
{
left.PrepareForExecution(pipelineExecuter);
right.PrepareForExecution(pipelineExecuter);
}

/// <summary>
/// Gets all errors that occured when running this operation
/// </summary>
/// <returns></returns>
public override IEnumerable<Exception> GetAllErrors()
{
foreach (Exception error in left.GetAllErrors())
{
yield return error;
}
foreach (Exception error in right.GetAllErrors())
{
yield return error;
}
}

/// <summary>
/// Perform an inner join equality on the two objects.
/// Null values are not considered equal
Expand All @@ -182,7 +105,7 @@ public override IEnumerable<Exception> GetAllErrors()
/// <returns></returns>
protected virtual bool InnerJoin(object left, object right)
{
if(IsEmptyRow(currentLeftRow) || IsEmptyRow(currentRightRow))
if (IsEmptyRow(currentLeftRow) || IsEmptyRow(currentRightRow))
return false;
if (left == null || right == null)
return false;
Expand All @@ -205,7 +128,7 @@ private static bool IsEmptyRow(Row row)
/// <returns></returns>
protected virtual bool LeftJoin(object left, object right)
{
if(IsEmptyRow(currentRightRow))
if (IsEmptyRow(currentRightRow))
return true;
if (left == null || right == null)
return false;
Expand All @@ -223,7 +146,7 @@ protected virtual bool LeftJoin(object left, object right)
/// <returns></returns>
protected virtual bool RightJoin(object left, object right)
{
if(IsEmptyRow(currentLeftRow))
if (IsEmptyRow(currentLeftRow))
return true;
if (left == null || right == null)
return false;
Expand All @@ -240,7 +163,7 @@ protected virtual bool RightJoin(object left, object right)
/// <returns></returns>
protected virtual bool FullJoin(object left, object right)
{
if(IsEmptyRow(currentLeftRow) || IsEmptyRow(currentRightRow))
if (IsEmptyRow(currentLeftRow) || IsEmptyRow(currentRightRow))
return true;
if (left == null || right == null)
return false;
Expand Down
1 change: 1 addition & 0 deletions Rhino.Etl.Core/Rhino.Etl.Core.csproj
Expand Up @@ -81,6 +81,7 @@
<Compile Include="Infrastructure\SqlCommandSet.cs" />
<Compile Include="Infrastructure\Use.cs" />
<Compile Include="Operations\AbstractBranchingOperation.cs" />
<Compile Include="Operations\AbstractJoinOperation.cs" />
<Compile Include="Operations\AbstractSortedAggregationOperation.cs" />
<Compile Include="Operations\BranchingOperation.cs" />
<Compile Include="Operations\JoinType.cs" />
Expand Down
1 change: 1 addition & 0 deletions Rhino.Etl.Tests/Rhino.Etl.Tests.csproj
Expand Up @@ -232,6 +232,7 @@
</None>
</ItemGroup>
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
<Service Include="{B4F97281-0DBD-4835-9ED8-7DFB966E87FF}" />
</ItemGroup>
<ItemGroup>
Expand Down

0 comments on commit 498b52f

Please sign in to comment.