Skip to content

Commit

Permalink
Ensure reminder table is initialized before access (dotnet#8982)
Browse files Browse the repository at this point in the history
* Ensure reminder table is initialized before access
  • Loading branch information
ReubenBond committed May 8, 2024
1 parent d597044 commit d54dd32
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Microsoft.Extensions.Logging;
Expand All @@ -16,7 +17,8 @@ public sealed class AzureBasedReminderTable : IReminderTable
private readonly ILoggerFactory loggerFactory;
private readonly ClusterOptions clusterOptions;
private readonly AzureTableReminderStorageOptions storageOptions;
private RemindersTableManager remTableManager;
private readonly RemindersTableManager remTableManager;
private readonly TaskCompletionSource _initializationTask = new(TaskCreationOptions.RunContinuationsAsynchronously);

public AzureBasedReminderTable(
ILoggerFactory loggerFactory,
Expand All @@ -27,15 +29,50 @@ public sealed class AzureBasedReminderTable : IReminderTable
this.loggerFactory = loggerFactory;
this.clusterOptions = clusterOptions.Value;
this.storageOptions = storageOptions.Value;
this.remTableManager = new RemindersTableManager(
this.clusterOptions.ServiceId,
this.clusterOptions.ClusterId,
this.storageOptions,
this.loggerFactory);
}

public async Task Init()
public async Task StartAsync(CancellationToken cancellationToken)
{
this.remTableManager = await RemindersTableManager.GetManager(
this.clusterOptions.ServiceId,
this.clusterOptions.ClusterId,
this.loggerFactory,
options: this.storageOptions);
try
{
while (true)
{
try
{
await remTableManager.InitTableAsync();
_initializationTask.TrySetResult();
return;
}
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
{
logger.LogError((int)AzureReminderErrorCode.AzureTable_39, ex, "Exception trying to create or connect to the Azure table");
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
}
}
catch (OperationCanceledException ex)
{
logger.LogError(ex, "Reminder table initialization canceled.");
_initializationTask.TrySetCanceled(ex.CancellationToken);
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "Error initializing reminder table.");
_initializationTask.TrySetException(ex);
throw;
}
}

public Task StopAsync(CancellationToken cancellationToken)
{
_initializationTask.TrySetCanceled(CancellationToken.None);
return Task.CompletedTask;
}

private ReminderTableData ConvertFromTableEntryList(List<(ReminderTableEntry Entity, string ETag)> entries)
Expand Down Expand Up @@ -78,7 +115,7 @@ private ReminderEntry ConvertFromTableEntry(ReminderTableEntry tableEntry, strin
}
finally
{
string serviceIdStr = this.remTableManager.ServiceId;
string serviceIdStr = this.clusterOptions.ServiceId;
if (!tableEntry.ServiceId.Equals(serviceIdStr))
{
this.logger.LogWarning(
Expand Down Expand Up @@ -116,15 +153,19 @@ private static ReminderTableEntry ConvertToTableEntry(ReminderEntry remEntry, st
};
}

public Task TestOnlyClearTable()
public async Task TestOnlyClearTable()
{
return this.remTableManager.DeleteTableEntries();
await _initializationTask.Task;

await this.remTableManager.DeleteTableEntries();
}

public async Task<ReminderTableData> ReadRows(GrainId grainId)
{
try
{
await _initializationTask.Task;

var entries = await this.remTableManager.FindReminderEntries(grainId);
ReminderTableData data = ConvertFromTableEntryList(entries);
if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace($"Read for grain {{GrainId}} Table={Environment.NewLine}{{Data}}", grainId, data.ToString());
Expand All @@ -143,6 +184,8 @@ public async Task<ReminderTableData> ReadRows(uint begin, uint end)
{
try
{
await _initializationTask.Task;

var entries = await this.remTableManager.FindReminderEntries(begin, end);
ReminderTableData data = ConvertFromTableEntryList(entries);
if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace($"Read in {{RingRange}} Table={Environment.NewLine}{{Data}}", RangeFactory.CreateRange(begin, end), data);
Expand All @@ -161,6 +204,8 @@ public async Task<ReminderEntry> ReadRow(GrainId grainId, string reminderName)
{
try
{
await _initializationTask.Task;

if (this.logger.IsEnabled(LogLevel.Debug)) this.logger.LogDebug("ReadRow grainRef = {GrainId} reminderName = {ReminderName}", grainId, reminderName);
var result = await this.remTableManager.FindReminderEntry(grainId, reminderName);
return result.Entity is null ? null : ConvertFromTableEntry(result.Entity, result.ETag);
Expand All @@ -178,8 +223,10 @@ public async Task<string> UpsertRow(ReminderEntry entry)
{
try
{
await _initializationTask.Task;

if (this.logger.IsEnabled(LogLevel.Debug)) this.logger.LogDebug("UpsertRow entry = {Data}", entry.ToString());
ReminderTableEntry remTableEntry = ConvertToTableEntry(entry, this.remTableManager.ServiceId, this.remTableManager.ClusterId);
ReminderTableEntry remTableEntry = ConvertToTableEntry(entry, this.clusterOptions.ServiceId, this.clusterOptions.ClusterId);

string result = await this.remTableManager.UpsertRow(remTableEntry);
if (result == null)
Expand All @@ -202,12 +249,15 @@ public async Task<bool> RemoveRow(GrainId grainId, string reminderName, string e
{
var entry = new ReminderTableEntry
{
PartitionKey = ReminderTableEntry.ConstructPartitionKey(this.remTableManager.ServiceId, grainId),
PartitionKey = ReminderTableEntry.ConstructPartitionKey(this.clusterOptions.ServiceId, grainId),
RowKey = ReminderTableEntry.ConstructRowKey(grainId, reminderName),
ETag = new ETag(eTag),
};

try
{
await _initializationTask.Task;

if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace("RemoveRow entry = {Data}", entry.ToString());

bool result = await this.remTableManager.DeleteReminderEntryConditionally(entry, eTag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Azure;
using Azure.Data.Tables;
using Microsoft.Extensions.Logging;
using Orleans.AzureUtils.Utilities;
using Orleans.Reminders.AzureStorage;

namespace Orleans.Runtime.ReminderService
Expand Down Expand Up @@ -63,40 +62,24 @@ public static (string LowerBound, string UpperBound) ConstructPartitionKeyBounds

internal sealed class RemindersTableManager : AzureTableDataManager<ReminderTableEntry>
{
public string ServiceId { get; private set; }
public string ClusterId { get; private set; }
private readonly string _serviceId;
private readonly string _clusterId;

public static async Task<RemindersTableManager> GetManager(string serviceId, string clusterId, ILoggerFactory loggerFactory, AzureStorageOperationOptions options)
{
var singleton = new RemindersTableManager(serviceId, clusterId, options, loggerFactory);
try
{
singleton.Logger.LogInformation("Creating RemindersTableManager for service id {ServiceId} and clusterId {ClusterId}.", serviceId, clusterId);
await singleton.InitTableAsync();
}
catch (Exception ex)
{
singleton.Logger.LogError((int)AzureReminderErrorCode.AzureTable_39, ex, "Exception trying to create or connect to the Azure table");
throw new OrleansException("Exception trying to create or connect to the Azure table", ex);
}
return singleton;
}

private RemindersTableManager(
public RemindersTableManager(
string serviceId,
string clusterId,
AzureStorageOperationOptions options,
ILoggerFactory loggerFactory)
: base(options, loggerFactory.CreateLogger<RemindersTableManager>())
{
ClusterId = clusterId;
ServiceId = serviceId;
_clusterId = clusterId;
_serviceId = serviceId;
}

internal async Task<List<(ReminderTableEntry Entity, string ETag)>> FindReminderEntries(uint begin, uint end)
{
string sBegin = ReminderTableEntry.ConstructPartitionKey(ServiceId, begin);
string sEnd = ReminderTableEntry.ConstructPartitionKey(ServiceId, end);
string sBegin = ReminderTableEntry.ConstructPartitionKey(_serviceId, begin);
string sEnd = ReminderTableEntry.ConstructPartitionKey(_serviceId, end);
string query;
if (begin < end)
{
Expand All @@ -106,7 +89,7 @@ internal async Task<List<(ReminderTableEntry Entity, string ETag)>> FindReminder
}
else
{
var (partitionKeyLowerBound, partitionKeyUpperBound) = ReminderTableEntry.ConstructPartitionKeyBounds(ServiceId);
var (partitionKeyLowerBound, partitionKeyUpperBound) = ReminderTableEntry.ConstructPartitionKeyBounds(_serviceId);
if (begin == end)
{
// Query the entire range
Expand All @@ -129,7 +112,7 @@ internal async Task<List<(ReminderTableEntry Entity, string ETag)>> FindReminder

internal async Task<List<(ReminderTableEntry Entity, string ETag)>> FindReminderEntries(GrainId grainId)
{
var partitionKey = ReminderTableEntry.ConstructPartitionKey(ServiceId, grainId);
var partitionKey = ReminderTableEntry.ConstructPartitionKey(_serviceId, grainId);
var (rowKeyLowerBound, rowKeyUpperBound) = ReminderTableEntry.ConstructRowKeyBounds(grainId);
var query = TableClient.CreateQueryFilter($"(PartitionKey eq {partitionKey}) and ((RowKey gt {rowKeyLowerBound}) and (RowKey le {rowKeyUpperBound}))");
var queryResults = await ReadTableEntriesAndEtagsAsync(query);
Expand All @@ -138,7 +121,7 @@ internal async Task<List<(ReminderTableEntry Entity, string ETag)>> FindReminder

internal async Task<(ReminderTableEntry Entity, string ETag)> FindReminderEntry(GrainId grainId, string reminderName)
{
string partitionKey = ReminderTableEntry.ConstructPartitionKey(ServiceId, grainId);
string partitionKey = ReminderTableEntry.ConstructPartitionKey(_serviceId, grainId);
string rowKey = ReminderTableEntry.ConstructRowKey(grainId, reminderName);

return await ReadSingleTableEntryAsync(partitionKey, rowKey);
Expand Down Expand Up @@ -200,8 +183,8 @@ internal async Task DeleteTableEntries()
// group by grain hashcode so each query goes to different partition
var tasks = new List<Task>();
var groupedByHash = entries
.Where(tuple => tuple.Entity.ServiceId.Equals(ServiceId))
.Where(tuple => tuple.Entity.DeploymentId.Equals(ClusterId)) // delete only entries that belong to our DeploymentId.
.Where(tuple => tuple.Entity.ServiceId.Equals(_serviceId))
.Where(tuple => tuple.Entity.DeploymentId.Equals(_clusterId)) // delete only entries that belong to our DeploymentId.
.GroupBy(x => x.Entity.GrainRefConsistentHash).ToDictionary(g => g.Key, g => g.ToList());

foreach (var entriesPerPartition in groupedByHash.Values)
Expand Down
42 changes: 26 additions & 16 deletions src/Orleans.Reminders/ReminderService/LocalReminderService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ internal sealed class LocalReminderService : GrainService, IReminderService, ILi
private readonly Dictionary<ReminderIdentity, LocalReminderData> localReminders = new();
private readonly IReminderTable reminderTable;
private readonly TaskCompletionSource<bool> startedTask;
private readonly TimeSpan initTimeout;
private readonly IAsyncTimerFactory asyncTimerFactory;
private readonly IAsyncTimer listRefreshTimer; // timer that refreshes our list of reminders to reflect global reminder table
private readonly GrainReferenceActivator _referenceActivator;
Expand Down Expand Up @@ -54,7 +53,6 @@ internal sealed class LocalReminderService : GrainService, IReminderService, ILi
_referenceActivator = referenceActivator;
_grainInterfaceType = interfaceTypeResolver.GetGrainInterfaceType(typeof(IRemindable));
this.reminderOptions = reminderOptions.Value;
this.initTimeout = this.reminderOptions.InitializationTimeout;
this.reminderTable = reminderTable;
this.asyncTimerFactory = asyncTimerFactory;
ReminderInstruments.RegisterActiveRemindersObserve(() => localReminders.Count);
Expand All @@ -68,36 +66,46 @@ void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle observer)
{
observer.Subscribe(
nameof(LocalReminderService),
ServiceLifecycleStage.Active,
ServiceLifecycleStage.BecomeActive,
async ct =>
{
using var timeoutCancellation = new CancellationTokenSource(initTimeout);
var cts = CancellationTokenSource.CreateLinkedTokenSource(ct, timeoutCancellation.Token);
await this.QueueTask(Start)
.WithCancellation($"Starting ReminderService failed due to timeout {initTimeout}", ct);
await this.QueueTask(() => Initialize(ct));
},
ct =>
async ct =>
{
return this.QueueTask(Stop)
.WithCancellation("Stopping ReminderService failed because the task was cancelled", ct);
await this.QueueTask(Stop)
.WithCancellation("Stopping ReminderService failed because the task was cancelled.", ct);
});
observer.Subscribe(
nameof(LocalReminderService),
ServiceLifecycleStage.Active,
async ct =>
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(this.reminderOptions.InitializationTimeout);
await this.QueueTask(Start)
.WithCancellation($"Starting ReminderService failed because the task was canceled.", cts.Token);
},
ct => Task.CompletedTask);
}

/// <summary>
/// Attempt to retrieve reminders, that are my responsibility, from the global reminder table when starting this silo (reminder service instance)
/// </summary>
/// <returns></returns>
public override async Task Start()
private async Task Initialize(CancellationToken cancellationToken)
{
// confirm that it can access the underlying store, as after this the ReminderService will load in the background, without the opportunity to prevent the Silo from starting
await reminderTable.Init().WithTimeout(initTimeout, $"ReminderTable Initialization failed due to timeout {initTimeout}");
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(this.reminderOptions.InitializationTimeout);

_ = base.Start();
// Confirm that it can access the underlying store, as after this the ReminderService will load in the background, without the opportunity to prevent the Silo from starting
await reminderTable.StartAsync(cts.Token);
}

public async override Task Stop()
{
_ = base.Stop();
await base.Stop();

if (listRefreshTimer != null)
{
Expand All @@ -113,7 +121,9 @@ public async override Task Stop()
r.StopReminder();
}

// for a graceful shutdown, also handover reminder responsibilities to new owner, and update the ReminderTable
await reminderTable.StopAsync();

// For a graceful shutdown, also handover reminder responsibilities to new owner, and update the ReminderTable
// currently, this is taken care of by periodically reading the reminder table
}

Expand Down
22 changes: 20 additions & 2 deletions src/Orleans.Reminders/SystemTargetInterfaces/IReminderTable.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Orleans.Concurrency;
using Orleans.Runtime;
Expand All @@ -16,8 +17,18 @@ public interface IReminderTable
/// <summary>
/// Initializes this instance.
/// </summary>
/// <returns>A Task representing the work performed.</returns>
Task Init();
/// <returns>A <see cref="Task"/> representing the work performed.</returns>
Task StartAsync(CancellationToken cancellationToken = default)
#pragma warning disable CS0618 // Type or member is obsolete
=> Init();
#pragma warning restore CS0618 // Type or member is obsolete

/// <summary>
/// Initializes this instance.
/// </summary>
/// <returns>A <see cref="Task"/> representing the work performed.</returns>
[Obsolete("Implement and use StartAsync instead")]
Task Init() => Task.CompletedTask;

/// <summary>
/// Reads the reminder table entries associated with the specified grain.
Expand Down Expand Up @@ -63,6 +74,13 @@ public interface IReminderTable
/// </summary>
/// <returns>A <see cref="Task"/> representing the work performed.</returns>
Task TestOnlyClearTable();

/// <summary>
/// Stops the reminder table.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A <see cref="Task"/> representing the work performed.</returns>
Task StopAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public async Task Reminders_AzureTable_InsertRate()
storageOptions.Value.ConfigureTestDefaults();

IReminderTable table = new CosmosReminderTable(_loggerFactory, _fixture.Services, storageOptions, clusterOptions);
await table.Init();
using var cancellation = new CancellationTokenSource(new ReminderOptions().InitializationTimeout);
await table.StartAsync(cancellation.Token);

await TestTableInsertRate(table, 10);
await TestTableInsertRate(table, 500);
Expand All @@ -56,7 +57,8 @@ public async Task Reminders_AzureTable_InsertNewRowAndReadBack()
var storageOptions = Options.Create(new CosmosReminderTableOptions());
storageOptions.Value.ConfigureTestDefaults();
IReminderTable table = new CosmosReminderTable(_loggerFactory, _fixture.Services, storageOptions, clusterOptions);
await table.Init();
using var cancellation = new CancellationTokenSource(new ReminderOptions().InitializationTimeout);
await table.StartAsync(cancellation.Token);

ReminderEntry[] rows = (await GetAllRows(table)).ToArray();
Assert.Empty(rows); // "The reminder table (sid={0}, did={1}) was not empty.", ServiceId, clusterId);
Expand Down
Loading

0 comments on commit d54dd32

Please sign in to comment.