Skip to content

Commit

Permalink
DynamoDB Reminders load using GSI (#7437) (#7717)
Browse files Browse the repository at this point in the history
* DynamoDB Reminders load using GSI

* Update DynamoDBStorage.cs

* Update DynamoDBReminderTable.cs

* Update DynamoDBReminderTable.cs

Co-authored-by: Reuben Bond <203839+ReubenBond@users.noreply.github.com>

Co-authored-by: michaeltdaniels <45430678+michaeltdaniels@users.noreply.github.com>
  • Loading branch information
ReubenBond and michaeltdaniels committed May 5, 2022
1 parent 63c0dd6 commit cefa0d3
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,31 +146,48 @@ public async Task<ReminderTableData> ReadRows(GrainReference grainRef)
/// <summary>
/// Reads reminder table data for a given hash range.
/// </summary>
/// <param name="beginHash"></param>
/// <param name="endHash"></param>
/// <param name="begin"></param>
/// <param name="end"></param>
/// <returns> Return the RemiderTableData if the rows were read successfully </returns>
public async Task<ReminderTableData> ReadRows(uint beginHash, uint endHash)
public async Task<ReminderTableData> ReadRows(uint begin, uint end)
{
var expressionValues = new Dictionary<string, AttributeValue>
{
{ $":{SERVICE_ID_PROPERTY_NAME}", new AttributeValue(this.serviceId) },
{ $":Begin{GRAIN_HASH_PROPERTY_NAME}", new AttributeValue { N = beginHash.ToString() } },
{ $":End{GRAIN_HASH_PROPERTY_NAME}", new AttributeValue { N = endHash.ToString() } }
};
Dictionary<string, AttributeValue> expressionValues = null;

try
{
string expression = string.Empty;
if (beginHash < endHash)
List<ReminderEntry> records;

if (begin < end)
{
expression = $"{SERVICE_ID_PROPERTY_NAME} = :{SERVICE_ID_PROPERTY_NAME} AND {GRAIN_HASH_PROPERTY_NAME} > :Begin{GRAIN_HASH_PROPERTY_NAME} AND {GRAIN_HASH_PROPERTY_NAME} <= :End{GRAIN_HASH_PROPERTY_NAME}";
expressionValues = new Dictionary<string, AttributeValue>
{
{ $":{SERVICE_ID_PROPERTY_NAME}", new AttributeValue(this.serviceId) },
{ $":Begin{GRAIN_HASH_PROPERTY_NAME}", new AttributeValue { N = (begin + 1).ToString() } },
{ $":End{GRAIN_HASH_PROPERTY_NAME}", new AttributeValue { N = end.ToString() } }
};
expression = $"{SERVICE_ID_PROPERTY_NAME} = :{SERVICE_ID_PROPERTY_NAME} AND {GRAIN_HASH_PROPERTY_NAME} BETWEEN :Begin{GRAIN_HASH_PROPERTY_NAME} AND :End{GRAIN_HASH_PROPERTY_NAME}";
records = await this.storage.QueryAllAsync(this.options.TableName, expressionValues, expression, this.Resolve, SERVICE_ID_INDEX, consistentRead: false).ConfigureAwait(false);
}
else
{
expression = $"{SERVICE_ID_PROPERTY_NAME} = :{SERVICE_ID_PROPERTY_NAME} AND ({GRAIN_HASH_PROPERTY_NAME} > :Begin{GRAIN_HASH_PROPERTY_NAME} OR {GRAIN_HASH_PROPERTY_NAME} <= :End{GRAIN_HASH_PROPERTY_NAME})";
}
expressionValues = new Dictionary<string, AttributeValue>
{
{ $":{SERVICE_ID_PROPERTY_NAME}", new AttributeValue(this.serviceId) },
{ $":End{GRAIN_HASH_PROPERTY_NAME}", new AttributeValue { N = end.ToString() } }
};
expression = $"{SERVICE_ID_PROPERTY_NAME} = :{SERVICE_ID_PROPERTY_NAME} AND {GRAIN_HASH_PROPERTY_NAME} <= :End{GRAIN_HASH_PROPERTY_NAME}";
records = await this.storage.QueryAllAsync(this.options.TableName, expressionValues, expression, this.Resolve, SERVICE_ID_INDEX, consistentRead: false).ConfigureAwait(false);

var records = await this.storage.ScanAsync(this.options.TableName, expressionValues, expression, this.Resolve).ConfigureAwait(false);
expressionValues = new Dictionary<string, AttributeValue>
{
{ $":{SERVICE_ID_PROPERTY_NAME}", new AttributeValue(this.serviceId) },
{ $":Begin{GRAIN_HASH_PROPERTY_NAME}", new AttributeValue { N = begin.ToString() } }
};
expression = $"{SERVICE_ID_PROPERTY_NAME} = :{SERVICE_ID_PROPERTY_NAME} AND {GRAIN_HASH_PROPERTY_NAME} > :Begin{GRAIN_HASH_PROPERTY_NAME}";
records.AddRange(await this.storage.QueryAllAsync(this.options.TableName, expressionValues, expression, this.Resolve, SERVICE_ID_INDEX, consistentRead: false).ConfigureAwait(false));

}

return new ReminderTableData(records);
}
Expand Down
14 changes: 8 additions & 6 deletions src/AWS/Shared/Storage/DynamoDBStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ public Task DeleteEntriesAsync(string tableName, IReadOnlyCollection<Dictionary<
{
if (Logger.IsEnabled(LogLevel.Trace)) Logger.Trace("Deleting {0} table entries", tableName);

if (toDelete == null) throw new ArgumentNullException("collection");
if (toDelete == null) throw new ArgumentNullException(nameof(toDelete));

if (toDelete.Count == 0)
return Task.CompletedTask;
Expand Down Expand Up @@ -492,16 +492,17 @@ public Task DeleteEntriesAsync(string tableName, IReadOnlyCollection<Dictionary<
/// <param name="indexName">In case a secondary index is used in the keyConditionExpression</param>
/// <param name="scanIndexForward">In case an index is used, show if the seek order is ascending (true) or descending (false)</param>
/// <param name="lastEvaluatedKey">The primary key of the first item that this operation will evaluate. Use the value that was returned for LastEvaluatedKey in the previous operation</param>
/// <param name="consistentRead">Determines the read consistency model. Note that if a GSI is used, this must be false.</param>
/// <returns>The collection containing a list of objects translated by the resolver function and the LastEvaluatedKey for paged results</returns>
public async Task<(List<TResult> results, Dictionary<string, AttributeValue> lastEvaluatedKey)> QueryAsync<TResult>(string tableName, Dictionary<string, AttributeValue> keys, string keyConditionExpression, Func<Dictionary<string, AttributeValue>, TResult> resolver, string indexName = "", bool scanIndexForward = true, Dictionary<string, AttributeValue> lastEvaluatedKey = null) where TResult : class
public async Task<(List<TResult> results, Dictionary<string, AttributeValue> lastEvaluatedKey)> QueryAsync<TResult>(string tableName, Dictionary<string, AttributeValue> keys, string keyConditionExpression, Func<Dictionary<string, AttributeValue>, TResult> resolver, string indexName = "", bool scanIndexForward = true, Dictionary<string, AttributeValue> lastEvaluatedKey = null, bool consistentRead = true) where TResult : class
{
try
{
var request = new QueryRequest
{
TableName = tableName,
ExpressionAttributeValues = keys,
ConsistentRead = true,
ConsistentRead = consistentRead,
KeyConditionExpression = keyConditionExpression,
Select = Select.ALL_ATTRIBUTES,
ExclusiveStartKey = lastEvaluatedKey
Expand Down Expand Up @@ -539,18 +540,19 @@ public Task DeleteEntriesAsync(string tableName, IReadOnlyCollection<Dictionary<
/// <param name="resolver">Function that will be called to translate the returned fields into a concrete type. This Function is only called if the result is != null and will be called for each entry that match the query and added to the results list</param>
/// <param name="indexName">In case a secondary index is used in the keyConditionExpression</param>
/// <param name="scanIndexForward">In case an index is used, show if the seek order is ascending (true) or descending (false)</param>
/// <param name="consistentRead">Determines the read consistency model. Note that if a GSI is used, this must be false.</param>
/// <returns>The collection containing a list of objects translated by the resolver function</returns>
public async Task<List<TResult>> QueryAllAsync<TResult>(string tableName, Dictionary<string, AttributeValue> keys,
string keyConditionExpression, Func<Dictionary<string, AttributeValue>, TResult> resolver,
string indexName = "", bool scanIndexForward = true) where TResult : class
string indexName = "", bool scanIndexForward = true, bool consistentRead = true) where TResult : class
{
List<TResult> resultList = null;
Dictionary<string, AttributeValue> lastEvaluatedKey = null;
do
{
List<TResult> results;
(results, lastEvaluatedKey) = await QueryAsync(tableName, keys, keyConditionExpression, resolver,
indexName, scanIndexForward, lastEvaluatedKey);
indexName, scanIndexForward, lastEvaluatedKey, consistentRead);
if (resultList == null)
{
resultList = results;
Expand Down Expand Up @@ -636,7 +638,7 @@ public Task PutEntriesAsync(string tableName, IReadOnlyCollection<Dictionary<str
{
if (Logger.IsEnabled(LogLevel.Trace)) Logger.Trace("Put entries {0} table", tableName);

if (toCreate == null) throw new ArgumentNullException("collection");
if (toCreate == null) throw new ArgumentNullException(nameof(toCreate));

if (toCreate.Count == 0)
return Task.CompletedTask;
Expand Down

0 comments on commit cefa0d3

Please sign in to comment.