Permalink
Browse files

switch azure to use inmemory cache

  • Loading branch information...
1 parent ff2e7a1 commit aa6d25c685a5daba9d956bd4b227279b2fc71a4a @abdullin abdullin committed Mar 1, 2013
@@ -6,7 +6,6 @@
using System;
using System.Linq;
using System.Text;
-using Lokad.Cqrs;
using Lokad.Cqrs.AppendOnly;
using Lokad.Cqrs.TapeStorage;
using Microsoft.WindowsAzure;
@@ -20,13 +20,8 @@ public sealed class BlobAppendOnlyStore : IAppendOnlyStore
{
// Caches
readonly CloudBlobContainer _container;
- readonly ConcurrentDictionary<string, DataWithVersion[]> _items = new ConcurrentDictionary<string, DataWithVersion[]>();
- DataWithKey[] _all = new DataWithKey[0];
- /// <summary>
- /// Used to synchronize access between multiple threads within one process
- /// </summary>
- readonly ReaderWriterLockSlim _cacheLock = new ReaderWriterLockSlim();
+ readonly LockingInMemoryCache _cache = new LockingInMemoryCache();
bool _closed;
@@ -58,52 +53,46 @@ public void InitializeReader()
LoadCaches();
}
+ long _storeVersion = 0;
+
public void Append(string streamName, byte[] data, long expectedStreamVersion = -1)
{
- _cacheLock.EnterWriteLock();
+ // should be locked
try
{
- var list = _items.GetOrAdd(streamName, s => new DataWithVersion[0]);
- if (expectedStreamVersion >= 0)
+ _cache.Append(streamName, data, _storeVersion + 1, streamVersion =>
{
- if (list.Length != expectedStreamVersion)
- throw new AppendOnlyStoreConcurrencyException(expectedStreamVersion, list.Length, streamName);
- }
+ EnsureWriterExists(_storeVersion);
+ Persist(streamName, data, streamVersion);
+ }, expectedStreamVersion);
- EnsureWriterExists(_all.Length);
- long commit = list.Length + 1;
+ _storeVersion += 1;
- Persist(streamName, data, commit);
- AddToCaches(streamName, data, commit);
}
- catch
+ catch (AppendOnlyStoreConcurrencyException)
{
- Close();
+ //store is OK when AOSCE is thrown. This is client's problem
+ // just bubble it upwards
throw;
}
- finally
+ catch
{
- _cacheLock.ExitWriteLock();
+ // store probably corrupted. Close it and then rethrow exception
+ // so that clien will have a chance to retry.
+ Close();
+ throw;
}
}
public IEnumerable<DataWithVersion> ReadRecords(string streamName, long afterVersion, int maxCount)
{
- // no lock is needed, since we are polling immutable object.
- DataWithVersion[] list;
- return _items.TryGetValue(streamName, out list) ? list.Skip((int)afterVersion).Take(maxCount) : Enumerable.Empty<DataWithVersion>();
+ return _cache.ReadRecords(streamName, afterVersion, maxCount);
}
public IEnumerable<DataWithKey> ReadRecords(long afterVersion, int maxCount)
{
- if (afterVersion < 0)
- throw new ArgumentOutOfRangeException("afterVersion", "Must be zero or greater.");
-
- if (maxCount <= 0)
- throw new ArgumentOutOfRangeException("maxCount", "Must be more than zero.");
+ return _cache.ReadRecords(afterVersion, maxCount);
- // collection is immutable so we don't care about locks
- return _all.Skip((int)afterVersion).Take(maxCount);
}
public void Close()
@@ -120,27 +109,16 @@ public void Close()
public void ResetStore()
{
- _cacheLock.EnterWriteLock();
- try
- {
- Close();
- _all = new DataWithKey[0];
- foreach (var item in _items)
- {
- var blob = _container.GetPageBlobReference(item.Key);
- blob.DeleteIfExists();
- }
- _items.Clear();
- }
- finally
- {
- _cacheLock.ExitWriteLock();
- }
+ Close();
+ _cache.Clear(() => _container.ListBlobs()
+ .OfType<CloudPageBlob>()
+ .Where(item => item.Uri.ToString().EndsWith(".dat"))
+ .AsParallel().ForAll(i => i.DeleteIfExists()));
}
public long GetCurrentVersion()
{
- return _all.Length;
+ return _storeVersion;
}
IEnumerable<StorageFrameDecoded> EnumerateHistory()
@@ -169,35 +147,7 @@ IEnumerable<StorageFrameDecoded> EnumerateHistory()
void LoadCaches()
{
- try
- {
- _cacheLock.EnterWriteLock();
-
- foreach (var record in EnumerateHistory())
- {
- AddToCaches(record.Name, record.Bytes, record.Stamp);
- }
- }
- finally
- {
- _cacheLock.ExitWriteLock();
- }
- }
-
- void AddToCaches(string key, byte[] buffer, long commit)
- {
- var storeVersion = _all.Length + 1;
- var record = new DataWithVersion(commit, buffer, storeVersion);
- _all = AddToNewArray(_all, new DataWithKey(key, buffer, commit, storeVersion));
- _items.AddOrUpdate(key, s => new[] { record }, (s, records) => AddToNewArray(records, record));
- }
-
- static T[] AddToNewArray<T>(T[] source, T item)
- {
- var copy = new T[source.Length + 1];
- Array.Copy(source, copy, source.Length);
- copy[source.Length] = item;
- return copy;
+ _storeVersion = _cache.ReloadEverything(EnumerateHistory());
}
void Persist(string key, byte[] buffer, long commit)
@@ -206,7 +156,7 @@ void Persist(string key, byte[] buffer, long commit)
if (!_currentWriter.Fits(frame.Data.Length + frame.Hash.Length))
{
CloseWriter();
- EnsureWriterExists(_all.Length);
+ EnsureWriterExists(_storeVersion);
}
_currentWriter.Write(frame.Data);
@@ -61,6 +61,7 @@
<Compile Include="Envelope\DuplicationMemoryTest.cs" />
<Compile Include="Envelope\EnvelopeDispatcherTest.cs" />
<Compile Include="Envelope\EnvelopeStreamerTest.cs" />
+ <Compile Include="Performance_test_for_LockingInMemoryCache.cs" />
<Compile Include="RedirectToCommandTest.cs" />
<Compile Include="RedirectToDynamicEventTest.cs" />
<Compile Include="specification_with_empty_directory.cs" />
@@ -1,6 +1,4 @@
using System;
-using System.Collections.Generic;
-using System.Diagnostics;
using System.IO;
using System.Linq;
using Cqrs.Portable.Tests.Envelope;
@@ -114,31 +112,4 @@ public void when_append_to_store()
Assert.AreEqual("name1", (records[0].Items[0] as SerializerTest1).Name);
}
}
-
- [TestFixture]
- public sealed class Performance_test_for_LockingInMemoryCache
- {
- [Test]
- public void Name()
- {
- var cache = new LockingInMemoryCache();
-
- var watch = Stopwatch.StartNew();
- var count = 100000;
- cache.ReloadEverything(Generate(count, new byte[200], i => string.Format("stream_{0}", i % 100)));
- watch.Stop();
- Console.WriteLine("Cached {0} events in {1:0.00} sec. {2:0.00} eps", count, (watch.Elapsed.TotalSeconds),
- count / watch.Elapsed.TotalSeconds
- );
-
- }
-
- IEnumerable<StorageFrameDecoded> Generate(int count, byte[] buffer,Func<int,string> streamName)
- {
- for (int i = 0; i < count; i++)
- {
- yield return new StorageFrameDecoded(buffer, streamName(i), i);
- }
- }
- }
}
@@ -5,12 +5,14 @@
using Lokad.Cqrs.TapeStorage;
using NUnit.Framework;
+// ReSharper disable InconsistentNaming
namespace Cqrs.Portable.Tests
{
[TestFixture]
+
public sealed class Performance_test_for_LockingInMemoryCache
{
- [Test]
+ [Test, Explicit]
public void Name()
{
var cache = new LockingInMemoryCache();
@@ -20,8 +22,7 @@ public void Name()
cache.ReloadEverything(Generate(count, new byte[200], i => string.Format("stream_{0}", i % 100)));
watch.Stop();
Console.WriteLine("Cached {0} events in {1:0.00} sec. {2:0.00} eps", count, (watch.Elapsed.TotalSeconds),
- count / watch.Elapsed.TotalSeconds
- );
+ count / watch.Elapsed.TotalSeconds);
}
@@ -107,6 +107,7 @@
<Compile Include="SystemObserver.cs" />
<Compile Include="TapeStorage\FileAppendOnlyStore.cs" />
<Compile Include="TapeStorage\IAppendOnlyStore.cs" />
+ <Compile Include="TapeStorage\LockingInMemoryCache.cs" />
<Compile Include="TapeStorage\MemoryTape.cs" />
<Compile Include="unit.cs" />
</ItemGroup>
Oops, something went wrong.

0 comments on commit aa6d25c

Please sign in to comment.