Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fixing race conditions in event stores

  • Loading branch information...
commit 6308ce36d52a58aaf7bd3080e3af032538ed69a6 1 parent e257d7a
@abdullin abdullin authored
View
43 Cqrs.Azure/AppendOnly/BlobAppendOnlyStore.cs
@@ -54,7 +54,7 @@ public void InitializeReader()
LoadCaches();
}
- long _storeVersion;
+
int _pageSizeMultiplier = 1024 * 512;
public void Append(string streamName, byte[] data, long expectedStreamVersion = -1)
@@ -62,14 +62,12 @@ public void Append(string streamName, byte[] data, long expectedStreamVersion =
// should be locked
try
{
- _cache.Append(streamName, data, _storeVersion + 1, streamVersion =>
+ _cache.ConcurrentAppend(streamName, data, (streamVersion, storeVersion) =>
{
- EnsureWriterExists(_storeVersion);
+ EnsureWriterExists(storeVersion);
Persist(streamName, data, streamVersion);
}, expectedStreamVersion);
- _storeVersion += 1;
-
}
catch (AppendOnlyStoreConcurrencyException)
{
@@ -118,15 +116,16 @@ public void ResetStore()
blobs
.AsParallel().ForAll(i => i.DeleteIfExists());
- _storeVersion = 0;
});
}
public long GetCurrentVersion()
{
- return _storeVersion;
+ return _cache.StoreVersion;
}
+
+
IEnumerable<StorageFrameDecoded> EnumerateHistory()
{
// cleanup old pending files
@@ -138,7 +137,6 @@ IEnumerable<StorageFrameDecoded> EnumerateHistory()
BlobListingDetails = BlobListingDetails.Metadata
})
.OrderBy(s => s.Uri.ToString())
-
.OfType<CloudPageBlob>()
.Where(s => s.Name.EndsWith(".dat"));
@@ -156,25 +154,28 @@ IEnumerable<StorageFrameDecoded> EnumerateHistory()
lastValidPosition = stream.Position;
yield return result;
}
-
-
}
var haveSomethingToTruncate = bytes.Length - lastValidPosition >= 512;
if (potentiallyNonTruncatedChunk & haveSomethingToTruncate)
{
- var trunc = lastValidPosition;
- var remainder = lastValidPosition % 512;
- if (remainder > 0)
- {
- trunc += 512 - remainder;
- }
- Trace.WriteLine(string.Format("Truncating {0} to {1}", fileInfo.Name, trunc));
- _container.GetPageBlobReference(fileInfo.Name + ".bak").CopyFromBlob(fileInfo);
- SetLength(fileInfo, trunc);
+ TruncateBlob(lastValidPosition, fileInfo);
}
}
}
+ void TruncateBlob(long lastValidPosition, CloudPageBlob fileInfo)
+ {
+ var trunc = lastValidPosition;
+ var remainder = lastValidPosition % 512;
+ if (remainder > 0)
+ {
+ trunc += 512 - remainder;
+ }
+ Trace.WriteLine(string.Format("Truncating {0} to {1}", fileInfo.Name, trunc));
+ _container.GetPageBlobReference(fileInfo.Name + ".bak").CopyFromBlob(fileInfo);
+ SetLength(fileInfo, trunc);
+ }
+
static void SetLength(CloudPageBlob blob, long newLength, int timeout = 10000)
{
var credentials = blob.ServiceClient.Credentials;
@@ -195,7 +196,7 @@ static void SetLength(CloudPageBlob blob, long newLength, int timeout = 10000)
void LoadCaches()
{
- _storeVersion = _cache.ReloadEverything(EnumerateHistory());
+ _cache.ReloadEverything(EnumerateHistory());
}
void Persist(string key, byte[] buffer, long commit)
@@ -204,7 +205,7 @@ void Persist(string key, byte[] buffer, long commit)
if (!_currentWriter.Fits(frame.Data.Length + frame.Hash.Length))
{
CloseWriter();
- EnsureWriterExists(_storeVersion);
+ EnsureWriterExists(_cache.StoreVersion);
}
_currentWriter.Write(frame.Data);
View
20 Cqrs.Portable/TapeStorage/FileAppendOnlyStore.cs
@@ -40,11 +40,11 @@ public void Initialize()
LoadCaches();
}
- long _storeVersion = 0;
+
public void LoadCaches()
{
- _storeVersion = _cache.ReloadEverything(EnumerateHistory());
+ _cache.ReloadEverything(EnumerateHistory());
}
IEnumerable<StorageFrameDecoded> EnumerateHistory()
@@ -92,14 +92,12 @@ public void Append(string streamName, byte[] data, long expectedStreamVersion =
try
{
- _cache.Append(streamName, data, _storeVersion + 1, streamVersion =>
+ _cache.ConcurrentAppend(streamName, data, (streamVersion, storeVersion) =>
{
- EnsureWriterExists(_storeVersion);
+ EnsureWriterExists(storeVersion);
PersistInFile(streamName, data, streamVersion);
}, expectedStreamVersion);
- _storeVersion += 1;
-
}
catch (AppendOnlyStoreConcurrencyException)
{
@@ -160,20 +158,14 @@ public void Close()
public void ResetStore()
{
Close();
-
-
- _cache.Clear(() =>
- {
- Directory.Delete(_info.FullName, true);
- _storeVersion = 0;
- });
+ _cache.Clear(() => Directory.Delete(_info.FullName, true));
Initialize();
}
public long GetCurrentVersion()
{
- return _storeVersion;
+ return _cache.StoreVersion;
}
}
}
View
23 Cqrs.Portable/TapeStorage/LockingInMemoryCache.cs
@@ -15,7 +15,7 @@ public sealed class LockingInMemoryCache
ConcurrentDictionary<string, DataWithVersion[]> _cacheByKey = new ConcurrentDictionary<string, DataWithVersion[]>();
DataWithKey[] _cacheFull = new DataWithKey[0];
- public long ReloadEverything(IEnumerable<StorageFrameDecoded> sfd)
+ public void ReloadEverything(IEnumerable<StorageFrameDecoded> sfd)
{
_thread.EnterWriteLock();
try
@@ -44,8 +44,7 @@ public long ReloadEverything(IEnumerable<StorageFrameDecoded> sfd)
_cacheFull = cacheFullBuilder.ToArray();
_cacheByKey = new ConcurrentDictionary<string, DataWithVersion[]>(streamPointerBuilder.Select(p => new KeyValuePair<string, DataWithVersion[]>(p.Key, p.Value.ToArray())));
-
- return storeVersion;
+ _storeVersion = storeVersion;
}
finally
{
@@ -64,7 +63,13 @@ static T[] ImmutableAdd<T>(T[] source, T item)
return copy;
}
- public void Append(string streamName, byte[] data, long newStoreVersion, Action<long> commitStreamVersion, long expectedStreamVersion = -1)
+ long _storeVersion;
+
+ public long StoreVersion { get { return _storeVersion; } }
+
+ public delegate void OnCommit(long streamVersion, long storeVersion);
+
+ public void ConcurrentAppend(string streamName, byte[] data, OnCommit commit, long expectedStreamVersion = -1)
{
_thread.EnterWriteLock();
@@ -77,11 +82,16 @@ public void Append(string streamName, byte[] data, long newStoreVersion, Action<
throw new AppendOnlyStoreConcurrencyException(expectedStreamVersion, list.Length, streamName);
}
long newStreamVersion = list.Length + 1;
+ long newStoreVersion = _storeVersion + 1;
+
+ commit(expectedStreamVersion, newStoreVersion);
+
+ // update in-memory cache only after real commit completed
+
var record = new DataWithVersion(newStreamVersion, data, newStoreVersion);
_cacheFull = ImmutableAdd(_cacheFull, new DataWithKey(streamName, data, newStreamVersion, newStoreVersion));
_cacheByKey.AddOrUpdate(streamName, s => new[] { record }, (s, records) => ImmutableAdd(records, record));
-
- commitStreamVersion(newStreamVersion);
+ _storeVersion = newStoreVersion;
}
finally
{
@@ -120,6 +130,7 @@ public void Clear(Action onCommit)
onCommit();
_cacheFull = new DataWithKey[0];
_cacheByKey.Clear();
+ _storeVersion = 0;
}
finally
{
Please sign in to comment.
Something went wrong with that request. Please try again.