DT.AzureStorage: add randomization to history blob names to avoid races #891

Merged 7 commits on May 11, 2023
@@ -1052,8 +1052,7 @@ async Task AbandonAndReleaseSessionAsync(OrchestrationSession session)
// will result in a duplicate replay of the orchestration with no side-effects.
try
{
session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag);

session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag, session.TrackingStoreContext);
// update the runtime state and execution id stored in the session
session.UpdateRuntimeState(runtimeState);

6 changes: 6 additions & 0 deletions src/DurableTask.AzureStorage/MessageManager.cs
@@ -216,6 +216,12 @@ public Task<string> DownloadAndDecompressAsBytesAsync(Uri blobUri)
return DownloadAndDecompressAsBytesAsync(blob);
}

public Task<bool> DeleteBlobAsync(string blobName)
{
Blob blob = this.blobContainer.GetBlobReference(blobName);
return blob.DeleteIfExistsAsync();
}
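Returning the result of DeleteIfExistsAsync presumably keeps the cleanup idempotent: deleting a blob that no longer exists resolves to false rather than throwing, which matters when several workers race to clean up the same history.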

private async Task<string> DownloadAndDecompressAsBytesAsync(Blob blob)
{
using (MemoryStream memory = new MemoryStream(MaxStorageQueuePayloadSizeInBytes * 2))
@@ -38,6 +38,7 @@ sealed class OrchestrationSession : SessionBase, IOrchestrationSession
OrchestrationRuntimeState runtimeState,
string eTag,
DateTime lastCheckpointTime,
object trackingStoreContext,
TimeSpan idleTimeout,
Guid traceActivityId)
: base(settings, storageAccountName, orchestrationInstance, traceActivityId)
@@ -48,6 +49,7 @@ sealed class OrchestrationSession : SessionBase, IOrchestrationSession
this.RuntimeState = runtimeState ?? throw new ArgumentNullException(nameof(runtimeState));
this.ETag = eTag;
this.LastCheckpointTime = lastCheckpointTime;
this.TrackingStoreContext = trackingStoreContext;

this.messagesAvailableEvent = new AsyncAutoResetEvent(signaled: false);
this.nextMessageBatch = new MessageCollection();
@@ -67,6 +69,8 @@ sealed class OrchestrationSession : SessionBase, IOrchestrationSession

public DateTime LastCheckpointTime { get; }

public object TrackingStoreContext { get; }

public IReadOnlyList<MessageData> PendingMessages => this.nextMessageBatch;

public override int GetCurrentEpisode()
3 changes: 3 additions & 0 deletions src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
@@ -478,6 +478,7 @@ bool IsScheduledAfterInstanceUpdate(MessageData msg, OrchestrationState? remoteI
batch.OrchestrationState = new OrchestrationRuntimeState(history.Events);
batch.ETag = history.ETag;
batch.LastCheckpointTime = history.LastCheckpointTime;
batch.TrackingStoreContext = history.TrackingStoreContext;
}

this.readyForProcessingQueue.Enqueue(node);
@@ -538,6 +539,7 @@ bool IsScheduledAfterInstanceUpdate(MessageData msg, OrchestrationState? remoteI
nextBatch.OrchestrationState,
nextBatch.ETag,
nextBatch.LastCheckpointTime,
nextBatch.TrackingStoreContext,
this.settings.ExtendedSessionIdleTimeout,
traceActivityId);

@@ -683,6 +685,7 @@ public PendingMessageBatch(ControlQueue controlQueue, string instanceId, string?

public string? ETag { get; set; }
public DateTime LastCheckpointTime { get; set; }
public object? TrackingStoreContext { get; set; }
}
}
}
56 changes: 47 additions & 9 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
@@ -14,6 +14,7 @@
namespace DurableTask.AzureStorage.Tracking
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
@@ -158,6 +159,7 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
IList<HistoryEvent> historyEvents;
string executionId;
DynamicTableEntity sentinel = null;
TrackingStoreContext trackingStoreContext = new TrackingStoreContext();
if (tableEntities.Count > 0)
{
// The most recent generation will always be in the first history event.
@@ -183,7 +185,7 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
}

// Some entity properties may be stored in blob storage.
await this.DecompressLargeEntityProperties(entity);
await this.DecompressLargeEntityProperties(entity, trackingStoreContext.Blobs);

events.Add((HistoryEvent)this.tableEntityConverter.ConvertFromTableEntity(entity, GetTypeForTableEntity));
}
@@ -222,7 +224,7 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
eTagValue,
checkpointCompletionTime);

return new OrchestrationHistory(historyEvents, checkpointCompletionTime, eTagValue);
return new OrchestrationHistory(historyEvents, checkpointCompletionTime, eTagValue, trackingStoreContext);
}

async Task<TableEntitiesResponseInfo<DynamicTableEntity>> GetHistoryEntitiesResponseInfoAsync(string instanceId, string expectedExecutionId, IList<string> projectionColumns, CancellationToken cancellationToken = default(CancellationToken))
@@ -843,7 +845,7 @@ public override async Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(string

// It is possible that the queue message was small enough to be written directly to a queue message,
// not a blob, but is too large to be written to a table property.
await this.CompressLargeMessageAsync(entity);
await this.CompressLargeMessageAsync(entity, listOfBlobs: null);

Stopwatch stopwatch = Stopwatch.StartNew();
try
@@ -928,11 +930,13 @@ public override Task StartAsync()
OrchestrationRuntimeState oldRuntimeState,
string instanceId,
string executionId,
string eTagValue)
string eTagValue,
object trackingStoreContext)
{
int estimatedBytes = 0;
IList<HistoryEvent> newEvents = newRuntimeState.NewEvents;
IList<HistoryEvent> allEvents = newRuntimeState.Events;
TrackingStoreContext context = (TrackingStoreContext) trackingStoreContext;

int episodeNumber = Utils.GetEpisodeNumber(newRuntimeState);

@@ -953,7 +957,15 @@ public override Task StartAsync()
["LastUpdatedTime"] = new EntityProperty(newEvents.Last().Timestamp),
}
};


// check if we are replacing a previous execution with blobs; those will be deleted from the store after the update. This could occur in a ContinueAsNew scenario
List<string> blobsToDelete = null;
if (oldRuntimeState != newRuntimeState && context.Blobs.Count > 0)
{
blobsToDelete = context.Blobs;
context.Blobs = new List<string>();
}

for (int i = 0; i < newEvents.Count; i++)
{
bool isFinalEvent = i == newEvents.Count - 1;
@@ -969,7 +981,7 @@ public override Task StartAsync()
historyEntity.RowKey = sequenceNumber.ToString("X16");
historyEntity.Properties["ExecutionId"] = new EntityProperty(executionId);

await this.CompressLargeMessageAsync(historyEntity);
await this.CompressLargeMessageAsync(historyEntity, context.Blobs);

// Replacement can happen if the orchestration episode gets replayed due to a commit failure in one of the steps below.
historyEventBatch.InsertOrReplace(historyEntity);
@@ -1108,6 +1120,18 @@ public override Task StartAsync()
episodeNumber,
orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds);

// finally, delete orphaned blobs from the previous execution history.
// We had to wait until the new history has committed to make sure the blobs are no longer necessary.
if (blobsToDelete != null)
{
var tasks = new List<Task>(blobsToDelete.Count);
foreach (var blobName in blobsToDelete)
{
tasks.Add(this.messageManager.DeleteBlobAsync(blobName));
}
await Task.WhenAll(tasks);
}

return eTagValue;
}

@@ -1177,7 +1201,7 @@ Type GetTypeForTableEntity(DynamicTableEntity tableEntity)
}
}

async Task CompressLargeMessageAsync(DynamicTableEntity entity)
async Task CompressLargeMessageAsync(DynamicTableEntity entity, List<string> listOfBlobs)
{
foreach (string propertyName in VariableSizeEntityProperties)
{
@@ -1194,11 +1218,14 @@ async Task CompressLargeMessageAsync(DynamicTableEntity entity)
string blobPropertyName = GetBlobPropertyName(propertyName);
entity.Properties.Add(blobPropertyName, new EntityProperty(blobName));
entity.Properties[propertyName].StringValue = string.Empty;

// if necessary, keep track of all the blobs associated with this execution
listOfBlobs?.Add(blobName);
}
}
}

async Task DecompressLargeEntityProperties(DynamicTableEntity entity)
async Task DecompressLargeEntityProperties(DynamicTableEntity entity, List<string> listOfBlobs)
{
// Check for entity properties stored in blob storage
foreach (string propertyName in VariableSizeEntityProperties)
Expand All @@ -1210,6 +1237,9 @@ async Task DecompressLargeEntityProperties(DynamicTableEntity entity)
string decompressedMessage = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobName);
entity.Properties[propertyName] = new EntityProperty(decompressedMessage);
entity.Properties.Remove(blobPropertyName);

// keep track of all the blobs associated with this execution
listOfBlobs.Add(blobName);
}
}
}
Expand Down Expand Up @@ -1241,7 +1271,10 @@ static string GetBlobName(DynamicTableEntity entity, string property)
throw new InvalidOperationException($"Could not compute the blob name for property {property}");
}

string blobName = $"{sanitizedInstanceId}/history-{sequenceNumber}-{eventType}-{property}.json.gz";
// randomize the blob name to prevent accidental races in split-brain situations (#890)
uint random = (uint)(new Random()).Next();
Collaborator:

Any reason for us not to use a GUID in this case? I would suspect that may be more "unique" than a random number, but perhaps that's incorrect.

Also - do we need to be concerned about creating orphaned blobs as a result of this?

Collaborator:

> Also - do we need to be concerned about creating orphaned blobs as a result of this?

I don't have it in front of me, but it would be good to check our purge logic to see if it might end up missing orphaned blobs.

Collaborator (Author):

A 128-bit GUID is indeed more random than the 32 bits used here. But for this particular purpose I think 32 bits are more than enough - split brain does not happen frequently enough to make collisions likely. I didn't want to waste so much space in the blob name.

The purge operation seems to wipe out the entire blob directory belonging to the instance id, so no problem there.

One thing I am not sure about is the case where a history is replaced (e.g. after ContinueAsNew). There should be some code somewhere that removes blobs for the old history (even without the proposed randomization) but I haven't quite spotted it yet.
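As a rough back-of-the-envelope check (not part of the PR; the writer counts below are hypothetical), a birthday-bound estimate supports that claim. Strictly, Random.Next() returns a non-negative int below int.MaxValue, so there are roughly 2^31 distinct suffixes rather than 2^32, which does not change the conclusion:

using System;

class SuffixCollisionEstimate
{
    static void Main()
    {
        // Roughly 2^31 distinct suffixes come out of (uint)new Random().Next().
        double d = Math.Pow(2, 31);

        // Hypothetical numbers of concurrent split-brain writers of the same history row.
        foreach (int n in new[] { 2, 5, 100 })
        {
            // Birthday approximation: P(collision) ~ 1 - exp(-n(n-1) / (2d)).
            double p = 1 - Math.Exp(-(double)n * (n - 1) / (2 * d));
            Console.WriteLine($"writers={n}: collision probability ~ {p:E1}");
        }
    }
}

Even with 100 concurrent writers of the same history row, the estimated collision probability stays in the range of a few in a million.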

Collaborator (Author):

I looked at the current code some more. It simply does not remove any old blobs when updating histories, so those blobs may accumulate until the instance is purged. However, importantly, in some select cases it can overwrite old blobs with a new blob of the same name. For example, when calling ContinueAsNew, if the new blob happens to have the same event type, the same large field, and the same row in the history, then the old blob is overwritten.

This latter case is the important one for entities, since they often have large inputs (that is where the entity state is stored) so that state can be overwritten every time the entity does continue-as-new, which is all the time.

This does mean this PR currently has a problem with entities (and eternal orchestrators) since the blobs will accumulate forever. It does not seem very easy to fix since the TrackingStore implementation does not know whether the old history contained any large blobs that need to be removed. We can either issue a query, or track that information somehow.

Collaborator (Author):

All the issues were addressed.
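(For reference, the merged diff above addresses this by collecting blob names into a per-execution TrackingStoreContext while the history is read and written, flowing that context from GetHistoryEventsAsync through the OrchestrationSession, and deleting the superseded blobs in UpdateStateAsync only after the replacement history has been committed.)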


Comment on lines +1275 to +1276
Collaborator:

Do we have to allocate a new Random() each time or can we use a static Random instance?

Collaborator (Author):

It's a little tricky because the Random class is unfortunately not thread-safe, so I would have to do something complicated or ugly. I think it is better to keep it simple to read and maintain, and the performance is probably fine considering that this is only called when we need a large blob, which is by itself going to use significant CPU and memory (so this little allocation is a tiny fraction of the cost).
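For comparison, here is a minimal sketch of one thread-safe alternative discussed in this thread but not adopted; the names are illustrative and not part of DurableTask.AzureStorage. (On .NET 6 and later, Random.Shared would be another option, but it is not available on older target frameworks.)

using System;

static class BlobNameRandom
{
    // One Random instance per thread; [ThreadStatic] fields must be initialized lazily,
    // since a field initializer would only run on the first thread that touches the type.
    [ThreadStatic]
    static Random? perThread;

    public static uint NextSuffix()
    {
        // Seed from a GUID so threads created on the same tick do not share a sequence.
        perThread ??= new Random(Guid.NewGuid().GetHashCode());
        return (uint)perThread.Next();
    }
}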

string blobName = $"{sanitizedInstanceId}/history-{sequenceNumber}-{eventType}-{random:X8}-{property}.json.gz";

return blobName;
}
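For illustration only (the instance id, sequence number, and event type below are made up), the change turns a deterministic name such as

    sampleInstance/history-42-TaskCompleted-Result.json.gz

into a randomized one such as

    sampleInstance/history-42-TaskCompleted-7F3A91C2-Result.json.gz

so two split-brain writers of the same history row no longer target the same blob.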
@@ -1346,5 +1379,10 @@ bool ExceedsMaxTablePropertySize(string data)

return false;
}

class TrackingStoreContext
{
public List<string> Blobs { get; set; } = new List<string>();
}
}
}
3 changes: 2 additions & 1 deletion src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
@@ -69,7 +69,8 @@ interface ITrackingStore
/// <param name="instanceId">InstanceId for the Orchestration Update</param>
/// <param name="executionId">ExecutionId for the Orchestration Update</param>
/// <param name="eTag">The ETag value to use for safe updates</param>
Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag);
/// <param name="trackingStoreContext">Additional context for the execution that is maintained by the tracking store.</param>
Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag, object trackingStoreContext);

/// <summary>
/// Get The Orchestration State for the Latest or All Executions
@@ -137,7 +137,7 @@ public override Task StartAsync()
}

/// <inheritdoc />
public override async Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag)
public override async Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag, object executionData)
{
//In case there is a runtime state for an older execution/iteration as well that needs to be committed, commit it.
//This may be the case if a ContinueAsNew was executed on the orchestration
8 changes: 8 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/OrchestrationHistory.cs
@@ -30,16 +30,24 @@ public OrchestrationHistory(IList<HistoryEvent> historyEvents, DateTime lastChec
}

public OrchestrationHistory(IList<HistoryEvent> historyEvents, DateTime lastCheckpointTime, string eTag)
: this(historyEvents, lastCheckpointTime, eTag, null)
{
}

public OrchestrationHistory(IList<HistoryEvent> historyEvents, DateTime lastCheckpointTime, string eTag, object trackingStoreContext)
{
this.Events = historyEvents ?? throw new ArgumentNullException(nameof(historyEvents));
this.LastCheckpointTime = lastCheckpointTime;
this.ETag = eTag;
this.TrackingStoreContext = trackingStoreContext;
}

public IList<HistoryEvent> Events { get; }

public string ETag { get; }

public DateTime LastCheckpointTime { get; }

public object TrackingStoreContext { get; }
}
}
2 changes: 1 addition & 1 deletion src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs
@@ -111,6 +111,6 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId)
public abstract Task StartAsync();

/// <inheritdoc />
public abstract Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag);
public abstract Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag, object executionData);
}
}