Skip to content

Commit

Permalink
fixed incremental update issues
Browse files Browse the repository at this point in the history
  • Loading branch information
gary-holland committed Dec 11, 2020
1 parent 1853940 commit ddfba14
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/dexih.transforms/ReaderSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public override Task<bool> Open(long auditKey, SelectQuery requestQuery = null,

SelectQuery = requestQuery;
GeneratedQuery = GetGeneratedQuery(requestQuery);
GeneratedQuery.Alias = TableAlias;

if (GeneratedQuery?.Columns?.Count > 0)
{
Expand Down Expand Up @@ -117,8 +118,7 @@ protected override async Task<object[]> ReadRecord(CancellationToken cancellatio
{
_firstRead = false;
_sqlConnection = await ((ConnectionSql) ReferenceConnection).NewConnection(cancellationToken);
_sqlReader =
await ReferenceConnection.GetDatabaseReader(CacheTable, _sqlConnection, SelectQuery, cancellationToken);
_sqlReader = await ReferenceConnection.GetDatabaseReader(CacheTable, _sqlConnection, GeneratedQuery, cancellationToken);
_fieldCount = _sqlReader.FieldCount;
_fieldOrdinals = new List<int>();

Expand Down
7 changes: 7 additions & 0 deletions src/dexih.transforms/Transform.cs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,13 @@ protected virtual SelectQuery GetGeneratedQuery(SelectQuery requestQuery)
else
{
columns = new SelectColumns(requestQuery.Columns);

// if there is an incremental column, we need to included it for deltas
var incrementalColumns = CacheTable.Columns.Where(c => c.IsIncrementalUpdate).ToArray();
if(incrementalColumns.Length == 1)
{
columns.AddIfNotExists(new SelectColumn(incrementalColumns[0]), c => c.Column.Name);
}
}

if (ReferenceConnection.CanFilter)
Expand Down
9 changes: 6 additions & 3 deletions src/dexih.transforms/TransformDelta.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1234,11 +1234,14 @@ public override Sorts RequiredSortFields()
foreach (var col in ReferenceTransform.CacheTable.GetColumns(EDeltaType.NaturalKey))
{
var primaryColumn = PrimaryTransform.CacheTable.Columns[col.Name];
if (primaryColumn == null)
//if (primaryColumn == null)
//{
// throw new Exception($"The delta could not run as the target table contains a column {col.Name} that does not have a matching input column.");
//}
if (primaryColumn != null)
{
throw new Exception($"The delta could not run as the target table contains a column {col.Name} that does not have a matching input column.");
fields.Add(new Sort(primaryColumn));
}
fields.Add(new Sort(primaryColumn));
}
var validFrom = ReferenceTransform.CacheTable.GetColumn(EDeltaType.ValidFromDate);
if (validFrom != null)
Expand Down
7 changes: 4 additions & 3 deletions src/dexih.transforms/TransformMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,12 @@ public override async Task<bool> Open(long auditKey, SelectQuery requestQuery =
Column = column,
Direction = sort.Direction
};
if (!mappedSorts.TryAdd(newSort, sort))
if (mappedSorts.TryAdd(newSort, sort))
{
throw new TransformException($"The column {column.Name} is duplicated in the sort. Check sorts and groups and avoid duplicate keys.");
newSorts.Add(newSort);
// throw new TransformException($"The column {column.Name} is duplicated in the sort. Check sorts and groups and avoid duplicate keys.");
}
newSorts.Add(newSort);

}

newSelectQuery.Sorts = newSorts;
Expand Down

0 comments on commit ddfba14

Please sign in to comment.