Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DT.AzureStorage: add randomization to history blob names to avoid races #891

Merged
merged 7 commits into from
May 11, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -1052,8 +1052,8 @@ async Task AbandonAndReleaseSessionAsync(OrchestrationSession session)
// will result in a duplicate replay of the orchestration with no side-effects.
try
{
session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag);

session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag, session.TrackingStoreData);
sebastianburckhardt marked this conversation as resolved.
Show resolved Hide resolved
// update the runtime state and execution id stored in the session
session.UpdateRuntimeState(runtimeState);

Expand Down
6 changes: 6 additions & 0 deletions src/DurableTask.AzureStorage/MessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ public Task<string> DownloadAndDecompressAsBytesAsync(Uri blobUri)
return DownloadAndDecompressAsBytesAsync(blob);
}

public Task<bool> DeleteOrphanedBlobAsync(string blobName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we call this method DeleteBlobAsync? I don't see any logic here related to orphaned data.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

{
Blob blob = this.blobContainer.GetBlobReference(blobName);
return blob.DeleteIfExistsAsync();
}

private async Task<string> DownloadAndDecompressAsBytesAsync(Blob blob)
{
using (MemoryStream memory = new MemoryStream(MaxStorageQueuePayloadSizeInBytes * 2))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ sealed class OrchestrationSession : SessionBase, IOrchestrationSession
OrchestrationRuntimeState runtimeState,
string eTag,
DateTime lastCheckpointTime,
object trackingStoreData,
TimeSpan idleTimeout,
Guid traceActivityId)
: base(settings, storageAccountName, orchestrationInstance, traceActivityId)
Expand All @@ -48,6 +49,7 @@ sealed class OrchestrationSession : SessionBase, IOrchestrationSession
this.RuntimeState = runtimeState ?? throw new ArgumentNullException(nameof(runtimeState));
this.ETag = eTag;
this.LastCheckpointTime = lastCheckpointTime;
this.TrackingStoreData = trackingStoreData;

this.messagesAvailableEvent = new AsyncAutoResetEvent(signaled: false);
this.nextMessageBatch = new MessageCollection();
Expand All @@ -67,6 +69,8 @@ sealed class OrchestrationSession : SessionBase, IOrchestrationSession

public DateTime LastCheckpointTime { get; }

public object TrackingStoreData { get; }

public IReadOnlyList<MessageData> PendingMessages => this.nextMessageBatch;

public override int GetCurrentEpisode()
Expand Down
3 changes: 3 additions & 0 deletions src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ bool IsScheduledAfterInstanceUpdate(MessageData msg, OrchestrationState? remoteI
batch.OrchestrationState = new OrchestrationRuntimeState(history.Events);
batch.ETag = history.ETag;
batch.LastCheckpointTime = history.LastCheckpointTime;
batch.TrackingStoreData = history.TrackingStoreData;
}

this.readyForProcessingQueue.Enqueue(node);
Expand Down Expand Up @@ -538,6 +539,7 @@ bool IsScheduledAfterInstanceUpdate(MessageData msg, OrchestrationState? remoteI
nextBatch.OrchestrationState,
nextBatch.ETag,
nextBatch.LastCheckpointTime,
nextBatch.TrackingStoreData,
this.settings.ExtendedSessionIdleTimeout,
traceActivityId);

Expand Down Expand Up @@ -683,6 +685,7 @@ public PendingMessageBatch(ControlQueue controlQueue, string instanceId, string?

public string? ETag { get; set; }
public DateTime LastCheckpointTime { get; set; }
public object? TrackingStoreData { get; set; }
}
}
}
58 changes: 48 additions & 10 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace DurableTask.AzureStorage.Tracking
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand All @@ -32,7 +33,7 @@ namespace DurableTask.AzureStorage.Tracking
/// <summary>
/// Tracking store for use with <see cref="AzureStorageOrchestrationService"/>. Uses Azure Tables and Azure Blobs to store runtime state.
/// </summary>
class AzureTableTrackingStore : TrackingStoreBase
class AzureTableTrackingStore : TrackingStoreBase, ITrackingStore
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TrackingStoreBase already implements ITrackingStore so you can remove this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely.

{
const string NameProperty = "Name";
const string InputProperty = "Input";
Expand Down Expand Up @@ -158,6 +159,7 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
IList<HistoryEvent> historyEvents;
string executionId;
DynamicTableEntity sentinel = null;
TrackingStoreData trackingStoreData = new TrackingStoreData();
if (tableEntities.Count > 0)
{
// The most recent generation will always be in the first history event.
Expand All @@ -183,7 +185,7 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
}

// Some entity properties may be stored in blob storage.
await this.DecompressLargeEntityProperties(entity);
await this.DecompressLargeEntityProperties(entity, trackingStoreData.Blobs);

events.Add((HistoryEvent)this.tableEntityConverter.ConvertFromTableEntity(entity, GetTypeForTableEntity));
}
Expand Down Expand Up @@ -222,7 +224,7 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
eTagValue,
checkpointCompletionTime);

