Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Even more tests

  • Loading branch information...
commit 42faec374735a3a0615d2d088500593f56ca68eb 1 parent 6b4d6a0
@abdullin abdullin authored
Showing with 366 additions and 114 deletions.
  1. +3 −3 Cqrs.Azure/AppendOnly/BlobAppendOnlyStore.cs
  2. +5 −3 Cqrs.Portable.Tests/Cqrs.Portable.Tests.csproj
  3. +1 −1  Cqrs.Portable.Tests/Performance_test_for_LockingInMemoryCache.cs
  4. +1 −1  Cqrs.Portable.Tests/TapeStorage/FileAppendOnlyStoreTest.cs
  5. +15 −1 ...ests/TapeStorage/LockingInMemoryCacheTests/{LockingInMemoryHelpers.cs → fixture_with_cache_helpers.cs}
  6. +2 −2 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_checking_store_version.cs
  7. +24 −3 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_clearing_cache.cs
  8. +23 −7 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_doing_concurrent_append.cs
  9. +0 −72 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_enumerating_entire_store.cs
  10. +40 −0 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reading_all_given_empty_cache.cs
  11. +78 −0 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reading_all_given_filled_cache.cs
  12. +0 −11 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reading_stream.cs
  13. +44 −0 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reading_stream_from_empty_cache.cs
  14. +73 −0 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reading_stream_from_loaded_cache.cs
  15. +30 −1 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reloading_all.cs
  16. +1 −1  Cqrs.Portable.Tests/TapeStorage/MemoryAppendOnlyStoreTest.cs
  17. +3 −3 Cqrs.Portable/TapeStorage/FileAppendOnlyStore.cs
  18. +21 −3 Cqrs.Portable/TapeStorage/LockingInMemoryCache.cs
  19. +2 −2 Cqrs.Portable/TapeStorage/MemoryTape.cs
