Permalink
Browse files

Tweaking tests

  • Loading branch information...
1 parent 1717beb commit 75e16de6103c3a6fd9d59846ccea6eeb88185dca @abdullin abdullin committed Mar 15, 2013
@@ -24,15 +24,10 @@ public void given_empty_cache()
public void given_reloaded_cache()
{
var cache = new LockingInMemoryCache();
+ cache.LoadHistory(CreateFrames("stream2"));
+ cache.Clear(() => { });
- cache.ConcurrentAppend("stream1", new byte[1], (version, storeVersion) => { });
-
- Assert.Throws<LockRecursionException>(() => cache.Clear(() =>
- cache.LoadHistory(CreateFrames("stream2"))
- ));
-
- Assert.AreEqual(1, cache.StoreVersion);
- Assert.AreEqual("stream1", cache.ReadAll(0, 1).First().Key);
+ Assert.AreEqual(0, cache.StoreVersion);
}
@@ -42,13 +37,10 @@ public void given_appended_cache()
var cache = new LockingInMemoryCache();
cache.ConcurrentAppend("stream1", new byte[1], (version, storeVersion) => { });
-
- Assert.Throws<LockRecursionException>(() => cache.Clear(() =>
- cache.ConcurrentAppend("stream2", new byte[1], (version, storeVersion) => { })
- ));
-
- Assert.AreEqual(1, cache.StoreVersion);
- Assert.AreEqual("stream1", cache.ReadAll(0, 1).First().Key);
+
+ cache.Clear(() => { });
+
+ Assert.AreEqual(0, cache.StoreVersion);
}
[Test]
@@ -58,8 +50,6 @@ public void given_filled_cache_and_failing_commit_function()
cache.ConcurrentAppend("stream1", new byte[1], (version, storeVersion) => { });
-
-
Assert.Throws<FileNotFoundException>(() => cache.Clear(() =>
{
throw new FileNotFoundException();
@@ -15,15 +15,23 @@ public void given_empty_cache_and_valid_commit_function()
long? commitStoreVersion = null;
long? commitStreamVersion = null;
- cache.ConcurrentAppend("stream", GetEventBytes(4), (version, storeVersion) =>
+ cache.ConcurrentAppend("stream", GetEventBytes(1), (version, storeVersion) =>
{
commitStoreVersion = storeVersion;
commitStreamVersion = version;
});
Assert.AreEqual(1, commitStoreVersion, "commitStoreVersion");
Assert.AreEqual(1, commitStreamVersion, "commitStreamVersion");
+
Assert.AreEqual(1, cache.StoreVersion);
+
+ var expected = new[]
+ {
+ CreateKey(1, 1, "stream"),
+ };
+ DataAssert.AreEqual(expected, cache.ReadStream("stream",0,100));
+ DataAssert.AreEqual(expected, cache.ReadAll(0, 100));
}
[Test]
@@ -43,18 +51,21 @@ public void given_reloaded_cache_and_commit_function_that_fails()
}
[Test]
- public void given_reloaded_cache_and_non_specified_version_expectation()
+ public void given_filled_cache_and_concurrent_append_with_non_specified_version_expectation()
{
var cache = new LockingInMemoryCache();
cache.LoadHistory(CreateFrames("stream", "otherStream"));
+
+ cache.ConcurrentAppend("stream", GetEventBytes(3), (version, storeVersion) => { }, -1);
- Assert.AreEqual(2, cache.StoreVersion);
+ Assert.AreEqual(3, cache.StoreVersion);
}
[Test]
- public void given_reloaded_and_appended_cache_with_valid_version_expectation()
+ public void given_filled_cache_and_concurrent_append_with_valid_version_expectation()
{
+ // GIVEN
var cache = new LockingInMemoryCache();
cache.LoadHistory(CreateFrames("stream", "otherStream"));
@@ -63,12 +74,15 @@ public void given_reloaded_and_appended_cache_with_valid_version_expectation()
long? commitStoreVersion = null;
long? commitStreamVersion = null;
+ // WHEN
cache.ConcurrentAppend("stream", GetEventBytes(5), (version, storeVersion) =>
{
commitStoreVersion = storeVersion;
commitStreamVersion = version;
}, 2);
+
+ // EXPECT
Assert.AreEqual(4, commitStoreVersion, "commitStoreVersion");
Assert.AreEqual(3, commitStreamVersion, "commitStreamVersion");
Assert.AreEqual(4, cache.StoreVersion);
@@ -78,7 +92,7 @@ public void given_reloaded_and_appended_cache_with_valid_version_expectation()
[Test]
- public void given_reloaded_cache_and_invalid_expected_version()
+ public void given_reloaded_cache_and_concurrent_append_with_invalid_expected_version()
{
var cache = new LockingInMemoryCache();
@@ -130,10 +144,5 @@ public void given_reloaded_cache_and_matching_stream_version()
Assert.AreEqual(3, commitStoreVersion, "commitStoreVersion");
Assert.AreEqual(2, commitStreamVersion, "commitStreamVersion");
}
-
-
-
-
-
}
}
@@ -11,11 +11,6 @@ namespace Lokad.Cqrs.TapeStorage
/// </summary>
public sealed class LockingInMemoryCache
{
-
-
-
-
-
readonly ReaderWriterLockSlim _thread = new ReaderWriterLockSlim();
ConcurrentDictionary<string, DataWithKey[]> _cacheByKey = new ConcurrentDictionary<string, DataWithKey[]>();
DataWithKey[] _cacheFull = new DataWithKey[0];
@@ -25,7 +20,7 @@ public void LoadHistory(IEnumerable<StorageFrameDecoded> sfd)
_thread.EnterWriteLock();
try
{
- if (_storeVersion != 0)
+ if (StoreVersion != 0)
throw new InvalidOperationException("Must clear cache before loading history");
_cacheFull = new DataWithKey[0];
@@ -57,7 +52,7 @@ public void LoadHistory(IEnumerable<StorageFrameDecoded> sfd)
_cacheFull = cacheFullBuilder.ToArray();
_cacheByKey = new ConcurrentDictionary<string, DataWithKey[]>(streamPointerBuilder.Select(p => new KeyValuePair<string, DataWithKey[]>(p.Key, p.Value.ToArray())));
- _storeVersion = newStoreVersion;
+ StoreVersion = newStoreVersion;
}
finally
{
@@ -76,9 +71,7 @@ static T[] ImmutableAdd<T>(T[] source, T item)
return copy;
}
- long _storeVersion;
-
- public long StoreVersion { get { return _storeVersion; } }
+ public long StoreVersion { get; private set; }
public delegate void OnCommit(long streamVersion, long storeVersion);
@@ -97,7 +90,7 @@ public void ConcurrentAppend(string streamName, byte[] data, OnCommit commit, lo
throw new AppendOnlyStoreConcurrencyException(expectedStreamVersion, actualStreamVersion, streamName);
}
long newStreamVersion = actualStreamVersion + 1;
- long newStoreVersion = _storeVersion + 1;
+ long newStoreVersion = StoreVersion + 1;
commit(newStreamVersion, newStoreVersion);
@@ -107,7 +100,7 @@ public void ConcurrentAppend(string streamName, byte[] data, OnCommit commit, lo
var dataWithKey = new DataWithKey(streamName, data, newStreamVersion, newStoreVersion);
_cacheFull = ImmutableAdd(_cacheFull, dataWithKey);
_cacheByKey.AddOrUpdate(streamName, s => new[] { dataWithKey }, (s, records) => ImmutableAdd(records, dataWithKey));
- _storeVersion = newStoreVersion;
+ StoreVersion = newStoreVersion;
}
finally
@@ -148,15 +141,15 @@ public IEnumerable<DataWithKey> ReadAll(long afterStoreVersion, int maxCount)
return _cacheFull.Where(key => key.StoreVersion > afterStoreVersion).Take(maxCount);
}
- public void Clear(Action onCommit)
+ public void Clear(Action executeWhenCommitting)
{
_thread.EnterWriteLock();
try
{
- onCommit();
+ executeWhenCommitting();
_cacheFull = new DataWithKey[0];
_cacheByKey.Clear();
- _storeVersion = 0;
+ StoreVersion = 0;
}
finally
{

0 comments on commit 75e16de

Please sign in to comment.