Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added partition support for raven persistence to be able to store mul…

…tiple stream partitions in a single database tenant
  • Loading branch information...
commit 8c9cbaf32107550e9006452a5c82e77619d76998 1 parent 1ef1d0b
@kblooie kblooie authored
Showing with 683 additions and 387 deletions.
  1. +1 −0  src/proj/EventStore.Persistence.RavenPersistence/EventStore.Persistence.RavenPersistence.csproj
  2. +34 −14 src/proj/EventStore.Persistence.RavenPersistence/ExtensionMethods.cs
  3. +37 −0 src/proj/EventStore.Persistence.RavenPersistence/Indexes/EventStoreDocumentsByEntityName.cs
  4. +1 −1  src/proj/EventStore.Persistence.RavenPersistence/Indexes/RavenCommitByDate.cs
  5. +1 −1  src/proj/EventStore.Persistence.RavenPersistence/Indexes/RavenCommitByRevisionRange.cs
  6. +1 −1  src/proj/EventStore.Persistence.RavenPersistence/Indexes/RavenCommitsByDispatched.cs
  7. +1 −1  src/proj/EventStore.Persistence.RavenPersistence/Indexes/RavenSnapshotByStreamIdAndRevision.cs
  8. +1 −1  src/proj/EventStore.Persistence.RavenPersistence/Indexes/RavenStreamHeadBySnapshotAge.cs
  9. +2 −0  src/proj/EventStore.Persistence.RavenPersistence/RavenCommit.cs
  10. +3 −2 src/proj/EventStore.Persistence.RavenPersistence/RavenConfiguration.cs
  11. +378 −355 src/proj/EventStore.Persistence.RavenPersistence/RavenPersistenceEngine.cs
  12. +2 −0  src/proj/EventStore.Persistence.RavenPersistence/RavenSnapshot.cs
  13. +1 −0  src/proj/EventStore.Persistence.RavenPersistence/RavenStreamHead.cs
  14. +21 −11 src/tests/EventStore.Persistence.AcceptanceTests/Engines/AcceptanceTestRavenPersistenceFactory.cs
  15. +1 −0  src/tests/EventStore.Persistence.AcceptanceTests/EventStore.Persistence.AcceptanceTests.csproj
  16. +198 −0 src/tests/EventStore.Persistence.AcceptanceTests/RavenPersistence/RavenPersistencePartitionTests.cs
