Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/OrleansAWSUtils/OrleansAWSUtils.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
<Compile Include="Membership\DynamoDBGatewayListProvider.cs" />
<Compile Include="Membership\DynamoDBMembershipTable.cs" />
<Compile Include="Membership\SiloInstanceRecord.cs" />
<Compile Include="Reminders\DynamoDBReminderTable.cs" />
<Compile Include="Storage\DynamoDBStorage.cs" />
<Compile Include="Storage\Provider\DynamoDBStorageProvider.cs" />
</ItemGroup>
Expand Down
261 changes: 261 additions & 0 deletions src/OrleansAWSUtils/Reminders/DynamoDBReminderTable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
using Orleans;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
using OrleansAWSUtils.Storage;
using Amazon.DynamoDBv2.Model;
using Amazon.DynamoDBv2;

namespace OrleansAWSUtils.Reminders
{
public class DynamoDBReminderTable : IReminderTable
{
private const string DEPLOYMENT_ID_PROPERTY_NAME = "DeploymentId";
private const string GRAIN_REFERENCE_PROPERTY_NAME = "GrainReference";
private const string REMINDER_NAME_PROPERTY_NAME = "ReminderName";
private const string SERVICE_ID_PROPERTY_NAME = "ServiceId";
private const string START_TIME_PROPERTY_NAME = "StartTime";
private const string PERIOD_PROPERTY_NAME = "Period";
private const string GRAIN_HASH_PROPERTY_NAME = "GrainHash";
private const string REMINDER_ID_PROPERTY_NAME = "ReminderId";
private const string ETAG_PROPERTY_NAME = "ETag";
private const string CURRENT_ETAG_ALIAS = ":currentETag";
private const string SERVICE_ID_INDEX = "ServiceIdIndex";
private SafeRandom random = new SafeRandom();

private const string TABLE_NAME_DEFAULT_VALUE = "OrleansReminders";
private Logger logger;
private DynamoDBStorage storage;
private string deploymentId;
private Guid serviceId;

public Task Init(GlobalConfiguration config, Logger logger)
{
deploymentId = config.DeploymentId;
serviceId = config.ServiceId;

this.logger = logger;

storage = new DynamoDBStorage(config.DataConnectionStringForReminders, logger);
logger.Info(ErrorCode.ReminderServiceBase, "Initializing AWS DynamoDB Reminders Table");

var secondaryIndex = new GlobalSecondaryIndex
{
IndexName = SERVICE_ID_INDEX,
Projection = new Projection { ProjectionType = ProjectionType.ALL },
KeySchema = new List<KeySchemaElement>
{
new KeySchemaElement { AttributeName = SERVICE_ID_PROPERTY_NAME, KeyType = KeyType.HASH},
new KeySchemaElement { AttributeName = GRAIN_HASH_PROPERTY_NAME, KeyType = KeyType.RANGE }
}
};

return storage.InitializeTable(TABLE_NAME_DEFAULT_VALUE,
new List<KeySchemaElement>
{
new KeySchemaElement { AttributeName = REMINDER_ID_PROPERTY_NAME, KeyType = KeyType.HASH },
new KeySchemaElement { AttributeName = GRAIN_HASH_PROPERTY_NAME, KeyType = KeyType.RANGE }
},
new List<AttributeDefinition>
{
new AttributeDefinition { AttributeName = REMINDER_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S },
new AttributeDefinition { AttributeName = GRAIN_HASH_PROPERTY_NAME, AttributeType = ScalarAttributeType.N },
new AttributeDefinition { AttributeName = SERVICE_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }
},
new List<GlobalSecondaryIndex> { secondaryIndex });
}

public async Task<ReminderEntry> ReadRow(GrainReference grainRef, string reminderName)
{
var reminderId = ConstructReminderId(serviceId, grainRef, reminderName);

var keys = new Dictionary<string, AttributeValue>
{
{ $"{REMINDER_ID_PROPERTY_NAME}", new AttributeValue(reminderId) },
{ $"{GRAIN_HASH_PROPERTY_NAME}", new AttributeValue { N = grainRef.GetUniformHashCode().ToString() } }
};

try
{
return await storage.ReadSingleEntryAsync(TABLE_NAME_DEFAULT_VALUE, keys, Resolve).ConfigureAwait(false);
}
catch (Exception exc)
{
logger.Warn(ErrorCode.ReminderServiceBase,
$"Intermediate error reading reminder entry {Utils.DictionaryToString(keys)} from table {TABLE_NAME_DEFAULT_VALUE}.", exc);
throw;
}
}

public async Task<ReminderTableData> ReadRows(GrainReference grainRef)
{
var expressionValues = new Dictionary<string, AttributeValue>
{
{ $":{SERVICE_ID_PROPERTY_NAME}", new AttributeValue(serviceId.ToString()) },
{ $":{GRAIN_REFERENCE_PROPERTY_NAME}", new AttributeValue(grainRef.ToKeyString()) }
};

try
{
var expression = $"{SERVICE_ID_PROPERTY_NAME} = :{SERVICE_ID_PROPERTY_NAME} AND {GRAIN_REFERENCE_PROPERTY_NAME} = :{GRAIN_REFERENCE_PROPERTY_NAME}";
var records = await storage.ScanAsync(TABLE_NAME_DEFAULT_VALUE, expressionValues, expression, Resolve).ConfigureAwait(false);

return new ReminderTableData(records);
}
catch (Exception exc)
{
logger.Warn(ErrorCode.ReminderServiceBase,
$"Intermediate error reading reminder entry {Utils.DictionaryToString(expressionValues)} from table {TABLE_NAME_DEFAULT_VALUE}.", exc);
throw;
}
}

public async Task<ReminderTableData> ReadRows(uint beginHash, uint endHash)
{
var expressionValues = new Dictionary<string, AttributeValue>
{
{ $":{SERVICE_ID_PROPERTY_NAME}", new AttributeValue(serviceId.ToString()) },
{ $":Begin{GRAIN_HASH_PROPERTY_NAME}", new AttributeValue { N = beginHash.ToString() } },
{ $":End{GRAIN_HASH_PROPERTY_NAME}", new AttributeValue { N = endHash.ToString() } }
};

try
{
string expression = string.Empty;
if (beginHash < endHash)
{
expression = $"{SERVICE_ID_PROPERTY_NAME} = :{SERVICE_ID_PROPERTY_NAME} AND {GRAIN_HASH_PROPERTY_NAME} > :Begin{GRAIN_HASH_PROPERTY_NAME} AND {GRAIN_HASH_PROPERTY_NAME} <= :End{GRAIN_HASH_PROPERTY_NAME}";
}
else
{
expression = $"{SERVICE_ID_PROPERTY_NAME} = :{SERVICE_ID_PROPERTY_NAME} AND ({GRAIN_HASH_PROPERTY_NAME} > :Begin{GRAIN_HASH_PROPERTY_NAME} OR {GRAIN_HASH_PROPERTY_NAME} <= :End{GRAIN_HASH_PROPERTY_NAME})";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar enough with DynamoDB.
Does it matter how you structure the query? I would expect it does. In Azure Table only PK and RK are sorted and indexed, so it was very important to structure the query to be done in one sorted scan.
But in DynamoDB, they have secondary indices, right?
Don't you need to tell it explicitly what columns to index? I would not index by default all columns, right?
If you do NOT index those columns, I think we need to think about efficiency of that query , same as we did for Azure Table.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it index by default only the keys. However, we can't do queries with parts of the field like in Azure (i.e. Substring) but, you are right. I shouls have marked those 2 fields as secondary indexes. Will do that.

Copy link
Copy Markdown
Member Author

@galvesribeiro galvesribeiro Aug 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok... Adding, the index doesn't do the trick: KeyConditionExpressions must only contain one condition per key. So is pointless to make the secondary indexes since they can't be used in the Query operation but instead only on Scan. I don't know if there is a benefit on have the them just for scan since it is only used on Query...

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that... The secondary indexes ARE considered on the Scan requests and that impacts directly on the performance. So I've added the index on the ServiceId.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About secondary indices: index on the ServiceId is good, but maybe not enough. In Azure we had a hierarchical index (achieved via PK sort order): service id first and then grainHash:
https://github.com/dotnet/orleans/blob/master/src/OrleansAzureUtils/Storage/RemindersTableManager.cs#L47
That way you can scale within the service as well: each silo reads its range only, and does not have to scan all reminders for this service id. Can you have global secondary index on 2 keys?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I can add 2 keys. So I'll add the ServiceId as the PK and the GrainHash as the RK.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

var records = await storage.ScanAsync(TABLE_NAME_DEFAULT_VALUE, expressionValues, expression, Resolve).ConfigureAwait(false);

return new ReminderTableData(records);
}
catch (Exception exc)
{
logger.Warn(ErrorCode.ReminderServiceBase,
$"Intermediate error reading reminder entry {Utils.DictionaryToString(expressionValues)} from table {TABLE_NAME_DEFAULT_VALUE}.", exc);
throw;
}
}

private static ReminderEntry Resolve(Dictionary<string, AttributeValue> item)
{
return new ReminderEntry
{
ETag = item[ETAG_PROPERTY_NAME].N,
GrainRef = GrainReference.FromKeyString(item[GRAIN_REFERENCE_PROPERTY_NAME].S),
Period = TimeSpan.Parse(item[PERIOD_PROPERTY_NAME].S),
ReminderName = item[REMINDER_NAME_PROPERTY_NAME].S,
StartAt = DateTime.Parse(item[START_TIME_PROPERTY_NAME].S)
};
}

public async Task<bool> RemoveRow(GrainReference grainRef, string reminderName, string eTag)
{
var reminderId = ConstructReminderId(serviceId, grainRef, reminderName);

var keys = new Dictionary<string, AttributeValue>
{
{ $"{REMINDER_ID_PROPERTY_NAME}", new AttributeValue(reminderId) },
{ $"{GRAIN_HASH_PROPERTY_NAME}", new AttributeValue { N = grainRef.GetUniformHashCode().ToString() } }
};

try
{
var conditionalValues = new Dictionary<string, AttributeValue> { { CURRENT_ETAG_ALIAS, new AttributeValue { N = eTag } } };
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here you are using the etag correctly.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm using that correctly in other places. Only that other method that one of the tests wasn't working as I mentioned. Will check that test with Shay.

var expression = $"{ETAG_PROPERTY_NAME} = {CURRENT_ETAG_ALIAS}";

await storage.DeleteEntryAsync(TABLE_NAME_DEFAULT_VALUE, keys, expression, conditionalValues).ConfigureAwait(false);
return true;
}
catch (ConditionalCheckFailedException)
{
return false;
}
}

public async Task TestOnlyClearTable()
{
var expressionValues = new Dictionary<string, AttributeValue>
{
{ $":{SERVICE_ID_PROPERTY_NAME}", new AttributeValue(serviceId.ToString()) }
};

try
{
var expression = $"{SERVICE_ID_PROPERTY_NAME} = :{SERVICE_ID_PROPERTY_NAME}";
var records = await storage.ScanAsync(TABLE_NAME_DEFAULT_VALUE, expressionValues, expression,
item => new Dictionary<string, AttributeValue>
{
{ REMINDER_ID_PROPERTY_NAME, item[REMINDER_ID_PROPERTY_NAME] },
{ GRAIN_HASH_PROPERTY_NAME, item[GRAIN_HASH_PROPERTY_NAME] }
}).ConfigureAwait(false);

if (records.Count <= 25)
{
await storage.DeleteEntriesAsync(TABLE_NAME_DEFAULT_VALUE, records);
}
else
{
List<Task> tasks = new List<Task>();
foreach (var batch in records.BatchIEnumerable(25))
{
tasks.Add(storage.DeleteEntriesAsync(TABLE_NAME_DEFAULT_VALUE, batch));
}
await Task.WhenAll(tasks);
}
}
catch (Exception exc)
{
logger.Warn(ErrorCode.ReminderServiceBase,
$"Intermediate error removing reminder entries {Utils.DictionaryToString(expressionValues)} from table {TABLE_NAME_DEFAULT_VALUE}.", exc);
throw;
}
}

public async Task<string> UpsertRow(ReminderEntry entry)
{
var reminderId = ConstructReminderId(serviceId, entry.GrainRef, entry.ReminderName);

var fields = new Dictionary<string, AttributeValue>
{
{ REMINDER_ID_PROPERTY_NAME, new AttributeValue(reminderId) },
{ GRAIN_HASH_PROPERTY_NAME, new AttributeValue { N = entry.GrainRef.GetUniformHashCode().ToString() } },
{ SERVICE_ID_PROPERTY_NAME, new AttributeValue(serviceId.ToString()) },
{ GRAIN_REFERENCE_PROPERTY_NAME, new AttributeValue( entry.GrainRef.ToKeyString()) },
{ PERIOD_PROPERTY_NAME, new AttributeValue(entry.Period.ToString()) },
{ START_TIME_PROPERTY_NAME, new AttributeValue(entry.StartAt.ToString()) },
{ REMINDER_NAME_PROPERTY_NAME, new AttributeValue(entry.ReminderName) },
{ ETAG_PROPERTY_NAME, new AttributeValue { N = random.Next(int.MaxValue).ToString() } }
};

try
{
if (logger.IsVerbose) logger.Verbose("UpsertRow entry = {0}, etag = {1}", entry.ToString(), entry.ETag);

await storage.PutEntryAsync(TABLE_NAME_DEFAULT_VALUE, fields);

entry.ETag = fields[ETAG_PROPERTY_NAME].N;
return entry.ETag;
}
catch (Exception exc)
{
logger.Warn(ErrorCode.ReminderServiceBase,
$"Intermediate error updating entry {entry.ToString()} to the table {TABLE_NAME_DEFAULT_VALUE}.", exc);
throw;
}
}

private static string ConstructReminderId(Guid serviceId, GrainReference grainRef, string reminderName)
{
return $"{serviceId}_{grainRef.ToKeyString()}_{reminderName}";
}
}
}
27 changes: 23 additions & 4 deletions src/OrleansAWSUtils/Storage/DynamoDBStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ public DynamoDBStorage(string dataConnectionString, Logger logger = null)
/// <param name="tableName">The name of the table</param>
/// <param name="keys">The keys definitions</param>
/// <param name="attributes">The attributes used on the key definition</param>
/// <param name="secondaryIndexes">(optional) The secondary index definitions</param>
/// <returns></returns>
public async Task InitializeTable(string tableName, List<KeySchemaElement> keys, List<AttributeDefinition> attributes)
public async Task InitializeTable(string tableName, List<KeySchemaElement> keys, List<AttributeDefinition> attributes, List<GlobalSecondaryIndex> secondaryIndexes = null)
{
try
{
if (await GetTableDescription(tableName) == null)
await CreateTable(tableName, keys, attributes);
await CreateTable(tableName, keys, attributes, secondaryIndexes);
}
catch (Exception exc)
{
Expand Down Expand Up @@ -139,7 +140,7 @@ private async Task<TableDescription> GetTableDescription(string tableName)
return null;
}

private async Task CreateTable(string tableName, List<KeySchemaElement> keys, List<AttributeDefinition> attributes)
private async Task CreateTable(string tableName, List<KeySchemaElement> keys, List<AttributeDefinition> attributes, List<GlobalSecondaryIndex> secondaryIndexes = null)
{
var request = new CreateTableRequest
{
Expand All @@ -153,6 +154,16 @@ private async Task CreateTable(string tableName, List<KeySchemaElement> keys, Li
}
};

if (secondaryIndexes != null && secondaryIndexes.Count > 0)
{
var indexThroughput = new ProvisionedThroughput { ReadCapacityUnits = readCapacityUnits, WriteCapacityUnits = writeCapacityUnits };
secondaryIndexes.ForEach(i =>
{
i.ProvisionedThroughput = indexThroughput;
});
request.GlobalSecondaryIndexes = secondaryIndexes;
}

try
{
var response = await ddbClient.CreateTableAsync(request);
Expand Down Expand Up @@ -435,8 +446,10 @@ public async Task<TResult> ReadSingleEntryAsync<TResult>(string tableName, Dicti
/// <param name="keys">The table entry keys to search for</param>
/// <param name="keyConditionExpression">the expression that will filter the keys</param>
/// <param name="resolver">Function that will be called to translate the returned fields into a concrete type. This Function is only called if the result is != null and will be called for each entry that match the query and added to the results list</param>
/// <param name="indexName">In case a secondary index is used in the keyConditionExpression</param>
/// <param name="scanIndexForward">In case an index is used, show if the seek order is ascending (true) or descending (false)</param>
/// <returns>The collection containing a list of objects translated by the resolver function</returns>
public async Task<List<TResult>> QueryAsync<TResult>(string tableName, Dictionary<string, AttributeValue> keys, string keyConditionExpression, Func<Dictionary<string, AttributeValue>, TResult> resolver) where TResult : class
public async Task<List<TResult>> QueryAsync<TResult>(string tableName, Dictionary<string, AttributeValue> keys, string keyConditionExpression, Func<Dictionary<string, AttributeValue>, TResult> resolver, string indexName = "", bool scanIndexForward = true) where TResult : class
{
try
{
Expand All @@ -449,6 +462,12 @@ public async Task<List<TResult>> QueryAsync<TResult>(string tableName, Dictionar
Select = Select.ALL_ATTRIBUTES
};

if (!string.IsNullOrWhiteSpace(indexName))
{
request.ScanIndexForward = scanIndexForward;
request.IndexName = indexName;
}

var response = await ddbClient.QueryAsync(request);

var resultList = new List<TResult>();
Expand Down
52 changes: 52 additions & 0 deletions test/TesterInternal/RemindersTest/DynamoDBRemindersTableTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Orleans;
using UnitTests.StorageTests.AWSUtils;
using Xunit;
using OrleansAWSUtils.Reminders;

namespace UnitTests.RemindersTest
{
public class DynamoDBRemindersTableTests : ReminderTableTestsBase, IClassFixture<DynamoDBStorageTestsFixture>
{
public DynamoDBRemindersTableTests(ConnectionStringFixture fixture) : base(fixture)
{
}

protected override IReminderTable CreateRemindersTable()
{
return new DynamoDBReminderTable();
}

protected override string GetConnectionString()
{
return $"Service={AWSTestConstants.Service}";
}

[Fact, TestCategory("Reminders"), TestCategory("AWS")]
public void RemindersTable_AWS_Init()
{
}

[Fact, TestCategory("Reminders"), TestCategory("AWS")]
public async Task RemindersTable_AWS_RemindersRange()
{
await RemindersRange(50);
}

[Fact, TestCategory("Reminders"), TestCategory("AWS")]
public async Task RemindersTable_AWS_RemindersParallelUpsert()
{
await RemindersParallelUpsert();
}

[Fact, TestCategory("Reminders"), TestCategory("AWS")]
public async Task RemindersTable_AWS_ReminderSimple()
{
await ReminderSimple();
}
}
}
1 change: 1 addition & 0 deletions test/TesterInternal/TesterInternal.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
<Compile Include="HostedTestClusterBase.cs" />
<Compile Include="MembershipTests\DynamoDBMembershipTableTest.cs" />
<Compile Include="OrleansRuntime\Streams\SubscriptionMarkerTests.cs" />
<Compile Include="RemindersTest\DynamoDBRemindersTableTests.cs" />
<Compile Include="StorageTests\AWSUtils\AWSTestConstants.cs" />
<Compile Include="StorageTests\AWSUtils\Base_PersistenceGrainTests_AWSStore.cs" />
<Compile Include="StorageTests\AWSUtils\DynamoDBStorageProviderTests.cs" />
Expand Down