return new OrchestrationHistory(historyEvents, checkpointCompletionTime, eTagValue);
return new OrchestrationHistory(historyEvents, checkpointCompletionTime, eTagValue, trackingStoreData);
}

async Task<TableEntitiesResponseInfo<DynamicTableEntity>> GetHistoryEntitiesResponseInfoAsync(string instanceId, string expectedExecutionId, IList<string> projectionColumns, CancellationToken cancellationToken = default(CancellationToken))
Expand Down Expand Up @@ -843,7 +845,7 @@ public override async Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(string

// It is possible that the queue message was small enough to be written directly to a queue message,
// not a blob, but is too large to be written to a table property.
await this.CompressLargeMessageAsync(entity);
await this.CompressLargeMessageAsync(entity, listOfBlobs: null);

Stopwatch stopwatch = Stopwatch.StartNew();
try
Expand Down Expand Up @@ -928,11 +930,13 @@ public override Task StartAsync()
OrchestrationRuntimeState oldRuntimeState,
string instanceId,
string executionId,
string eTagValue)
string eTagValue,
object trackingStoreData)
{
int estimatedBytes = 0;
IList<HistoryEvent> newEvents = newRuntimeState.NewEvents;
IList<HistoryEvent> allEvents = newRuntimeState.Events;
TrackingStoreData data = (TrackingStoreData) trackingStoreData;
davidmrdavid marked this conversation as resolved.
Show resolved Hide resolved

int episodeNumber = Utils.GetEpisodeNumber(newRuntimeState);

Expand All @@ -953,7 +957,15 @@ public override Task StartAsync()
["LastUpdatedTime"] = new EntityProperty(newEvents.Last().Timestamp),
}
};


// check if we are replacing a previous execution with blobs; those will be deleted from the store after the update
List<string> blobsToDelete = null;
if (oldRuntimeState != newRuntimeState && data.Blobs.Count > 0)
{
blobsToDelete = data.Blobs;
data.Blobs = new List<string>();
}
sebastianburckhardt marked this conversation as resolved.
Show resolved Hide resolved

davidmrdavid marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < newEvents.Count; i++)
{
bool isFinalEvent = i == newEvents.Count - 1;
Expand All @@ -969,7 +981,7 @@ public override Task StartAsync()
historyEntity.RowKey = sequenceNumber.ToString("X16");
historyEntity.Properties["ExecutionId"] = new EntityProperty(executionId);

await this.CompressLargeMessageAsync(historyEntity);
await this.CompressLargeMessageAsync(historyEntity, data.Blobs);

// Replacement can happen if the orchestration episode gets replayed due to a commit failure in one of the steps below.
historyEventBatch.InsertOrReplace(historyEntity);
Expand Down Expand Up @@ -1108,6 +1120,18 @@ public override Task StartAsync()
episodeNumber,
orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds);

// finally, delete orphaned blobs from the previous execution history.
// We had to wait until the new history has committed to make sure the blobs are no longer necessary.
if (blobsToDelete != null)
{
var tasks = new List<Task>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var tasks = new List<Task>();
var tasks = new List<Task>(blobsToDelete.Count);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

foreach (var blobName in blobsToDelete)
{
tasks.Add(this.messageManager.DeleteOrphanedBlobAsync(blobName));
}
await Task.WhenAll(tasks);
}

return eTagValue;
}

Expand Down Expand Up @@ -1177,7 +1201,7 @@ Type GetTypeForTableEntity(DynamicTableEntity tableEntity)
}
}

async Task CompressLargeMessageAsync(DynamicTableEntity entity)
async Task CompressLargeMessageAsync(DynamicTableEntity entity, List<string> listOfBlobs)
davidmrdavid marked this conversation as resolved.
Show resolved Hide resolved
{
foreach (string propertyName in VariableSizeEntityProperties)
{
Expand All @@ -1194,11 +1218,14 @@ async Task CompressLargeMessageAsync(DynamicTableEntity entity)
string blobPropertyName = GetBlobPropertyName(propertyName);
entity.Properties.Add(blobPropertyName, new EntityProperty(blobName));
entity.Properties[propertyName].StringValue = string.Empty;

// if necessary, keep track of all the blobs associated with this execution
listOfBlobs?.Add(blobName);
}
}
}

async Task DecompressLargeEntityProperties(DynamicTableEntity entity)
async Task DecompressLargeEntityProperties(DynamicTableEntity entity, List<string> listOfBlobs)
{
// Check for entity properties stored in blob storage
foreach (string propertyName in VariableSizeEntityProperties)
Expand All @@ -1210,6 +1237,9 @@ async Task DecompressLargeEntityProperties(DynamicTableEntity entity)
string decompressedMessage = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobName);
entity.Properties[propertyName] = new EntityProperty(decompressedMessage);
entity.Properties.Remove(blobPropertyName);