View
1  src/proj/EventStore.Persistence.RavenPersistence/EventStore.Persistence.RavenPersistence.csproj
@@ -45,6 +45,7 @@
<Link>Properties\VersionAssemblyInfo.cs</Link>
</Compile>
<Compile Include="ExtensionMethods.cs" />
+ <Compile Include="Indexes\EventStoreDocumentsByEntityName.cs" />
<Compile Include="Indexes\RavenCommitsByDispatched.cs" />
<Compile Include="Indexes\RavenCommitByRevisionRange.cs" />
<Compile Include="Indexes\RavenCommitByDate.cs" />
View
48 src/proj/EventStore.Persistence.RavenPersistence/ExtensionMethods.cs
@@ -9,16 +9,22 @@ namespace EventStore.Persistence.RavenPersistence
public static class ExtensionMethods
{
- public static string ToRavenCommitId(this Commit commit)
+ public static string ToRavenCommitId(this Commit commit, string partition)
{
- return string.Format(CultureInfo.InvariantCulture, "{0}/{1}", commit.StreamId, commit.CommitSequence);
+ var id = string.Format(CultureInfo.InvariantCulture, "{0}/{1}", commit.StreamId, commit.CommitSequence);
+
+ if (!string.IsNullOrEmpty(partition))
+ id = string.Format("{0}/{1}", partition, id);
+
+ return id;
}
- public static RavenCommit ToRavenCommit(this Commit commit, IDocumentSerializer serializer)
+ public static RavenCommit ToRavenCommit(this Commit commit, string partition, IDocumentSerializer serializer)
{
return new RavenCommit
{
- Id = ToRavenCommitId(commit),
+ Id = ToRavenCommitId(commit, partition),
+ Partition = partition,
StreamId = commit.StreamId,
CommitSequence = commit.CommitSequence,
StartingStreamRevision = commit.StreamRevision - (commit.Events.Count - 1),
@@ -42,17 +48,24 @@ public static Commit ToCommit(this RavenCommit commit, IDocumentSerializer seria
serializer.Deserialize<List<EventMessage>>(commit.Payload));
}
- public static RavenSnapshot ToRavenSnapshot(this Snapshot snapshot, IDocumentSerializer serializer)
+ public static string ToRavenSnapshotId(Snapshot snapshot, string partition)
+ {
+ return string.Format("Snapshots/{0}/{1}", snapshot.StreamId, snapshot.StreamRevision);
+ }
+
+ public static RavenSnapshot ToRavenSnapshot(this Snapshot snapshot, string partition, IDocumentSerializer serializer)
{
- return new RavenSnapshot
+ return new RavenSnapshot
{
+ Id = ToRavenSnapshotId(snapshot, partition),
+ Partition = partition,
StreamId = snapshot.StreamId,
StreamRevision = snapshot.StreamRevision,
Payload = serializer.Serialize(snapshot.Payload)
};
}
-
- public static Snapshot ToSnapshot(this RavenSnapshot snapshot, IDocumentSerializer serializer)
+
+ public static Snapshot ToSnapshot(this RavenSnapshot snapshot, IDocumentSerializer serializer)
{
if (snapshot == null)
return null;
@@ -63,27 +76,34 @@ public static Snapshot ToSnapshot(this RavenSnapshot snapshot, IDocumentSerializ
serializer.Deserialize<object>(snapshot.Payload));
}
- public static string ToRavenStreamId(this Guid streamId)
+ public static string ToRavenStreamId(this Guid streamId, string partition)
{
- return string.Format("StreamHeads/{0}", streamId);
+ var id = string.Format("StreamHeads/{0}", streamId);
+
+ if (!string.IsNullOrEmpty(partition))
+ id = string.Format("{0}/{1}", partition, id);
+
+ return id;
}
- public static RavenStreamHead ToRavenStreamHead(this Commit commit)
+ public static RavenStreamHead ToRavenStreamHead(this Commit commit, string partition)
{
return new RavenStreamHead
{
- Id = commit.StreamId.ToRavenStreamId(),
+ Id = commit.StreamId.ToRavenStreamId(partition),
+ Partition = partition,
StreamId = commit.StreamId,
HeadRevision = commit.StreamRevision,
SnapshotRevision = 0
};
}
- public static RavenStreamHead ToRavenStreamHead(this Snapshot snapshot)
+ public static RavenStreamHead ToRavenStreamHead(this Snapshot snapshot, string partition)
{
return new RavenStreamHead
{
- Id = snapshot.StreamId.ToRavenStreamId(),
+ Id = snapshot.StreamId.ToRavenStreamId(partition),
+ Partition = partition,
StreamId = snapshot.StreamId,
HeadRevision = snapshot.StreamRevision,
SnapshotRevision = snapshot.StreamRevision
View
37 src/proj/EventStore.Persistence.RavenPersistence/Indexes/EventStoreDocumentsByEntityName.cs
@@ -0,0 +1,37 @@
+using System;
+using System.Linq;
+using Raven.Abstractions.Indexing;
+using Raven.Client.Indexes;
+
+namespace EventStore.Persistence.RavenPersistence.Indexes
+{
+ public class EventStoreDocumentsByEntityName : AbstractIndexCreationTask
+ {
+ public override string IndexName
+ {
+ get { return "EventStoreDocumentsByEntityName"; }
+ }
+
+ public override IndexDefinition CreateIndexDefinition()
+ {
+ return new IndexDefinition
+ {
+ Map = @"from doc in docs
+ let Tag = doc[""@metadata""][""Raven-Entity-Name""]
+ where Tag != null
+ select new { Tag, LastModified = (DateTime)doc[""@metadata""][""Last-Modified""], Partition = doc.Partition };",
+ Indexes =
+ {
+ {"Tag", FieldIndexing.NotAnalyzed},
+ {"Partition", FieldIndexing.NotAnalyzed},
+ },
+ Stores =
+ {
+ {"Tag", FieldStorage.No},
+ {"LastModified", FieldStorage.No},
+ {"Partition", FieldStorage.No}
+ }
+ };
+ }
+ }
+}
View
2  src/proj/EventStore.Persistence.RavenPersistence/Indexes/RavenCommitByDate.cs
@@ -7,7 +7,7 @@ public class RavenCommitByDate : AbstractIndexCreationTask<RavenCommit>
{
public RavenCommitByDate()
{
- this.Map = commits => from c in commits select new { c.CommitStamp };
+ this.Map = commits => from c in commits select new { c.CommitStamp, c.Partition };
}
}
}
View
2  src/proj/EventStore.Persistence.RavenPersistence/Indexes/RavenCommitByRevisionRange.cs
@@ -8,7 +8,7 @@ public class RavenCommitByRevisionRange : AbstractIndexCreationTask<RavenCommit>
public RavenCommitByRevisionRange()
{
this.Map = commits => from c in commits
- select new { c.StreamId, c.StartingStreamRevision, c.StreamRevision };
+ select new { c.StreamId, c.StartingStreamRevision, c.StreamRevision, c.Partition };
}
}
}
View
2  src/proj/EventStore.Persistence.RavenPersistence/Indexes/RavenCommitsByDispatched.cs
@@ -7,7 +7,7 @@ public class RavenCommitsByDispatched : AbstractIndexCreationTask<RavenCommit>
{
public RavenCommitsByDispatched()
{
- this.Map = commits => from c in commits select new { c.Dispatched };
+ this.Map = commits => from c in commits select new { c.Dispatched, c.Partition };
}
}
}
View
2  src/proj/EventStore.Persistence.RavenPersistence/Indexes/RavenSnapshotByStreamIdAndRevision.cs
@@ -7,7 +7,7 @@ public class RavenSnapshotByStreamIdAndRevision : AbstractIndexCreationTask<Rave
{
public RavenSnapshotByStreamIdAndRevision()
{
- Map = snapshots => from s in snapshots select new { s.StreamId, s.StreamRevision };
+ Map = snapshots => from s in snapshots select new { s.StreamId, s.StreamRevision, s.Partition };
}
}
}
View
2  src/proj/EventStore.Persistence.RavenPersistence/Indexes/RavenStreamHeadBySnapshotAge.cs
@@ -7,7 +7,7 @@ public class RavenStreamHeadBySnapshotAge : AbstractIndexCreationTask<RavenStrea
{
public RavenStreamHeadBySnapshotAge()
{
- Map = snapshots => from s in snapshots select new { SnapshotAge = s.HeadRevision - s.SnapshotRevision };
+ Map = snapshots => from s in snapshots select new { SnapshotAge = s.HeadRevision - s.SnapshotRevision, s.Partition };
}
}
}
View
2  src/proj/EventStore.Persistence.RavenPersistence/RavenCommit.cs
@@ -18,6 +18,8 @@ public class RavenCommit
public Guid CommitId { get; set; }
public DateTime CommitStamp { get; set; }
+ public string Partition { get; set; }
+
[SuppressMessage("Microsoft.Usage", "CA2227:CollectionPropertiesShouldBeReadOnly",
Justification = "This is a simple DTO and is only used internally by Raven.")]
public Dictionary<string, object> Headers { get; set; }
View
5 src/proj/EventStore.Persistence.RavenPersistence/RavenConfiguration.cs
@@ -8,7 +8,8 @@ public class RavenConfiguration
{
public string ConnectionName { get; set; }
public Uri Url { get; set; }
- public string DefaultDatabase { get; set; }
+ public string DefaultDatabase { get; set; }
+ public string Partition { get; set; }
public IDocumentSerializer Serializer { get; set; }
public TransactionScopeOption ScopeOption { get; set; }
@@ -24,6 +25,6 @@ public int PageSize
return this.RequestedPageSize;
}
- }
+ }
}
}
View
733 src/proj/EventStore.Persistence.RavenPersistence/RavenPersistenceEngine.cs
@@ -1,243 +1,252 @@
namespace EventStore.Persistence.RavenPersistence
{
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Linq.Expressions;
- using System.Net;
- using System.Threading;
- using System.Transactions;
- using Indexes;
- using Logging;
- using Raven.Abstractions.Commands;
- using Raven.Abstractions.Data;
- using Raven.Client;
- using Raven.Client.Connection;
- using Raven.Client.Exceptions;
- using Raven.Client.Indexes;
- using Raven.Json.Linq;
- using Serialization;
-
- public class RavenPersistenceEngine : IPersistStreams
- {
- private const int MinPageSize = 10;
- private static readonly ILog Logger = LogFactory.BuildLogger(typeof(RavenPersistenceEngine));
- private readonly IDocumentStore store;
- private readonly IDocumentSerializer serializer;
- private readonly TransactionScopeOption scopeOption;
- private readonly bool consistentQueries;
- private readonly int pageSize;
- private int initialized;
-
- public RavenPersistenceEngine(IDocumentStore store, RavenConfiguration config)
- {
- if (store == null)
- throw new ArgumentNullException("store");
-
- if (config == null)
- throw new ArgumentNullException("config");
-
- if (config.Serializer == null)
- throw new ArgumentException(Messages.SerializerCannotBeNull, "config");
-
- if (config.PageSize < MinPageSize)
- throw new ArgumentException(Messages.PagingSizeTooSmall, "config");
-
- this.store = store;
- this.serializer = config.Serializer;
- this.scopeOption = config.ScopeOption;
- this.consistentQueries = config.ConsistentQueries;
- this.pageSize = config.PageSize;
- }
-
- public void Dispose()
- {
- this.Dispose(true);
- GC.SuppressFinalize(this);
- }
- protected virtual void Dispose(bool disposing)
- {
- if (!disposing)
- return;
-
- Logger.Debug(Messages.ShuttingDownPersistence);
- this.store.Dispose();
- }
-
- public virtual void Initialize()
- {
- if (Interlocked.Increment(ref this.initialized) > 1)
- return;
-
- Logger.Debug(Messages.InitializingStorage);
-
- this.TryRaven(() =>
- {
- using (var scope = this.OpenCommandScope())
- {
- new RavenCommitByDate().Execute(this.store);
- new RavenCommitByRevisionRange().Execute(this.store);
- new RavenCommitsByDispatched().Execute(this.store);
- new RavenSnapshotByStreamIdAndRevision().Execute(this.store);
- new RavenStreamHeadBySnapshotAge().Execute(this.store);
- scope.Complete();
- }
-
- return true;
- });
- }
-
- public virtual IEnumerable<Commit> GetFrom(Guid streamId, int minRevision, int maxRevision)
- {
- Logger.Debug(Messages.GettingAllCommitsBetween, streamId, minRevision, maxRevision);
-
- return this.QueryCommits<RavenCommitByRevisionRange>(x =>
- x.StreamId == streamId && x.StreamRevision >= minRevision && x.StartingStreamRevision <= maxRevision)
- .OrderBy(x => x.CommitSequence);
- }
- public virtual IEnumerable<Commit> GetFrom(DateTime start)
- {
- Logger.Debug(Messages.GettingAllCommitsFrom, start);
-
- return this.QueryCommits<RavenCommitByDate>(x => x.CommitStamp >= start)
- .OrderBy(x => x.CommitStamp);
- }
- public virtual void Commit(Commit attempt)
- {
- Logger.Debug(Messages.AttemptingToCommit,
- attempt.Events.Count, attempt.StreamId, attempt.CommitSequence);
-
- try
- {
- this.TryRaven(() =>
- {
- using (var scope = this.OpenCommandScope())
- using (var session = this.store.OpenSession())
- {
- session.Advanced.UseOptimisticConcurrency = true;
- session.Store(attempt.ToRavenCommit(this.serializer));
- session.SaveChanges();
- scope.Complete();
- }
-
- Logger.Debug(Messages.CommitPersisted, attempt.CommitId);
- this.SaveStreamHead(attempt.ToRavenStreamHead());
- return true;
- });
- }
- catch (Raven.Abstractions.Exceptions.ConcurrencyException)
- {
- var savedCommit = this.LoadSavedCommit(attempt);
- if (savedCommit.CommitId == attempt.CommitId)
- throw new DuplicateCommitException();
-
- Logger.Debug(Messages.ConcurrentWriteDetected);
- throw new ConcurrencyException();
- }
- }
- private RavenCommit LoadSavedCommit(Commit attempt)
- {
- Logger.Debug(Messages.DetectingConcurrency);
-
- return this.TryRaven(() =>
- {
- using (var scope = this.OpenQueryScope())
- using (var session = this.store.OpenSession())
- {
- var commit = session.Load<RavenCommit>(attempt.ToRavenCommitId());
- scope.Complete();
- return commit;
- }
- });
- }
-
- public virtual IEnumerable<Commit> GetUndispatchedCommits()
- {
- Logger.Debug(Messages.GettingUndispatchedCommits);
- return this.QueryCommits<RavenCommitsByDispatched>(c => c.Dispatched == false)
- .OrderBy(x => x.CommitStamp);
- }
- public virtual void MarkCommitAsDispatched(Commit commit)
- {
- if (commit == null)
- throw new ArgumentNullException("commit");
-
- var patch = new PatchRequest
- {
- Type = PatchCommandType.Set,
- Name = "Dispatched",
- Value = RavenJToken.Parse("true")
- };
- var data = new PatchCommandData
- {
- Key = commit.ToRavenCommitId(),
- Patches = new[] { patch }
- };
-
- Logger.Debug(Messages.MarkingCommitAsDispatched, commit.CommitId);
-
- this.TryRaven(() =>
- {
- using (var scope = this.OpenCommandScope())
- using (var session = this.store.OpenSession())
- {
- session.Advanced.DatabaseCommands.Batch(new[] { data });
- session.SaveChanges();
- scope.Complete();
- return true;
- }
- });
- }
-
- public virtual IEnumerable<StreamHead> GetStreamsToSnapshot(int maxThreshold)
- {
- Logger.Debug(Messages.GettingStreamsToSnapshot);
-
- return this.Query<RavenStreamHead, RavenStreamHeadBySnapshotAge>(s => s.SnapshotAge >= maxThreshold)
- .Select(s => s.ToStreamHead());
- }
- public virtual Snapshot GetSnapshot(Guid streamId, int maxRevision)
- {
- Logger.Debug(Messages.GettingRevision, streamId, maxRevision);
-
- return this.Query<RavenSnapshot, RavenSnapshotByStreamIdAndRevision>(
- x => x.StreamId == streamId && x.StreamRevision <= maxRevision)
- .OrderByDescending(x => x.StreamRevision)
- .FirstOrDefault()
- .ToSnapshot(this.serializer);
- }
- public virtual bool AddSnapshot(Snapshot snapshot)
- {
- if (snapshot == null)
- return false;
-
- Logger.Debug(Messages.AddingSnapshot, snapshot.StreamId, snapshot.StreamRevision);
-
- try
- {
- return this.TryRaven(() =>
- {
- using (var scope = this.OpenCommandScope())
- using (var session = this.store.OpenSession())
- {
- var ravenSnapshot = snapshot.ToRavenSnapshot(this.serializer);
- session.Store(ravenSnapshot);
- session.SaveChanges();
- scope.Complete();
- }
-
- this.SaveStreamHead(snapshot.ToRavenStreamHead());
-
- return true;
- });
- }
- catch (Raven.Abstractions.Exceptions.ConcurrencyException)
- {
- return false;
- }
- }
-
- public virtual void Purge()
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Linq.Expressions;
+ using System.Net;
+ using System.Threading;
+ using System.Transactions;
+ using Indexes;
+ using Logging;
+ using Raven.Abstractions.Commands;
+ using Raven.Abstractions.Data;
+ using Raven.Client;
+ using Raven.Client.Connection;
+ using Raven.Client.Exceptions;
+ using Raven.Client.Indexes;
+ using Raven.Json.Linq;
+ using Serialization;
+
+ public class RavenPersistenceEngine : IPersistStreams
+ {
+ private const int MinPageSize = 10;
+ private static readonly ILog Logger = LogFactory.BuildLogger(typeof(RavenPersistenceEngine));
+ private readonly IDocumentStore store;
+ private readonly IDocumentSerializer serializer;
+ private readonly TransactionScopeOption scopeOption;
+ private readonly bool consistentQueries;
+ private readonly int pageSize;
+ private int initialized;
+ private readonly string partition;
+
+ public RavenPersistenceEngine(IDocumentStore store, RavenConfiguration config)
+ {
+ if (store == null)
+ throw new ArgumentNullException("store");
+
+ if (config == null)
+ throw new ArgumentNullException("config");
+
+ if (config.Serializer == null)
+ throw new ArgumentException(Messages.SerializerCannotBeNull, "config");
+
+ if (config.PageSize < MinPageSize)
+ throw new ArgumentException(Messages.PagingSizeTooSmall, "config");
+
+ this.store = store;
+ this.serializer = config.Serializer;
+ this.scopeOption = config.ScopeOption;
+ this.consistentQueries = config.ConsistentQueries;
+ this.pageSize = config.PageSize;
+ this.partition = config.Partition;
+ }
+
+ public void Dispose()
+ {
+ this.Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposing)
+ return;
+
+ Logger.Debug(Messages.ShuttingDownPersistence);
+ this.store.Dispose();
+ }
+
+ public virtual void Initialize()
+ {
+ if (Interlocked.Increment(ref this.initialized) > 1)
+ return;
+
+ Logger.Debug(Messages.InitializingStorage);
+
+ this.TryRaven(() =>
+ {
+ using (var scope = this.OpenCommandScope())
+ {
+ new RavenCommitByDate().Execute(this.store);
+ new RavenCommitByRevisionRange().Execute(this.store);
+ new RavenCommitsByDispatched().Execute(this.store);
+ new RavenSnapshotByStreamIdAndRevision().Execute(this.store);
+ new RavenStreamHeadBySnapshotAge().Execute(this.store);
+ new EventStoreDocumentsByEntityName().Execute(this.store);
+ scope.Complete();
+ }
+
+ return true;
+ });
+ }
+
+ public virtual IEnumerable<Commit> GetFrom(Guid streamId, int minRevision, int maxRevision)
+ {
+ Logger.Debug(Messages.GettingAllCommitsBetween, streamId, minRevision, maxRevision);
+
+ return this.QueryCommits<RavenCommitByRevisionRange>(x =>
+ x.StreamId == streamId && x.StreamRevision >= minRevision && x.StartingStreamRevision <= maxRevision)
+ .OrderBy(x => x.CommitSequence);
+ }
+
+ public virtual IEnumerable<Commit> GetFrom(DateTime start)
+ {
+ Logger.Debug(Messages.GettingAllCommitsFrom, start);
+
+ return this.QueryCommits<RavenCommitByDate>(x => x.CommitStamp >= start)
+ .OrderBy(x => x.CommitStamp);
+ }
+
+ public virtual void Commit(Commit attempt)
+ {
+ Logger.Debug(Messages.AttemptingToCommit,
+ attempt.Events.Count, attempt.StreamId, attempt.CommitSequence);
+
+ try
+ {
+ this.TryRaven(() =>
+ {
+ using (var scope = this.OpenCommandScope())
+ using (var session = this.store.OpenSession())
+ {
+ session.Advanced.UseOptimisticConcurrency = true;
+ session.Store(attempt.ToRavenCommit(this.partition, this.serializer));
+ session.SaveChanges();
+ scope.Complete();
+ }
+
+ Logger.Debug(Messages.CommitPersisted, attempt.CommitId);
+ this.SaveStreamHead(attempt.ToRavenStreamHead(this.partition));
+ return true;
+ });
+ }
+ catch (Raven.Abstractions.Exceptions.ConcurrencyException)
+ {
+ var savedCommit = this.LoadSavedCommit(attempt);
+ if (savedCommit.CommitId == attempt.CommitId)
+ throw new DuplicateCommitException();
+
+ Logger.Debug(Messages.ConcurrentWriteDetected);
+ throw new ConcurrencyException();
+ }
+ }
+
+ private RavenCommit LoadSavedCommit(Commit attempt)
+ {
+ Logger.Debug(Messages.DetectingConcurrency);
+
+ return this.TryRaven(() =>
+ {
+ using (var scope = this.OpenQueryScope())
+ using (var session = this.store.OpenSession())
+ {
+ var commit = session.Load<RavenCommit>(attempt.ToRavenCommitId(this.partition));
+ scope.Complete();
+ return commit;
+ }
+ });
+ }
+
+ public virtual IEnumerable<Commit> GetUndispatchedCommits()
+ {
+ Logger.Debug(Messages.GettingUndispatchedCommits);
+ return this.QueryCommits<RavenCommitsByDispatched>(c => c.Dispatched == false)
+ .OrderBy(x => x.CommitStamp);
+ }
+
+ public virtual void MarkCommitAsDispatched(Commit commit)
+ {
+ if (commit == null)
+ throw new ArgumentNullException("commit");
+
+ var patch = new PatchRequest
+ {
+ Type = PatchCommandType.Set,
+ Name = "Dispatched",
+ Value = RavenJToken.Parse("true")
+ };
+ var data = new PatchCommandData
+ {
+ Key = commit.ToRavenCommitId(this.partition),
+ Patches = new[] { patch }
+ };
+
+ Logger.Debug(Messages.MarkingCommitAsDispatched, commit.CommitId);
+
+ this.TryRaven(() =>
+ {
+ using (var scope = this.OpenCommandScope())
+ using (var session = this.store.OpenSession())
+ {
+ session.Advanced.DatabaseCommands.Batch(new[] { data });
+ session.SaveChanges();
+ scope.Complete();
+ return true;
+ }
+ });
+ }
+
+ public virtual IEnumerable<StreamHead> GetStreamsToSnapshot(int maxThreshold)
+ {
+ Logger.Debug(Messages.GettingStreamsToSnapshot);
+
+ return this.Query<RavenStreamHead, RavenStreamHeadBySnapshotAge>(s => s.SnapshotAge >= maxThreshold && s.Partition == this.partition)
+ .Select(s => s.ToStreamHead());
+ }
+
+ public virtual Snapshot GetSnapshot(Guid streamId, int maxRevision)
+ {
+ Logger.Debug(Messages.GettingRevision, streamId, maxRevision);
+
+ return Query<RavenSnapshot, RavenSnapshotByStreamIdAndRevision>(x => x.StreamId == streamId && x.StreamRevision <= maxRevision && x.Partition == this.partition)
+ .OrderByDescending(x => x.StreamRevision)
+ .FirstOrDefault()
+ .ToSnapshot(this.serializer);
+ }
+
+ public virtual bool AddSnapshot(Snapshot snapshot)
+ {
+ if (snapshot == null)
+ return false;
+
+ Logger.Debug(Messages.AddingSnapshot, snapshot.StreamId, snapshot.StreamRevision);
+
+ try
+ {
+ return this.TryRaven(() =>
+ {
+ using (var scope = this.OpenCommandScope())
+ using (var session = this.store.OpenSession())
+ {
+ var ravenSnapshot = snapshot.ToRavenSnapshot(this.partition, this.serializer);
+ session.Store(ravenSnapshot);
+ session.SaveChanges();
+ scope.Complete();
+ }
+
+ this.SaveStreamHead(snapshot.ToRavenStreamHead(this.partition));
+
+ return true;
+ });
+ }
+ catch (Raven.Abstractions.Exceptions.ConcurrencyException)
+ {
+ return false;
+ }
+ }
+
+ public virtual void Purge()
{
Logger.Warn(Messages.PurgingStorage);
@@ -246,10 +255,7 @@ public virtual void Purge()
using (var scope = this.OpenCommandScope())
using (var session = this.store.OpenSession())
{
- var cmd = session.Advanced.DatabaseCommands;
- PurgeCollection(cmd, "Tag:[[RavenCommits]]");
- PurgeCollection(cmd, "Tag:[[RavenSnapshots]]");
- PurgeCollection(cmd, "Tag:[[RavenStreamHeads]]");
+ PurgeDocuments(session);
session.SaveChanges();
scope.Complete();
@@ -257,118 +263,135 @@ public virtual void Purge()
}
});
}
- private static void PurgeCollection(IDatabaseCommands commands, string tag)
- {
- commands.DeleteByIndex("Raven/DocumentsByEntityName", new IndexQuery { Query = tag }, true);
- }
-
- private IEnumerable<Commit> QueryCommits<TIndex>(Expression<Func<RavenCommit, bool>> query)
- where TIndex : AbstractIndexCreationTask, new()
- {
- return this.Query<RavenCommit, TIndex>(query).Select(x => x.ToCommit(this.serializer));
- }
- private IEnumerable<T> Query<T, TIndex>(Expression<Func<T, bool>> query)
- where TIndex : AbstractIndexCreationTask, new()
- {
- return this.TryRaven(() =>
- {
- var scope = this.OpenQueryScope();
-
- try
- {
- using (var session = this.OpenQuerySession())
- return session.Query<T, TIndex>()
- .Customize(x => { if (this.consistentQueries) x.WaitForNonStaleResults(); })
- .Where(query)
- .Page(this.pageSize, scope);
- }
- catch (Exception)
- {
- scope.Dispose();
- throw;
- }
- });
- }
- private IDocumentSession OpenQuerySession()
- {
- var session = this.store.OpenSession();
- // defaults to 30 total requests per session (not good for paging over large data sets)
- // which may be encountered when calling GetFrom() and enumerating over the entire store.
- // see http://ravendb.net/documentation/safe-by-default for more information.
- session.Advanced.MaxNumberOfRequestsPerSession = int.MaxValue;
-
- return session;
- }
-
- private void SaveStreamHead(RavenStreamHead streamHead)
- {
- if (this.consistentQueries)
- this.SaveStreamHeadAsync(streamHead);
- else
- ThreadPool.QueueUserWorkItem(x => this.SaveStreamHeadAsync(streamHead), null);
- }
- private void SaveStreamHeadAsync(RavenStreamHead updated)
- {
- this.TryRaven(() =>
- {
- using (var scope = this.OpenCommandScope())
- using (var session = this.store.OpenSession())
- {
- var current = session.Load<RavenStreamHead>(updated.StreamId.ToRavenStreamId()) ?? updated;
- current.HeadRevision = updated.HeadRevision;
-
- if (updated.SnapshotRevision > 0)
- current.SnapshotRevision = updated.SnapshotRevision;
-
- session.Advanced.UseOptimisticConcurrency = false;
- session.Store(current);
- session.SaveChanges();
- scope.Complete(); // if this fails it's no big deal, stream heads can be updated whenever
- }
- return true;
- });
- }
-
- protected virtual T TryRaven<T>(Func<T> callback)
- {
- try
- {
- return callback();
- }
- catch (WebException e)
- {
- Logger.Warn(Messages.StorageUnavailable);
- throw new StorageUnavailableException(e.Message, e);
- }
- catch (NonUniqueObjectException e)
- {
- Logger.Warn(Messages.DuplicateCommitDetected);
- throw new DuplicateCommitException(e.Message, e);
- }
- catch (Raven.Abstractions.Exceptions.ConcurrencyException)
- {
- Logger.Warn(Messages.ConcurrentWriteDetected);
- throw;
- }
- catch (ObjectDisposedException)
- {
- Logger.Warn(Messages.StorageAlreadyDisposed);
- throw;
- }
- catch (Exception e)
- {
- Logger.Error(Messages.StorageThrewException, e.GetType());
- throw new StorageException(e.Message, e);
- }
- }
- protected virtual TransactionScope OpenQueryScope()
- {
- return this.OpenCommandScope() ?? new TransactionScope(TransactionScopeOption.Suppress);
- }
- protected virtual TransactionScope OpenCommandScope()
- {
- return new TransactionScope(this.scopeOption);
- }
- }
+ private void PurgeDocuments(IDocumentSession session)
+ {
+ Func<Type, string> getTagCondition = t => "Tag:" + session.Advanced.DocumentStore.Conventions.GetTypeTagName(t);
+
+ var typeQuery = "(" + getTagCondition(typeof(RavenCommit)) + " OR " + getTagCondition(typeof(RavenSnapshot)) + " OR " + getTagCondition(typeof(RavenStreamHead)) + ")";
+ var partitionQuery = "Partition:" + (this.partition ?? "[[NULL_VALUE]]");
+ var queryText = partitionQuery + " AND " + typeQuery;
+
+ var query = new IndexQuery { Query = queryText };
+
+ session.Advanced.DatabaseCommands
+ .DeleteByIndex("EventStoreDocumentsByEntityName", query, true);
+ }
+
+ private IEnumerable<Commit> QueryCommits<TIndex>(Expression<Func<RavenCommit, bool>> query)
+ where TIndex : AbstractIndexCreationTask, new()
+ {
+ var commits = Query<RavenCommit, TIndex>(query, c => c.Partition == this.partition);
+
+ return commits.Select(x => x.ToCommit(this.serializer));
+ }
+
+ private IEnumerable<T> Query<T, TIndex>(params Expression<Func<T, bool>>[] conditions)
+ where TIndex : AbstractIndexCreationTask, new()
+ {
+ return this.TryRaven(() =>
+ {
+ var scope = this.OpenQueryScope();
+
+ try
+ {
+ using (var session = this.OpenQuerySession())
+ {
+ IQueryable<T> query = session.Query<T, TIndex>()
+ .Customize(x => { if (this.consistentQueries) x.WaitForNonStaleResults(); });
+
+ query = conditions.Aggregate(query, (current, condition) => current.Where(condition));
+
+ return query.Page(this.pageSize, scope);
+ }
+ }
+ catch (Exception)
+ {
+ scope.Dispose();
+ throw;
+ }
+ });
+ }
+ private IDocumentSession OpenQuerySession()
+ {
+ var session = this.store.OpenSession();
+
+ // defaults to 30 total requests per session (not good for paging over large data sets)
+ // which may be encountered when calling GetFrom() and enumerating over the entire store.
+ // see http://ravendb.net/documentation/safe-by-default for more information.
+ session.Advanced.MaxNumberOfRequestsPerSession = int.MaxValue;
+
+ return session;
+ }
+
+ private void SaveStreamHead(RavenStreamHead streamHead)
+ {
+ if (this.consistentQueries)
+ this.SaveStreamHeadAsync(streamHead);
+ else
+ ThreadPool.QueueUserWorkItem(x => this.SaveStreamHeadAsync(streamHead), null);
+ }
+ private void SaveStreamHeadAsync(RavenStreamHead updated)
+ {
+ this.TryRaven(() =>
+ {
+ using (var scope = this.OpenCommandScope())
+ using (var session = this.store.OpenSession())
+ {
+ var current = session.Load<RavenStreamHead>(updated.StreamId.ToRavenStreamId(this.partition)) ?? updated;
+ current.HeadRevision = updated.HeadRevision;
+
+ if (updated.SnapshotRevision > 0)
+ current.SnapshotRevision = updated.SnapshotRevision;
+
+ session.Advanced.UseOptimisticConcurrency = false;
+ session.Store(current);
+ session.SaveChanges();
+ scope.Complete(); // if this fails it's no big deal, stream heads can be updated whenever
+ }
+ return true;
+ });
+ }
+
+ protected virtual T TryRaven<T>(Func<T> callback)
+ {
+ try
+ {
+ return callback();
+ }
+ catch (WebException e)
+ {
+ Logger.Warn(Messages.StorageUnavailable);
+ throw new StorageUnavailableException(e.Message, e);
+ }
+ catch (NonUniqueObjectException e)
+ {
+ Logger.Warn(Messages.DuplicateCommitDetected);
+ throw new DuplicateCommitException(e.Message, e);
+ }
+ catch (Raven.Abstractions.Exceptions.ConcurrencyException)
+ {
+ Logger.Warn(Messages.ConcurrentWriteDetected);
+ throw;
+ }
+ catch (ObjectDisposedException)
+ {
+ Logger.Warn(Messages.StorageAlreadyDisposed);
+ throw;
+ }
+ catch (Exception e)
+ {
+ Logger.Error(Messages.StorageThrewException, e.GetType());
+ throw new StorageException(e.Message, e);
+ }
+ }
+ protected virtual TransactionScope OpenQueryScope()
+ {
+ return this.OpenCommandScope() ?? new TransactionScope(TransactionScopeOption.Suppress);
+ }
+ protected virtual TransactionScope OpenCommandScope()
+ {
+ return new TransactionScope(this.scopeOption);
+ }
+ }
}
View
2  src/proj/EventStore.Persistence.RavenPersistence/RavenSnapshot.cs
@@ -4,6 +4,8 @@
public class RavenSnapshot
{
+ public string Id { get; set; }
+ public string Partition { get; set; }
public Guid StreamId { get; set; }
public int StreamRevision { get; set; }
public object Payload { get; set; }
View
1  src/proj/EventStore.Persistence.RavenPersistence/RavenStreamHead.cs
@@ -5,6 +5,7 @@ namespace EventStore.Persistence.RavenPersistence
public class RavenStreamHead
{
public string Id { get; set; }
+ public string Partition { get; set; }
public Guid StreamId { get; set; }
public int HeadRevision { get; set; }
public int SnapshotRevision { get; set; }
View
32 src/tests/EventStore.Persistence.AcceptanceTests/Engines/AcceptanceTestRavenPersistenceFactory.cs
@@ -1,24 +1,34 @@
+
+
namespace EventStore.Persistence.AcceptanceTests.Engines
{
using System.Transactions;
- using RavenPersistence;
using Serialization;
+ using Persistence.RavenPersistence;
public class AcceptanceTestRavenPersistenceFactory : RavenPersistenceFactory
{
- private static readonly RavenConfiguration Config = new RavenConfiguration
- {
- Serializer = new DocumentObjectSerializer(),
- ScopeOption = TransactionScopeOption.Suppress,
- ConsistentQueries = true, // helps tests pass consistently
- RequestedPageSize = int.Parse("pageSize".GetSetting() ?? "10"), // smaller values help bring out bugs
- MaxServerPageSize = int.Parse("serverPageSize".GetSetting() ?? "1024"), // raven default
- ConnectionName = "Raven"
- };
+ public static RavenConfiguration GetDefaultConfig()
+ {
+ return new RavenConfiguration
+ {
+ Serializer = new DocumentObjectSerializer(),
+ ScopeOption = TransactionScopeOption.Suppress,
+ ConsistentQueries = true, // helps tests pass consistently
+ RequestedPageSize = int.Parse("pageSize".GetSetting() ?? "10"), // smaller values help bring out bugs
+ MaxServerPageSize = int.Parse("serverPageSize".GetSetting() ?? "1024"), // raven default
+ ConnectionName = "Raven"
+ };
+ }
public AcceptanceTestRavenPersistenceFactory()
- : base(Config)
+ : base(GetDefaultConfig())
{
}
+
+ public AcceptanceTestRavenPersistenceFactory(RavenConfiguration config)
+ : base(config)
+ {
+ }
}
}
View
1  src/tests/EventStore.Persistence.AcceptanceTests/EventStore.Persistence.AcceptanceTests.csproj
@@ -53,6 +53,7 @@
<Compile Include="PersistenceFactoryScanner.cs" />
<Compile Include="PersistenceTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="RavenPersistence\RavenPersistencePartitionTests.cs" />
</ItemGroup>
<ItemGroup>
<Content Include="..\..\proj\CustomDictionary.xml">
View
198 src/tests/EventStore.Persistence.AcceptanceTests/RavenPersistence/RavenPersistencePartitionTests.cs
@@ -0,0 +1,198 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using EventStore.Persistence.AcceptanceTests.Engines;
+using Machine.Specifications;
+
+#pragma warning disable 169
+// ReSharper disable InconsistentNaming
+
+namespace EventStore.Persistence.AcceptanceTests.RavenPersistence
+{
+ [Subject("RavenPersistence - Partitions")]
+ public class when_committing_a_stream_with_the_same_id_as_a_stream_in_another_partition : using_raven_persistence_with_partitions
+ {
+ static IPersistStreams persistence1, persistence2;
+ static Commit attempt1, attempt2;
+
+ static Exception thrown;
+
+ Establish context = () =>
+ {
+ persistence1 = NewEventStoreWithPartition();
+ persistence2 = NewEventStoreWithPartition();
+
+ var now = SystemTime.UtcNow;
+ attempt1 = streamId.BuildAttempt(now);
+ attempt2 = streamId.BuildAttempt(now.Subtract(TimeSpan.FromDays(1)));
+
+ persistence1.Commit(attempt1);
+ };
+
+ Because of = () =>
+ thrown = Catch.Exception(() => persistence2.Commit(attempt2));
+
+ It should_succeed = () =>
+ thrown.ShouldBeNull();
+
+ It should_persist_to_the_correct_partition = () =>
+ {
+ var stream = persistence2.GetFrom(streamId, 0, int.MaxValue).ToArray();
+ stream.ShouldNotBeNull();
+ stream.Count().ShouldEqual(1);
+ stream.First().CommitStamp.ShouldEqual(attempt2.CommitStamp);
+ };
+
+ It should_not_affect_the_stream_from_the_other_partition = () =>
+ {
+ var stream = persistence1.GetFrom(streamId, 0, int.MaxValue).ToArray();
+ stream.ShouldNotBeNull();
+ stream.Count().ShouldEqual(1);
+ stream.First().CommitStamp.ShouldEqual(attempt1.CommitStamp);
+ };
+ }
+
+ [Subject("RavenPersistence - Partitions")]
+ public class when_saving_a_snapshot_in_a_partition : using_raven_persistence_with_partitions
+ {
+ static Snapshot snapshot;
+ static IPersistStreams persistence1, persistence2;
+ static bool added;
+
+ Establish context = () =>
+ {
+ snapshot = new Snapshot(streamId, 1, "Snapshot");
+ persistence1 = NewEventStoreWithPartition();
+ persistence2 = NewEventStoreWithPartition();
+ persistence1.Commit(streamId.BuildAttempt());
+ };
+ Because of = () =>
+ added = persistence1.AddSnapshot(snapshot);
+
+ It should_indicate_the_snapshot_was_added = () =>
+ added.ShouldBeTrue();
+
+ It should_be_able_to_retrieve_the_snapshot = () =>
+ persistence1.GetSnapshot(streamId, snapshot.StreamRevision).ShouldNotBeNull();
+
+ It should_not_be_able_to_retrieve_the_snapshot_from_another_partition = () =>
+ persistence2.GetSnapshot(streamId, snapshot.StreamRevision).ShouldBeNull();
+ }
+
+ [Subject("RavenPersistence - Partitions")]
+ public class when_reading_all_commits_from_a_particular_point_in_time_from_a_partition : using_raven_persistence_with_partitions
+ {
+ static DateTime now;
+ static IPersistStreams persistence1, persistence2;
+ static Commit first, second, third, fourth, fifth;
+ static Commit[] committed1, committed2;
+
+ Establish context = () =>
+ {
+ now = SystemTime.UtcNow.AddYears(1);
+ first = Guid.NewGuid().BuildAttempt(now.AddSeconds(1));
+ second = first.BuildNextAttempt();
+ third = second.BuildNextAttempt();
+ fourth = third.BuildNextAttempt();
+ fifth = Guid.NewGuid().BuildAttempt(now.AddSeconds(1));
+
+ persistence1 = NewEventStoreWithPartition();
+ persistence2 = NewEventStoreWithPartition();
+
+ persistence1.Commit(first);
+ persistence1.Commit(second);
+ persistence1.Commit(third);
+ persistence1.Commit(fourth);
+ persistence2.Commit(fifth);
+ };
+
+ Because of = () =>
+ committed1 = persistence1.GetFrom(now).ToArray();
+
+ It should_return_all_commits_on_or_after_the_point_in_time_specified = () =>
+ committed1.Length.ShouldEqual(4);
+
+ It should_not_return_commits_from_other_partitions = () =>
+ committed1.Any(c => c.CommitId.Equals(fifth.CommitId)).ShouldBeFalse();
+ }
+
+ [Subject("RavenPersistence - Partitions")]
+ public class when_purging_all_commits : using_raven_persistence_with_partitions
+ {
+ static IPersistStreams persistence1, persistence2;
+
+ Establish context = () =>
+ {
+ persistence1 = NewEventStoreWithPartition();
+ persistence2 = NewEventStoreWithPartition();
+
+ persistence1.Commit(streamId.BuildAttempt());
+ persistence2.Commit(streamId.BuildAttempt());
+ };
+ Because of = () =>
+ {
+ Thread.Sleep(50); // 50 ms = enough time for Raven to become consistent
+ persistence1.Purge();
+ };
+
+ It should_purge_all_commits_stored = () =>
+ persistence1.GetFrom(DateTime.MinValue).Count().ShouldEqual(0);
+
+ It should_purge_all_streams_to_snapshot = () =>
+ persistence1.GetStreamsToSnapshot(0).Count().ShouldEqual(0);
+
+ It should_purge_all_undispatched_commits = () =>
+ persistence1.GetUndispatchedCommits().Count().ShouldEqual(0);
+
+ It should_not_purge_all_commits_stored_in_other_partitions = () =>
+ persistence2.GetFrom(DateTime.MinValue).Count().ShouldNotEqual(0);
+
+ It should_not_purge_all_streams_to_snapshot_in_other_partitions = () =>
+ persistence2.GetStreamsToSnapshot(0).Count().ShouldNotEqual(0);
+
+ It should_not_purge_all_undispatched_commits_in_other_partitions = () =>
+ persistence2.GetUndispatchedCommits().Count().ShouldNotEqual(0);
+ }
+
+ public abstract class using_raven_persistence_with_partitions
+ {
+ protected static Guid streamId;
+ protected static List<IPersistStreams> instantiatedPersistence;
+
+ Establish context = () =>
+ {
+ streamId = Guid.NewGuid();
+ instantiatedPersistence = new List<IPersistStreams>();
+ };
+
+ Cleanup everything = () =>
+ {
+ foreach (var persistence in instantiatedPersistence)
+ {
+ persistence.Dispose();
+ }
+ };
+
+ protected static IPersistStreams NewEventStoreWithPartition()
+ {
+ return NewEventStoreWithPartition(Guid.NewGuid().ToString());
+ }
+
+ protected static IPersistStreams NewEventStoreWithPartition(string partition)
+ {
+ var config = AcceptanceTestRavenPersistenceFactory.GetDefaultConfig();
+ config.Partition = partition;
+
+ var persistence = new AcceptanceTestRavenPersistenceFactory(config).Build();
+ persistence.Initialize();
+
+ instantiatedPersistence.Add(persistence);
+
+ return persistence;
+ }
+ }
+}
+
+// ReSharper enable InconsistentNaming
+#pragma warning restore 169
Please sign in to comment.
Something went wrong with that request. Please try again.