-
Notifications
You must be signed in to change notification settings - Fork 287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DT.AzureStorage: add randomization to history blob names to avoid races #891
Changes from 6 commits
49dcce5
8920c49
9e575cc
dad88f8
f9efbd0
40ad7f3
5c1742d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -14,6 +14,7 @@ | |||||
namespace DurableTask.AzureStorage.Tracking | ||||||
{ | ||||||
using System; | ||||||
using System.Collections.Concurrent; | ||||||
using System.Collections.Generic; | ||||||
using System.Diagnostics; | ||||||
using System.Linq; | ||||||
|
@@ -32,7 +33,7 @@ namespace DurableTask.AzureStorage.Tracking | |||||
/// <summary> | ||||||
/// Tracking store for use with <see cref="AzureStorageOrchestrationService"/>. Uses Azure Tables and Azure Blobs to store runtime state. | ||||||
/// </summary> | ||||||
class AzureTableTrackingStore : TrackingStoreBase | ||||||
class AzureTableTrackingStore : TrackingStoreBase, ITrackingStore | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Absolutely. |
||||||
{ | ||||||
const string NameProperty = "Name"; | ||||||
const string InputProperty = "Input"; | ||||||
|
@@ -158,6 +159,7 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in | |||||
IList<HistoryEvent> historyEvents; | ||||||
string executionId; | ||||||
DynamicTableEntity sentinel = null; | ||||||
TrackingStoreData trackingStoreData = new TrackingStoreData(); | ||||||
if (tableEntities.Count > 0) | ||||||
{ | ||||||
// The most recent generation will always be in the first history event. | ||||||
|
@@ -183,7 +185,7 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in | |||||
} | ||||||
|
||||||
// Some entity properties may be stored in blob storage. | ||||||
await this.DecompressLargeEntityProperties(entity); | ||||||
await this.DecompressLargeEntityProperties(entity, trackingStoreData.Blobs); | ||||||
|
||||||
events.Add((HistoryEvent)this.tableEntityConverter.ConvertFromTableEntity(entity, GetTypeForTableEntity)); | ||||||
} | ||||||
|
@@ -222,7 +224,7 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in | |||||
eTagValue, | ||||||
checkpointCompletionTime); | ||||||
|
||||||
return new OrchestrationHistory(historyEvents, checkpointCompletionTime, eTagValue); | ||||||
return new OrchestrationHistory(historyEvents, checkpointCompletionTime, eTagValue, trackingStoreData); | ||||||
} | ||||||
|
||||||
async Task<TableEntitiesResponseInfo<DynamicTableEntity>> GetHistoryEntitiesResponseInfoAsync(string instanceId, string expectedExecutionId, IList<string> projectionColumns, CancellationToken cancellationToken = default(CancellationToken)) | ||||||
|
@@ -843,7 +845,7 @@ public override async Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(string | |||||
|
||||||
// It is possible that the queue message was small enough to be written directly to a queue message, | ||||||
// not a blob, but is too large to be written to a table property. | ||||||
await this.CompressLargeMessageAsync(entity); | ||||||
await this.CompressLargeMessageAsync(entity, listOfBlobs: null); | ||||||
|
||||||
Stopwatch stopwatch = Stopwatch.StartNew(); | ||||||
try | ||||||
|
@@ -928,11 +930,13 @@ public override Task StartAsync() | |||||
OrchestrationRuntimeState oldRuntimeState, | ||||||
string instanceId, | ||||||
string executionId, | ||||||
string eTagValue) | ||||||
string eTagValue, | ||||||
object trackingStoreData) | ||||||
{ | ||||||
int estimatedBytes = 0; | ||||||
IList<HistoryEvent> newEvents = newRuntimeState.NewEvents; | ||||||
IList<HistoryEvent> allEvents = newRuntimeState.Events; | ||||||
TrackingStoreData data = (TrackingStoreData) trackingStoreData; | ||||||
davidmrdavid marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
int episodeNumber = Utils.GetEpisodeNumber(newRuntimeState); | ||||||
|
||||||
|
@@ -953,7 +957,15 @@ public override Task StartAsync() | |||||
["LastUpdatedTime"] = new EntityProperty(newEvents.Last().Timestamp), | ||||||
} | ||||||
}; | ||||||
|
||||||
|
||||||
// check if we are replacing a previous execution with blobs; those will be deleted from the store after the update. This could occur in a ContinueAsNew scenario | ||||||
List<string> blobsToDelete = null; | ||||||
if (oldRuntimeState != newRuntimeState && data.Blobs.Count > 0) | ||||||
{ | ||||||
blobsToDelete = data.Blobs; | ||||||
data.Blobs = new List<string>(); | ||||||
} | ||||||
|
||||||
for (int i = 0; i < newEvents.Count; i++) | ||||||
{ | ||||||
bool isFinalEvent = i == newEvents.Count - 1; | ||||||
|
@@ -969,7 +981,7 @@ public override Task StartAsync() | |||||
historyEntity.RowKey = sequenceNumber.ToString("X16"); | ||||||
historyEntity.Properties["ExecutionId"] = new EntityProperty(executionId); | ||||||
|
||||||
await this.CompressLargeMessageAsync(historyEntity); | ||||||
await this.CompressLargeMessageAsync(historyEntity, data.Blobs); | ||||||
|
||||||
// Replacement can happen if the orchestration episode gets replayed due to a commit failure in one of the steps below. | ||||||
historyEventBatch.InsertOrReplace(historyEntity); | ||||||
|
@@ -1108,6 +1120,18 @@ public override Task StartAsync() | |||||
episodeNumber, | ||||||
orchestrationInstanceUpdateStopwatch.ElapsedMilliseconds); | ||||||
|
||||||
// finally, delete orphaned blobs from the previous execution history. | ||||||
// We had to wait until the new history has committed to make sure the blobs are no longer necessary. | ||||||
if (blobsToDelete != null) | ||||||
{ | ||||||
var tasks = new List<Task>(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. |
||||||
foreach (var blobName in blobsToDelete) | ||||||
{ | ||||||
tasks.Add(this.messageManager.DeleteOrphanedBlobAsync(blobName)); | ||||||
} | ||||||
await Task.WhenAll(tasks); | ||||||
} | ||||||
|
||||||
return eTagValue; | ||||||
} | ||||||
|
||||||
|
@@ -1177,7 +1201,7 @@ Type GetTypeForTableEntity(DynamicTableEntity tableEntity) | |||||
} | ||||||
} | ||||||
|
||||||
async Task CompressLargeMessageAsync(DynamicTableEntity entity) | ||||||
async Task CompressLargeMessageAsync(DynamicTableEntity entity, List<string> listOfBlobs) | ||||||
davidmrdavid marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
{ | ||||||
foreach (string propertyName in VariableSizeEntityProperties) | ||||||
{ | ||||||
|
@@ -1194,11 +1218,14 @@ async Task CompressLargeMessageAsync(DynamicTableEntity entity) | |||||
string blobPropertyName = GetBlobPropertyName(propertyName); | ||||||
entity.Properties.Add(blobPropertyName, new EntityProperty(blobName)); | ||||||
entity.Properties[propertyName].StringValue = string.Empty; | ||||||
|
||||||
// if necessary, keep track of all the blobs associated with this execution | ||||||
listOfBlobs?.Add(blobName); | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
async Task DecompressLargeEntityProperties(DynamicTableEntity entity) | ||||||
async Task DecompressLargeEntityProperties(DynamicTableEntity entity, List<string> listOfBlobs) | ||||||
{ | ||||||
// Check for entity properties stored in blob storage | ||||||
foreach (string propertyName in VariableSizeEntityProperties) | ||||||
|
@@ -1210,6 +1237,9 @@ async Task DecompressLargeEntityProperties(DynamicTableEntity entity) | |||||
string decompressedMessage = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobName); | ||||||
entity.Properties[propertyName] = new EntityProperty(decompressedMessage); | ||||||
entity.Properties.Remove(blobPropertyName); | ||||||
|
||||||
// keep track of all the blobs associated with this execution | ||||||
listOfBlobs.Add(blobName); | ||||||
} | ||||||
} | ||||||
} | ||||||
|
@@ -1241,7 +1271,10 @@ static string GetBlobName(DynamicTableEntity entity, string property) | |||||
throw new InvalidOperationException($"Could not compute the blob name for property {property}"); | ||||||
} | ||||||
|
||||||
string blobName = $"{sanitizedInstanceId}/history-{sequenceNumber}-{eventType}-{property}.json.gz"; | ||||||
// randomize the blob name to prevent accidental races in split-brain situations (#890) | ||||||
uint random = (uint)(new Random()).Next(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. any reason for us not to use a GUID in this case? I would suspect that may be more "unique" than a random number, but perhaps that's incorrect. Also - do we need to be concerned about creating orphaned blobs as a result of this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I don't have it in front of me, but it would be good to check our purge logic to see if it might end up missing orphaned blobs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A GUID of 128 bit is indeed more random than the 32bit used here. But for this particular purpose I think 32bit is more than enough - split brain does not happen frequently enough to make collisions likely. I didn't want to waste so much space in the blob name. The purge operation seems to wipe out the entire blob directory belonging to the instance id, so no problem there. One thing I am not sure about is the case where a history is replaced (e.g. after ContinueAsNew). There should be some code somewhere that removes blobs for the old history (even without the proposed randomization) but I haven't quite spotted it yet. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I looked at the current code some more. It simply does not remove any old blobs when updating histories - so those blobs may accumulate until purging the instance. However, importantly, in some select cases, it can overwrite old blobs with a new blob of the same name. Tor example, when calling continue as new and the new blob happens to have the same event type, and the same large field, and be in the same row of the history, then the blob is overwritten. This latter case is the important one for entities, since they often have large inputs (that is where the entity state is stored) so that state can be overwritten every time the entity does continue-as-new, which is all the time. This does mean this PR currently has a problem with entities (and eternal orchestrators) since the blobs will accumulate forever. It does not seem very easy to fix since the TrackingStore implementation does not know whether the old history contained any large blobs that need to be removed. We can either issue a query, or track that information somehow. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All the issues were addressed. |
||||||
|
||||||
Comment on lines
+1275
to
+1276
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have to allocate a new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a little tricky because the |
||||||
string blobName = $"{sanitizedInstanceId}/history-{sequenceNumber}-{eventType}-{random:X8}-{property}.json.gz"; | ||||||
|
||||||
return blobName; | ||||||
} | ||||||
|
@@ -1346,5 +1379,10 @@ bool ExceedsMaxTablePropertySize(string data) | |||||
|
||||||
return false; | ||||||
} | ||||||
|
||||||
class TrackingStoreData | ||||||
{ | ||||||
public List<string> Blobs { get; set; } = new List<string>(); | ||||||
} | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,7 +69,8 @@ interface ITrackingStore | |
/// <param name="instanceId">InstanceId for the Orchestration Update</param> | ||
/// <param name="executionId">ExecutionId for the Orchestration Update</param> | ||
/// <param name="eTag">The ETag value to use for safe updates</param> | ||
Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag); | ||
/// <param name="trackingStoreData">The additional data that is maintained for this execution.</param> | ||
Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag, object trackingStoreData); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like it. |
||
|
||
/// <summary> | ||
/// Get The Orchestration State for the Latest or All Executions | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,11 +35,20 @@ public OrchestrationHistory(IList<HistoryEvent> historyEvents, DateTime lastChec | |
this.LastCheckpointTime = lastCheckpointTime; | ||
this.ETag = eTag; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's update this overload to call your new overload. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. |
||
} | ||
public OrchestrationHistory(IList<HistoryEvent> historyEvents, DateTime lastCheckpointTime, string eTag, object trackingStoreData) | ||
{ | ||
this.Events = historyEvents ?? throw new ArgumentNullException(nameof(historyEvents)); | ||
this.LastCheckpointTime = lastCheckpointTime; | ||
this.ETag = eTag; | ||
this.TrackingStoreData = trackingStoreData; | ||
} | ||
|
||
public IList<HistoryEvent> Events { get; } | ||
|
||
public string ETag { get; } | ||
|
||
public DateTime LastCheckpointTime { get; } | ||
|
||
public object TrackingStoreData { get; } | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can we call this method
DeleteBlobAsync
? I don't see any logic here related to orphaned data.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.