// keep track of all the blobs associated with this execution
listOfBlobs.Add(blobName);
}
}
}
Expand Down Expand Up @@ -1241,7 +1271,10 @@ static string GetBlobName(DynamicTableEntity entity, string property)
throw new InvalidOperationException($"Could not compute the blob name for property {property}");
}

string blobName = $"{sanitizedInstanceId}/history-{sequenceNumber}-{eventType}-{property}.json.gz";
// randomize the blob name to prevent accidental races in split-brain situations (#890)
uint random = (uint)(new Random()).Next();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason for us not to use a GUID in this case? I would suspect that may be more "unique" than a random number, but perhaps that's incorrect.

Also - do we need to be concerned about creating orphaned blobs as a result of this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also - do we need to be concerned about creating orphaned blobs as a result of this?

I don't have it in front of me, but it would be good to check our purge logic to see if it might end up missing orphaned blobs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A GUID of 128 bit is indeed more random than the 32bit used here. But for this particular purpose I think 32bit is more than enough - split brain does not happen frequently enough to make collisions likely. I didn't want to waste so much space in the blob name.

The purge operation seems to wipe out the entire blob directory belonging to the instance id, so no problem there.

One thing I am not sure about is the case where a history is replaced (e.g. after ContinueAsNew). There should be some code somewhere that removes blobs for the old history (even without the proposed randomization) but I haven't quite spotted it yet.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at the current code some more. It simply does not remove any old blobs when updating histories - so those blobs may accumulate until purging the instance. However, importantly, in some select cases, it can overwrite old blobs with a new blob of the same name. Tor example, when calling continue as new and the new blob happens to have the same event type, and the same large field, and be in the same row of the history, then the blob is overwritten.

This latter case is the important one for entities, since they often have large inputs (that is where the entity state is stored) so that state can be overwritten every time the entity does continue-as-new, which is all the time.

This does mean this PR currently has a problem with entities (and eternal orchestrators) since the blobs will accumulate forever. It does not seem very easy to fix since the TrackingStore implementation does not know whether the old history contained any large blobs that need to be removed. We can either issue a query, or track that information somehow.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the issues were addressed.


Comment on lines +1275 to +1276
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to allocate a new Random() each time or can we use a static Random instance?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little tricky because the Random class is unfortunately not thread-safe, so I would have to do something complicated or ugly. I think it is better to keep it simple to read and maintain, and the performance is probably fine considering that this is only called when we need a large blob, which is by itself going to use significant CPU and memory (so this little allocation is a tiny fraction of the cost).

string blobName = $"{sanitizedInstanceId}/history-{sequenceNumber}-{eventType}-{random:X8}-{property}.json.gz";

return blobName;
}
Expand Down Expand Up @@ -1346,5 +1379,10 @@ bool ExceedsMaxTablePropertySize(string data)

return false;
}

class TrackingStoreData
{
public List<string> Blobs { get; set; } = new List<string>();
}
}
}
3 changes: 2 additions & 1 deletion src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ interface ITrackingStore
/// <param name="instanceId">InstanceId for the Orchestration Update</param>
/// <param name="executionId">ExecutionId for the Orchestration Update</param>
/// <param name="eTag">The ETag value to use for safe updates</param>
Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag);
/// <param name="trackingStoreData">The additional data that is maintained for this execution.</param>
Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag, object trackingStoreData);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: trackingStoreContext might be a better word since "data" is heavily overloaded in this context (it would almost read as if this "data" is part of what we're updating).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it.


/// <summary>
/// Get The Orchestration State for the Latest or All Executions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public override Task StartAsync()
}

/// <inheritdoc />
public override async Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag)
public override async Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag, object executionData)
{
//In case there is a runtime state for an older execution/iteration as well that needs to be committed, commit it.
//This may be the case if a ContinueAsNew was executed on the orchestration
Expand Down
9 changes: 9 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/OrchestrationHistory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,20 @@ public OrchestrationHistory(IList<HistoryEvent> historyEvents, DateTime lastChec
this.LastCheckpointTime = lastCheckpointTime;
this.ETag = eTag;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's update this overload to call your new overload.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

}
public OrchestrationHistory(IList<HistoryEvent> historyEvents, DateTime lastCheckpointTime, string eTag, object trackingStoreData)
{
this.Events = historyEvents ?? throw new ArgumentNullException(nameof(historyEvents));
this.LastCheckpointTime = lastCheckpointTime;
this.ETag = eTag;
this.TrackingStoreData = trackingStoreData;
}

public IList<HistoryEvent> Events { get; }

public string ETag { get; }

public DateTime LastCheckpointTime { get; }

public object TrackingStoreData { get; }
}
}
2 changes: 1 addition & 1 deletion src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,6 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId)
public abstract Task StartAsync();

/// <inheritdoc />
public abstract Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag);
public abstract Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag, object executionData);
}
}