Skip to content

Commit

Permalink
Merge pull request #61 from OleksiiZuiev/change-read-query
Browse files Browse the repository at this point in the history
Separate queries of stream row and events in read operation
  • Loading branch information
yevhen committed Jan 22, 2021
2 parents 0749dd2 + 4989dbf commit 77af7a5
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions Source/Streamstone/Stream.Operations.cs
Expand Up @@ -383,38 +383,47 @@ public ReadOperation(Partition partition, int startVersion, int sliceSize)
table = partition.Table;
}

public async Task<StreamSlice<T>> ExecuteAsync(Func<DynamicTableEntity, T> transform) =>
Result(await ExecuteQueryAsync(PrepareQuery()), transform);

StreamSlice<T> Result(ICollection<DynamicTableEntity> entities, Func<DynamicTableEntity, T> transform)
public async Task<StreamSlice<T>> ExecuteAsync(Func<DynamicTableEntity, T> transform)
{
var streamEntity = FindStreamEntity(entities);
entities.Remove(streamEntity);
var eventsQuery = ExecuteQueryAsync(EventsQuery());
var streamRowQuery = ExecuteQueryAsync(StreamRowQuery());
await Task.WhenAll(eventsQuery, streamRowQuery);
return Result(await eventsQuery, FindStreamEntity(await streamRowQuery), transform);
}

StreamSlice<T> Result(ICollection<DynamicTableEntity> entities, DynamicTableEntity streamEntity, Func<DynamicTableEntity, T> transform)
{
var stream = BuildStream(streamEntity);
var events = BuildEvents(entities, transform);

return new StreamSlice<T>(stream, events, startVersion, sliceSize);
}

TableQuery<DynamicTableEntity> PrepareQuery()
TableQuery<DynamicTableEntity> EventsQuery()
{
var rowKeyStart = partition.EventVersionRowKey(startVersion);
var rowKeyEnd = partition.EventVersionRowKey(startVersion + sliceSize - 1);

var filter = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.PartitionKey), QueryComparisons.Equal, partition.PartitionKey),
TableOperators.And,
TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.Equal, partition.StreamRowKey()),
TableOperators.Or,
TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.GreaterThanOrEqual, rowKeyStart),
TableOperators.And,
TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.LessThanOrEqual, rowKeyEnd)
)
)
);
TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.GreaterThanOrEqual, rowKeyStart),
TableOperators.And,
TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.LessThanOrEqual, rowKeyEnd)
)
);

return new TableQuery<DynamicTableEntity>().Where(filter);
}

TableQuery<DynamicTableEntity> StreamRowQuery()
{
var filter = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.PartitionKey), QueryComparisons.Equal, partition.PartitionKey),
TableOperators.And,
TableQuery.GenerateFilterCondition(nameof(DynamicTableEntity.RowKey), QueryComparisons.Equal, partition.StreamRowKey())
);

return new TableQuery<DynamicTableEntity>().Where(filter);
}
Expand Down

0 comments on commit 77af7a5

Please sign in to comment.