View
6 Cqrs.Azure/AppendOnly/BlobAppendOnlyStore.cs
@@ -85,12 +85,12 @@ public void Append(string streamName, byte[] data, long expectedStreamVersion =
public IEnumerable<DataWithKey> ReadRecords(string streamName, long afterVersion, int maxCount)
{
- return _cache.ReadRecords(streamName, afterVersion, maxCount);
+ return _cache.ReadStream(streamName, afterVersion, maxCount);
}
public IEnumerable<DataWithKey> ReadRecords(long afterVersion, int maxCount)
{
- return _cache.ReadRecords(afterVersion, maxCount);
+ return _cache.ReadAll(afterVersion, maxCount);
}
@@ -195,7 +195,7 @@ static void SetLength(CloudPageBlob blob, long newLength, int timeout = 10000)
void LoadCaches()
{
- _cache.ReloadEverything(EnumerateHistory());
+ _cache.LoadHistory(EnumerateHistory());
}
void Persist(string key, byte[] buffer, long commit)
View
8 Cqrs.Portable.Tests/Cqrs.Portable.Tests.csproj
@@ -79,12 +79,14 @@
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Partition\StatelessFileQueueReaderTest.cs" />
<Compile Include="StorageFramesEvilTest.cs" />
- <Compile Include="TapeStorage\LockingInMemoryCacheTests\LockingInMemoryHelpers.cs" />
+ <Compile Include="TapeStorage\LockingInMemoryCacheTests\fixture_with_cache_helpers.cs" />
<Compile Include="TapeStorage\LockingInMemoryCacheTests\when_checking_store_version.cs" />
<Compile Include="TapeStorage\LockingInMemoryCacheTests\when_clearing_cache.cs" />
<Compile Include="TapeStorage\LockingInMemoryCacheTests\when_doing_concurrent_append.cs" />
- <Compile Include="TapeStorage\LockingInMemoryCacheTests\when_enumerating_entire_store.cs" />
- <Compile Include="TapeStorage\LockingInMemoryCacheTests\when_reading_stream.cs" />
+ <Compile Include="TapeStorage\LockingInMemoryCacheTests\when_reading_all_given_empty_cache.cs" />
+ <Compile Include="TapeStorage\LockingInMemoryCacheTests\when_reading_all_given_filled_cache.cs" />
+ <Compile Include="TapeStorage\LockingInMemoryCacheTests\when_reading_stream_from_empty_cache.cs" />
+ <Compile Include="TapeStorage\LockingInMemoryCacheTests\when_reading_stream_from_loaded_cache.cs" />
<Compile Include="TapeStorage\LockingInMemoryCacheTests\when_reloading_all.cs" />
<Compile Include="TapeStorage\MemoryAppendOnlyStoreTest.cs" />
</ItemGroup>
View
2  Cqrs.Portable.Tests/Performance_test_for_LockingInMemoryCache.cs
@@ -19,7 +19,7 @@ public void Name()
var watch = Stopwatch.StartNew();
var count = 100000;
- cache.ReloadEverything(Generate(count, new byte[200], i => string.Format("stream_{0}", i % 100)));
+ cache.LoadHistory(Generate(count, new byte[200], i => string.Format("stream_{0}", i % 100)));
watch.Stop();
Console.WriteLine("Cached {0} events in {1:0.00} sec. {2:0.00} eps", count, (watch.Elapsed.TotalSeconds),
count / watch.Elapsed.TotalSeconds);
View
2  Cqrs.Portable.Tests/TapeStorage/FileAppendOnlyStoreTest.cs
@@ -199,7 +199,7 @@ public void read_store_all_records()
for (int i = 0; i < 2; i++)
_store.Append(stream, Encoding.UTF8.GetBytes("test message" + i));
- var records = _store.ReadRecords(-1, Int32.MaxValue).ToArray();
+ var records = _store.ReadRecords(0, Int32.MaxValue).ToArray();
Assert.AreEqual(currentVersion + 2, records.Length);
View
16 ...ckingInMemoryCacheTests/LockingInMemoryHelpers.cs → ...gInMemoryCacheTests/fixture_with_cache_helpers.cs
@@ -1,3 +1,4 @@
+using System;
using System.Collections.Generic;
using System.Text;
using Lokad.Cqrs;
@@ -7,7 +8,7 @@
namespace Cqrs.Portable.Tests.TapeStorage.LockingInMemoryCacheTests
{
- public abstract class LockingInMemoryHelpers
+ public abstract class fixture_with_cache_helpers
{
protected IEnumerable<StorageFrameDecoded> CreateFrames(params string[] streamNames)
{
@@ -21,6 +22,19 @@ protected IEnumerable<StorageFrameDecoded> CreateFrames(params string[] streamNa
}
}
+ protected void EatException(Action action)
+ {
+ try
+ {
+ action();
+ }
+ catch (Exception)
+ {
+
+
+ }
+ }
+
public static class DataAssert
{
public static void AreEqual(IEnumerable<DataWithKey> expected, IEnumerable<DataWithKey> actual)
View
4 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_checking_store_version.cs
@@ -29,7 +29,7 @@ public void given_cache_with_one_appended_record()
public void given_empty_reload()
{
var cache = new LockingInMemoryCache();
- cache.ReloadEverything(Enumerable.Empty<StorageFrameDecoded>());
+ cache.LoadHistory(Enumerable.Empty<StorageFrameDecoded>());
Assert.AreEqual(0, cache.StoreVersion);
}
@@ -37,7 +37,7 @@ public void given_empty_reload()
public void given_non_empty_reload()
{
var cache = new LockingInMemoryCache();
- cache.ReloadEverything(new StorageFrameDecoded[]
+ cache.LoadHistory(new StorageFrameDecoded[]
{
new StorageFrameDecoded(new byte[1], "test",0),
});
View
27 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_clearing_cache.cs
@@ -1,30 +1,51 @@
+using System.IO;
using System.Text;
using Lokad.Cqrs;
+using Lokad.Cqrs.TapeStorage;
using NUnit.Framework;
namespace Cqrs.Portable.Tests.TapeStorage.LockingInMemoryCacheTests
{
[TestFixture]
- public sealed class when_clearing_cache
+ public sealed class when_clearing_cache : fixture_with_cache_helpers
{
[Test]
public void given_empty_cache()
{
-
+ var cache = new LockingInMemoryCache();
+ cache.Clear(() => { });
+ Assert.AreEqual(0, cache.StoreVersion);
}
[Test]
public void given_reloaded_cache()
{
-
+ // TODO: fill this
}
[Test]
public void given_appended_cache()
{
+ // TODO: fill this
+ }
+
+ [Test]
+ public void given_filled_cache_and_failing_commit_function()
+ {
+ var cache = new LockingInMemoryCache();
+
+ cache.ConcurrentAppend("stream1", new byte[1], (version, storeVersion) => { });
+
+
+ Assert.Throws<FileNotFoundException>(() => cache.Clear(() =>
+ {
+ throw new FileNotFoundException();
+ }));
+
+ Assert.AreEqual(1, cache.StoreVersion);
}
}
}
View
30 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_doing_concurrent_append.cs
@@ -5,7 +5,7 @@
namespace Cqrs.Portable.Tests.TapeStorage.LockingInMemoryCacheTests
{
[TestFixture]
- public sealed class when_doing_concurrent_append : LockingInMemoryHelpers
+ public sealed class when_doing_concurrent_append : fixture_with_cache_helpers
{
[Test]
public void given_empty_cache_and_valid_commit_function()
@@ -18,7 +18,7 @@ public void given_reloaded_cache_and_commit_function_that_fails()
{
var cache = new LockingInMemoryCache();
- cache.ReloadEverything(CreateFrames("stream", "otherStream"));
+ cache.LoadHistory(CreateFrames("stream", "otherStream"));
Assert.Throws<FileNotFoundException>(
() => cache.ConcurrentAppend("stream", new byte[1], (version, storeVersion) =>
@@ -36,20 +36,36 @@ public void given_reloaded_cache_and_non_specified_version_expectation()
}
[Test]
- public void given_reloaded_and_appended_cache_with_specified_version_expectation()
+ public void given_reloaded_and_appended_cache_with_valid_version_expectation()
{
-
+ var cache = new LockingInMemoryCache();
+
+ cache.LoadHistory(CreateFrames("stream", "otherStream"));
+ cache.ConcurrentAppend("stream", GetEventBytes(4), (version, storeVersion) => { });
+
+ long? commitStoreVersion = null;
+ long? commitStreamVersion = null;
+
+ cache.ConcurrentAppend("stream", GetEventBytes(5), (version, storeVersion) =>
+ {
+ commitStoreVersion = storeVersion;
+ commitStreamVersion = version;
+ },2);
+
+ Assert.AreEqual(4, commitStoreVersion, "commitStoreVersion");
+ Assert.AreEqual(3, commitStreamVersion, "commitStreamVersion");
+ Assert.AreEqual(4, cache.StoreVersion);
}
[Test]
- public void given_reloaded_cache_and_non_matching_expected_version()
+ public void given_reloaded_cache_and_invalid_expected_version()
{
var cache = new LockingInMemoryCache();
- cache.ReloadEverything(CreateFrames("stream", "otherStream"));
+ cache.LoadHistory(CreateFrames("stream", "otherStream"));
bool commitWasCalled = false;
@@ -83,7 +99,7 @@ public void given_reloaded_cache_and_matching_stream_version()
{
var cache = new LockingInMemoryCache();
- cache.ReloadEverything(CreateFrames("stream", "otherStream"));
+ cache.LoadHistory(CreateFrames("stream", "otherStream"));
long? commitStoreVersion = null;
long? commitStreamVersion = null;
View
72 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_enumerating_entire_store.cs
@@ -1,72 +0,0 @@
-using System.Linq;
-using Lokad.Cqrs.TapeStorage;
-using NUnit.Framework;
-
-namespace Cqrs.Portable.Tests.TapeStorage.LockingInMemoryCacheTests
-{
- [TestFixture]
- public sealed class when_enumerating_entire_store : LockingInMemoryHelpers
- {
- [Test]
- public void given_empty_store_with_full_range()
- {
- var cache = new LockingInMemoryCache();
-
- CollectionAssert.IsEmpty(cache.ReadRecords(0, int.MaxValue));
- }
-
- [Test]
- public void given_reloaded_store_and_non_matching_range()
- {
- var cache = new LockingInMemoryCache();
- cache.ReloadEverything(CreateFrames("stream1", "stream2"));
- CollectionAssert.IsEmpty(cache.ReadRecords(2, 10));
- }
-
- [Test]
- public void given_reloaded_store_and_intersecting_range()
- {
- var cache = new LockingInMemoryCache();
- cache.ReloadEverything(CreateFrames("stream1", "stream2","stream3"));
- var dataWithKeys = cache.ReadRecords(1, 1).ToArray();
- DataAssert.AreEqual(new[] { CreateKey(2, 1, "stream2") }, dataWithKeys);
- }
-
-
- [Test]
- public void given_reloaded_store_and_matching_range()
- {
- var cache = new LockingInMemoryCache();
- cache.ReloadEverything(CreateFrames("stream1", "stream2", "stream1"));
- var dataWithKeys = cache.ReadRecords(0, 3).ToArray();
- DataAssert.AreEqual(new[]
- {
- CreateKey(1, 1, "stream1"),
- CreateKey(2, 1, "stream2"),
- CreateKey(3, 2, "stream1")
- }, dataWithKeys);
- }
-
-
- [Test]
- public void given_reloaded_and_appended_store_and_matching_range()
- {
- var cache = new LockingInMemoryCache();
-
-
- cache.ReloadEverything(CreateFrames("stream1", "stream2"));
- var frame = GetEventBytes(3);
- cache.ConcurrentAppend("stream2", frame, (version, storeVersion) => { });
-
- var dataWithKeys = cache.ReadRecords(0, 3);
- DataAssert.AreEqual(new[]
- {
- CreateKey(1,1,"stream1"),
- CreateKey(2,1,"stream2"),
- CreateKey(3,2,"stream2"),
- }, dataWithKeys);
- }
-
-
- }
-}
View
40 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reading_all_given_empty_cache.cs
@@ -0,0 +1,40 @@
+using System;
+using System.Linq;
+using Lokad.Cqrs.TapeStorage;
+using NUnit.Framework;
+
+namespace Cqrs.Portable.Tests.TapeStorage.LockingInMemoryCacheTests
+{
+
+ [TestFixture]
+ public sealed class when_reading_all_given_empty_cache : fixture_with_cache_helpers
+ {
+
+ LockingInMemoryCache Cache;
+
+ [SetUp]
+ public void Setup()
+ {
+ Cache = new LockingInMemoryCache();
+ }
+
+ [Test]
+ public void given_full_range()
+ {
+ CollectionAssert.IsEmpty(Cache.ReadAll(0, int.MaxValue));
+ }
+
+ [Test]
+ public void given_negative_store_version()
+ {
+ Assert.Throws<ArgumentOutOfRangeException>(() => Cache.ReadAll(-1, int.MaxValue));
+ }
+
+ [Test]
+ public void given_zero_count()
+ {
+ Assert.Throws<ArgumentOutOfRangeException>(() => Cache.ReadAll(0, 0));
+ }
+
+ }
+}
View
78 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reading_all_given_filled_cache.cs
@@ -0,0 +1,78 @@
+using System;
+using Lokad.Cqrs.TapeStorage;
+using NUnit.Framework;
+
+namespace Cqrs.Portable.Tests.TapeStorage.LockingInMemoryCacheTests
+{
+ [TestFixture]
+ public sealed class when_reading_all_given_filled_cache : fixture_with_cache_helpers
+ {
+ LockingInMemoryCache Cache;
+
+ [SetUp]
+ public void Setup()
+ {
+ Cache = new LockingInMemoryCache();
+
+ Cache.LoadHistory(CreateFrames("stream1", "stream2"));
+ Cache.ConcurrentAppend("stream1", GetEventBytes(3), (version, storeVersion) => { });
+
+ }
+
+ [Test]
+ public void given_non_matching_range()
+ {
+
+ CollectionAssert.IsEmpty(Cache.ReadAll(3, 10));
+ }
+
+ [Test]
+ public void given_intersecting_range()
+ {
+
+ var dataWithKeys = Cache.ReadAll(1, 1);
+ DataAssert.AreEqual(new[] { CreateKey(2, 1, "stream2") }, dataWithKeys);
+ }
+
+
+ [Test]
+ public void given_matching_range()
+ {
+ var dataWithKeys = Cache.ReadAll(0, 3);
+ DataAssert.AreEqual(new[]
+ {
+ CreateKey(1, 1, "stream1"),
+ CreateKey(2, 1, "stream2"),
+ CreateKey(3, 2, "stream1")
+ }, dataWithKeys);
+ }
+
+ [Test]
+ public void given_full_range()
+ {
+ var dataWithKeys = Cache.ReadAll(0, int.MaxValue);
+ DataAssert.AreEqual(new[]
+ {
+ CreateKey(1, 1, "stream1"),
+ CreateKey(2, 1, "stream2"),
+ CreateKey(3, 2, "stream1")
+ }, dataWithKeys);
+ }
+
+ [Test]
+ public void given_negative_store_version()
+ {
+ Assert.Throws<ArgumentOutOfRangeException>(() => Cache.ReadAll(-1, int.MaxValue));
+ }
+
+ [Test]
+ public void given_zero_count()
+ {
+ Assert.Throws<ArgumentOutOfRangeException>(() => Cache.ReadAll(0, 0));
+ }
+
+
+
+
+ }
+}
View
11 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reading_stream.cs
@@ -1,11 +0,0 @@
-using NUnit.Framework;
-
-namespace Cqrs.Portable.Tests.TapeStorage.LockingInMemoryCacheTests
-{
- [TestFixture]
- public sealed class when_reading_stream
- {
-
-
- }
-}
View
44 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reading_stream_from_empty_cache.cs
@@ -0,0 +1,44 @@
+using System;
+using System.Collections.Generic;
+using Lokad.Cqrs.TapeStorage;
+using NUnit.Framework;
+
+namespace Cqrs.Portable.Tests.TapeStorage.LockingInMemoryCacheTests
+{
+ [TestFixture]
+ public sealed class when_reading_stream_from_empty_cache : fixture_with_cache_helpers
+ {
+
+ LockingInMemoryCache Cache;
+
+ [SetUp]
+ public void Setup()
+ {
+ Cache = new LockingInMemoryCache();
+ }
+
+ [Test]
+ public void given_any_stream()
+ {
+ CollectionAssert.IsEmpty(Cache.ReadStream("stream1", 0, int.MaxValue));
+ }
+
+ [Test]
+ public void given_null_stream_name()
+ {
+ Assert.Throws<ArgumentNullException>(() => Cache.ReadStream(null, 0, int.MaxValue));
+ }
+
+ [Test]
+ public void given_negative_stream_version()
+ {
+ Assert.Throws<ArgumentOutOfRangeException>(() => Cache.ReadStream("s", -1, int.MaxValue));
+ }
+
+ [Test]
+ public void given_zero_count()
+ {
+ Assert.Throws<ArgumentOutOfRangeException>(() => Cache.ReadStream("s", 0, 0));
+ }
+ }
+}
View
73 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reading_stream_from_loaded_cache.cs
@@ -0,0 +1,73 @@
+using System;
+using Lokad.Cqrs.TapeStorage;
+using NUnit.Framework;
+
+namespace Cqrs.Portable.Tests.TapeStorage.LockingInMemoryCacheTests
+{
+ [TestFixture]
+ public sealed class when_reading_stream_from_loaded_cache :fixture_with_cache_helpers
+ {
+
+ LockingInMemoryCache Cache;
+
+ [SetUp]
+ public void Setup()
+ {
+ Cache = new LockingInMemoryCache();
+
+ Cache.LoadHistory(CreateFrames("stream1", "stream2"));
+ Cache.ConcurrentAppend("stream1", GetEventBytes(3), (version, storeVersion) => { });
+
+ }
+
+ [Test]
+ public void given_full_range()
+ {
+ DataAssert.AreEqual(new []
+ {
+ CreateKey(1,1,"stream1"),
+ CreateKey(3,2, "stream1")
+ }, Cache.ReadStream("stream1", 0, int.MaxValue));
+ }
+
+ [Test]
+ public void given_tail_range()
+ {
+ DataAssert.AreEqual(new DataWithKey[]
+ {
+ CreateKey(3,2,"stream1")
+ }, Cache.ReadStream("stream1",1, 1));
+ }
+
+ [Test]
+ public void given_nonmatching_range()
+ {
+ CollectionAssert.IsEmpty(Cache.ReadStream("stream1", 2, int.MaxValue));
+ }
+
+ [Test]
+ public void given_non_matching_stream()
+ {
+ CollectionAssert.IsEmpty(Cache.ReadStream("streamZ", 0, int.MaxValue));
+
+ }
+
+ [Test]
+ public void given_null_stream_name()
+ {
+ Assert.Throws<ArgumentNullException>(() => Cache.ReadStream(null, 0, int.MaxValue));
+ }
+
+ [Test]
+ public void given_negative_stream_version()
+ {
+ Assert.Throws<ArgumentOutOfRangeException>(() => Cache.ReadStream("s", -1, int.MaxValue));
+ }
+
+ [Test]
+ public void given_zero_count()
+ {
+ Assert.Throws<ArgumentOutOfRangeException>(() => Cache.ReadStream("s", 0, 0));
+ }
+ }
+}
View
31 Cqrs.Portable.Tests/TapeStorage/LockingInMemoryCacheTests/when_reloading_all.cs
@@ -1,10 +1,39 @@
+using System;
+using Lokad.Cqrs.TapeStorage;
using NUnit.Framework;
namespace Cqrs.Portable.Tests.TapeStorage.LockingInMemoryCacheTests
{
[TestFixture]
- public sealed class when_reloading_all
+ public sealed class when_reloading_all : fixture_with_cache_helpers
{
+ [Test]
+ public void given_reloaded_cache()
+ {
+ var cache = new LockingInMemoryCache();
+ cache.LoadHistory(CreateFrames("s1","s2"));
+
+ Assert.Throws<InvalidOperationException>(() => cache.LoadHistory(CreateFrames("s1")));
+
+ }
+
+ [Test]
+ public void given_empty_cache()
+ {
+ var cache = new LockingInMemoryCache();
+ cache.Clear(() => { });
+ }
+
+ [Test]
+ public void given_cleared_cache()
+ {
+ var cache = new LockingInMemoryCache();
+ cache.LoadHistory(CreateFrames("s1", "s2"));
+ cache.Clear(() => { });
+ cache.LoadHistory(CreateFrames("s1"));
+
+ Assert.AreEqual(1, cache.StoreVersion, "storeVersion");
+ }
}
View
2  Cqrs.Portable.Tests/TapeStorage/MemoryAppendOnlyStoreTest.cs
@@ -109,7 +109,7 @@ public void read_store_all_records()
for (int i = 0; i < 2; i++)
_store.Append(stream, Encoding.UTF8.GetBytes("test message" + i));
- var records = _store.ReadRecords(-1, Int32.MaxValue).ToArray();
+ var records = _store.ReadRecords(0, Int32.MaxValue).ToArray();
Assert.AreEqual(currentVersion + 2, records.Length);
View
6 Cqrs.Portable/TapeStorage/FileAppendOnlyStore.cs
@@ -44,7 +44,7 @@ public void Initialize()
public void LoadCaches()
{
- _cache.ReloadEverything(EnumerateHistory());
+ _cache.LoadHistory(EnumerateHistory());
}
IEnumerable<StorageFrameDecoded> EnumerateHistory()
@@ -133,12 +133,12 @@ void EnsureWriterExists(long storeVersion)
public IEnumerable<DataWithKey> ReadRecords(string streamName, long afterVersion, int maxCount)
{
- return _cache.ReadRecords(streamName, afterVersion, maxCount);
+ return _cache.ReadStream(streamName, afterVersion, maxCount);
}
public IEnumerable<DataWithKey> ReadRecords(long afterVersion, int maxCount)
{
- return _cache.ReadRecords(afterVersion, maxCount);
+ return _cache.ReadAll(afterVersion, maxCount);
}
View
24 Cqrs.Portable/TapeStorage/LockingInMemoryCache.cs
@@ -11,15 +11,23 @@ namespace Lokad.Cqrs.TapeStorage
/// </summary>
public sealed class LockingInMemoryCache
{
+
+
+
+
+
readonly ReaderWriterLockSlim _thread = new ReaderWriterLockSlim();
ConcurrentDictionary<string, DataWithKey[]> _cacheByKey = new ConcurrentDictionary<string, DataWithKey[]>();
DataWithKey[] _cacheFull = new DataWithKey[0];
- public void ReloadEverything(IEnumerable<StorageFrameDecoded> sfd)
+ public void LoadHistory(IEnumerable<StorageFrameDecoded> sfd)
{
_thread.EnterWriteLock();
try
{
+ if (_storeVersion != 0)
+ throw new InvalidOperationException("Must clear cache before loading history");
+
_cacheFull = new DataWithKey[0];
// [abdullin]: known performance problem identified by Nicolas Mehlei
@@ -100,6 +108,7 @@ public void ConcurrentAppend(string streamName, byte[] data, OnCommit commit, lo
_cacheFull = ImmutableAdd(_cacheFull, dataWithKey);
_cacheByKey.AddOrUpdate(streamName, s => new[] { dataWithKey }, (s, records) => ImmutableAdd(records, dataWithKey));
_storeVersion = newStoreVersion;
+
}
finally
{
@@ -108,8 +117,10 @@ public void ConcurrentAppend(string streamName, byte[] data, OnCommit commit, lo
}
- public IEnumerable<DataWithKey> ReadRecords(string streamName, long afterStreamVersion, int maxCount)
+ public IEnumerable<DataWithKey> ReadStream(string streamName, long afterStreamVersion, int maxCount)
{
+ if (null == streamName)
+ throw new ArgumentNullException("streamName");
if (afterStreamVersion < 0)
throw new ArgumentOutOfRangeException("afterStreamVersion", "Must be zero or greater.");
@@ -124,8 +135,15 @@ public IEnumerable<DataWithKey> ReadRecords(string streamName, long afterStreamV
}
- public IEnumerable<DataWithKey> ReadRecords(long afterStoreVersion, int maxCount)
+ public IEnumerable<DataWithKey> ReadAll(long afterStoreVersion, int maxCount)
{
+ if (afterStoreVersion < 0)
+ throw new ArgumentOutOfRangeException("afterStoreVersion", "Must be zero or greater.");
+
+ if (maxCount <= 0)
+ throw new ArgumentOutOfRangeException("maxCount", "Must be more than zero.");
+
+
// collection is immutable so we don't care about locks
return _cacheFull.Where(key => key.StoreVersion > afterStoreVersion).Take(maxCount);
}
View
4 Cqrs.Portable/TapeStorage/MemoryTape.cs
@@ -22,12 +22,12 @@ public void Append(string streamName, byte[] data, long expectedStreamVersion =
public IEnumerable<DataWithKey> ReadRecords(string streamName, long startingFrom, int maxCount)
{
- return _cache.ReadRecords(streamName, startingFrom, maxCount);
+ return _cache.ReadStream(streamName, startingFrom, maxCount);
}
public IEnumerable<DataWithKey> ReadRecords(long startingFrom, int maxCount)
{
- return _cache.ReadRecords(startingFrom, maxCount);
+ return _cache.ReadAll(startingFrom, maxCount);
}
public void Close()
Please sign in to comment.
Something went wrong with that request. Please try again.