diff --git a/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTestMongoPersistenceFactory.cs b/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTestMongoPersistenceFactory.cs deleted file mode 100644 index e2d12e610..000000000 --- a/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTestMongoPersistenceFactory.cs +++ /dev/null @@ -1,33 +0,0 @@ -namespace NEventStore.Persistence.MongoDB.Tests -{ - using System; - using NEventStore.Serialization; - - public class AcceptanceTestMongoPersistenceFactory : MongoPersistenceFactory - { - private const string EnvVarKey = "NEventStore.MongoDB"; - - public AcceptanceTestMongoPersistenceFactory() - : base(GetConnectionString, new DocumentObjectSerializer()) - {} - - private static string GetConnectionString() - { - string connectionString = Environment.GetEnvironmentVariable(EnvVarKey, EnvironmentVariableTarget.Process); - - if (connectionString == null) - { - string message = string.Format( - "Cannot initialize acceptance tests for Mongo. Cannot find the '{0}' environment variable. Please ensure " + - "you have correctly setup the connection string environment variables. Refer to the " + - "NEventStore wiki for details.", - EnvVarKey); - throw new InvalidOperationException(message); - } - - connectionString = connectionString.TrimStart('"').TrimEnd('"'); - - return connectionString; - } - } -} diff --git a/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTests/OptimisticLoopTests.cs b/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTests/OptimisticLoopTests.cs deleted file mode 100644 index f06adc257..000000000 --- a/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTests/OptimisticLoopTests.cs +++ /dev/null @@ -1,408 +0,0 @@ - -namespace NEventStore.Persistence.MongoDB.Tests.AcceptanceTests -{ - using System; - using System.Collections.Generic; - using System.Linq; - using System.Text; - using System.Threading; - using System.Diagnostics; - using NEventStore.Client; - using NEventStore.Diagnostics; - using NEventStore.Persistence.AcceptanceTests; - using NEventStore.Persistence.AcceptanceTests.BDD; - using Xunit; - using Xunit.Should; - - public class Observer : IObserver - { - private int _counter; - - public int Counter - { - get { return _counter; } - } - - private string _lastCommit; - - public void OnNext(ICommit value) - { - if (value.CheckpointToken != _lastCommit) - _counter++; - - _lastCommit = value.CheckpointToken; - } - - public void OnError(Exception error) - { - } - - public void OnCompleted() - { - } - } - - public class when_a_reader_observe_commits_from_a_lot_of_writers : SpecificationBase - { - protected const int IterationsPerWriter = 10; - protected const int ParallelWriters = 30; - protected const int PollingInterval = 1; - readonly IList _writers = new List(); - private PollingClient _client; - private Observer _observer; - private IObserveCommits _observeCommits; - private IDisposable _subscription; - - protected override void Context() - { - for (int c = 1; c <= ParallelWriters; c++) - { - var client = new AcceptanceTestMongoPersistenceFactory().Build(); - - if (c == 1) - { - client.Drop(); - client.Initialize(); - } - - _writers.Add(client); - } - - _observer = new Observer(); - - var reader = new AcceptanceTestMongoPersistenceFactory().Build(); - _client = new PollingClient(reader, PollingInterval); - - _observeCommits = _client.ObserveFrom(null); - _subscription = _observeCommits.Subscribe(_observer); - _observeCommits.Start(); - } - - protected override void Because() - { - var start = new ManualResetEventSlim(false); - var stop = new ManualResetEventSlim(false); - long counter = 0; - for (int t = 0; t < ParallelWriters; t++) - { - int t1 = t; - var runner = new Thread(() => - { - start.Wait(); - for (int c = 0; c < IterationsPerWriter; c++) - { - try - { - _writers[t1].Commit(Guid.NewGuid().ToString().BuildAttempt()); - } - catch (Exception ex) - { - Debug.WriteLine(ex.Message); - throw; - } - Thread.Sleep(1); - } - Interlocked.Increment(ref counter); - if (counter == ParallelWriters) - { - stop.Set(); - } - }); - - runner.Start(); - } - start.Set(); - stop.Wait(); - Thread.Sleep(500); - _subscription.Dispose(); - } - - [Fact] - public void should_never_miss_a_commit() - { - _observer.Counter.ShouldBe(IterationsPerWriter * ParallelWriters); - } - - protected override void Cleanup() - { - for (int c = 0; c < ParallelWriters; c++) - { - if (c == ParallelWriters - 1) - _writers[c].Drop(); - - _writers[c].Dispose(); - } - } - } - - public class when_first_commit_is_persisted : PersistenceEngineConcern - { - ICommit _commit; - protected override void Context() - { - } - - protected override void Because() - { - _commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - } - - [Fact] - public void should_have_checkpoint_equal_to_one() - { - LongCheckpoint.Parse(_commit.CheckpointToken).LongValue.ShouldBe(1); - } - } - - public class when_second_commit_is_persisted : PersistenceEngineConcern - { - ICommit _commit; - protected override void Context() - { - Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - } - - protected override void Because() - { - _commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - } - - [Fact] - public void should_have_checkpoint_equal_to_two() - { - LongCheckpoint.Parse(_commit.CheckpointToken).LongValue.ShouldBe(2); - } - - } - - public class when_commit_is_persisted_after_a_stream_deletion : PersistenceEngineConcern - { - ICommit _commit; - protected override void Context() - { - var commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - Persistence.DeleteStream(commit.BucketId, commit.StreamId); - } - - protected override void Because() - { - _commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - } - - [Fact] - public void should_have_checkpoint_equal_to_two() - { - LongCheckpoint.Parse(_commit.CheckpointToken).LongValue.ShouldBe(2); - } - } - - public class when_commit_is_persisted_after_concurrent_insertions_and_deletions : PersistenceEngineConcern - { - const int Iterations = 10; - const int Clients = 10; - string _checkpointToken; - - protected override void Context() - { - var lazyInitializer = Persistence; - - var start = new ManualResetEventSlim(false); - var stop = new ManualResetEventSlim(false); - int counter = 0; - - for (int c = 0; c < Clients; c++) - { - new Thread(() => - { - start.Wait(); - for (int i = 0; i < Iterations; i++) - { - var commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - Persistence.DeleteStream(commit.BucketId, commit.StreamId); - } - - Interlocked.Increment(ref counter); - if (counter >= Clients) - stop.Set(); - - }).Start(); - } - - start.Set(); - stop.Wait(); - } - - protected override void Because() - { - _checkpointToken = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()).CheckpointToken; - } - - [Fact] - public void should_have_correct_checkpoint() - { - LongCheckpoint.Parse(_checkpointToken).LongValue.ShouldBe(Clients * Iterations + 1); - } - } - - public class when_a_stream_is_deleted : PersistenceEngineConcern - { - ICommit _commit; - - protected override void Context() - { - _commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - } - - protected override void Because() - { - Persistence.DeleteStream(_commit.BucketId, _commit.StreamId); - } - - [Fact] - public void the_commits_cannot_be_loaded_from_the_stream() - { - Persistence.GetFrom(_commit.StreamId, int.MinValue, int.MaxValue).ShouldBeEmpty(); - } - - [Fact] - public void the_commits_cannot_be_loaded_from_the_bucket() - { - Persistence.GetFrom(_commit.BucketId,DateTime.MinValue).ShouldBeEmpty(); - } - - [Fact] - public void the_commits_cannot_be_loaded_from_the_checkpoint() - { - const string origin = null; - Persistence.GetFrom(origin).ShouldBeEmpty(); - } - - [Fact] - public void the_commits_cannot_be_loaded_from_bucket_and_start_date() - { - Persistence.GetFrom(_commit.BucketId,DateTime.MinValue).ShouldBeEmpty(); - } - - [Fact] - public void the_commits_cannot_be_loaded_from_bucket_and_date_range() - { - Persistence.GetFromTo(_commit.BucketId, DateTime.MinValue, DateTime.MaxValue).ShouldBeEmpty(); - } - } - - public class when_deleted_streams_are_purged_and_last_commit_is_marked_as_deleted : PersistenceEngineConcern - { - ICommit[] _commits; - - protected override void Context() - { - Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - var commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - Persistence.DeleteStream(commit.BucketId, commit.StreamId); - Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - Persistence.DeleteStream(commit.BucketId, commit.StreamId); - } - - protected override void Because() - { - var mongoEngine = (MongoPersistenceEngine)(((PerformanceCounterPersistenceEngine)Persistence).UnwrapPersistenceEngine()); - mongoEngine.EmptyRecycleBin(); - _commits = mongoEngine.GetDeletedCommits().ToArray(); - } - - [Fact] - public void last_deleted_commit_is_not_purged_to_preserve_checkpoint_numbering() - { - _commits.Length.ShouldBe(1); - } - - [Fact] - public void last_deleted_commit_has_the_higher_checkpoint_number() - { - LongCheckpoint.Parse(_commits[0].CheckpointToken).LongValue.ShouldBe(4); - } - } - - public class when_deleted_streams_are_purged : PersistenceEngineConcern - { - ICommit[] _commits; - - protected override void Context() - { - Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - var commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - Persistence.DeleteStream(commit.BucketId, commit.StreamId); - commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - Persistence.DeleteStream(commit.BucketId, commit.StreamId); - Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - } - - protected override void Because() - { - var mongoEngine = (MongoPersistenceEngine)(((PerformanceCounterPersistenceEngine)Persistence).UnwrapPersistenceEngine()); - mongoEngine.EmptyRecycleBin(); - _commits = mongoEngine.GetDeletedCommits().ToArray(); - } - - [Fact] - public void all_deleted_commits_are_purged() - { - _commits.Length.ShouldBe(0); - } - } - - public class when_stream_is_added_after_a_bucket_purge : PersistenceEngineConcern - { - LongCheckpoint _checkpointBeforePurge; - LongCheckpoint _checkpointAfterPurge; - - protected override void Context() - { - var commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - _checkpointBeforePurge = LongCheckpoint.Parse(commit.CheckpointToken); - Persistence.DeleteStream(commit.StreamId); - Persistence.Purge("default"); - } - - protected override void Because() - { - var commit = Persistence.Commit(Guid.NewGuid().ToString().BuildAttempt()); - _checkpointAfterPurge = LongCheckpoint.Parse(commit.CheckpointToken); - } - - [Fact] - public void checkpoint_number_must_be_greater_than () - { - _checkpointAfterPurge.ShouldBeGreaterThan(_checkpointBeforePurge); - } - } - - public class when_a_stream_with_two_or_more_commits_is_deleted : PersistenceEngineConcern - { - private string _streamId; - private string _bucketId; - - protected override void Context() - { - _streamId = Guid.NewGuid().ToString(); - var commit = Persistence.Commit(_streamId.BuildAttempt()); - _bucketId = commit.BucketId; - - Persistence.Commit(commit.BuildNextAttempt()); - } - - protected override void Because() - { - Persistence.DeleteStream(_bucketId, _streamId); - } - - [Fact] - public void all_commits_are_deleted() - { - var commits = Persistence.GetFrom(_bucketId, _streamId, int.MinValue, int.MaxValue).ToArray(); - - Assert.Equal(0, commits.Length); - } - } -} diff --git a/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTests/PersistenceEngineFixture.cs b/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTests/PersistenceEngineFixture.cs deleted file mode 100644 index e652becef..000000000 --- a/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTests/PersistenceEngineFixture.cs +++ /dev/null @@ -1,13 +0,0 @@ -// ReSharper disable once CheckNamespace -namespace NEventStore.Persistence.AcceptanceTests -{ - using NEventStore.Persistence.MongoDB.Tests; - - public partial class PersistenceEngineFixture - { - public PersistenceEngineFixture() - { - _createPersistence = _ => new AcceptanceTestMongoPersistenceFactory().Build(); - } - } -} \ No newline at end of file diff --git a/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTests/SharedPersistenceTests.cs b/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTests/SharedPersistenceTests.cs deleted file mode 100644 index 08c7c5f5e..000000000 --- a/src/NEventStore.Persistence.MongoDB.Tests/AcceptanceTests/SharedPersistenceTests.cs +++ /dev/null @@ -1,54 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace NEventStore.Persistence.MongoDB.Tests.AcceptanceTests -{ - using System.Diagnostics; - using NEventStore.Diagnostics; - using NEventStore.Persistence.AcceptanceTests; - using NEventStore.Persistence.AcceptanceTests.BDD; - using Xunit; - using Xunit.Should; - - public class when_a_commit_is_persisted_from_a_second_process : SpecificationBase - { - IPersistStreams _process1; - ICommit _commit1; - IPersistStreams _process2; - ICommit _commit2; - - protected override void Context() - { - _process1 = new AcceptanceTestMongoPersistenceFactory().Build(); - _process1.Initialize(); - _commit1 = _process1.Commit(Guid.NewGuid().ToString().BuildAttempt()); - - _process2 = new AcceptanceTestMongoPersistenceFactory().Build(); - _process2.Initialize(); - } - - protected override void Because() - { - _commit2 = _process2.Commit(Guid.NewGuid().ToString().BuildAttempt()); - } - - [Fact] - public void should_have_a_checkpoint_greater_than_the_previous_commit_on_the_other_process() - { - var chkNum1 = LongCheckpoint.Parse(_commit1.CheckpointToken).LongValue; - var chkNum2 = LongCheckpoint.Parse(_commit2.CheckpointToken).LongValue; - - chkNum2.ShouldBeGreaterThan(chkNum1); - } - - protected override void Cleanup() - { - _process1.Drop(); - _process1.Dispose(); - _process2.Dispose(); - } - } -} diff --git a/src/NEventStore.Persistence.MongoDB.Tests/NEventStore.Persistence.MongoDB.Tests.csproj b/src/NEventStore.Persistence.MongoDB.Tests/NEventStore.Persistence.MongoDB.Tests.csproj deleted file mode 100644 index e29603369..000000000 --- a/src/NEventStore.Persistence.MongoDB.Tests/NEventStore.Persistence.MongoDB.Tests.csproj +++ /dev/null @@ -1,90 +0,0 @@ - - - - - Debug - AnyCPU - {49C544AF-02FB-4BE9-BDDF-79EB087E00C3} - Library - Properties - NEventStore.Persistence.MongoDB.Tests - NEventStore.Persistence.MongoDB.Tests - v4.5 - 512 - - ..\..\ - true - - - true - full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 - false - - - pdbonly - true - bin\Release\ - TRACE - prompt - 4 - false - - - - - - - - - - - False - ..\packages\xunit.1.9.1\lib\net20\xunit.dll - - - False - ..\packages\xunit.should.1.1\lib\net35\xunit.should.dll - - - - - AcceptanceTests\PersistenceTests.cs - - - - - - - - - - {32ADD8CE-0F3F-41D8-BFA1-6E5D685E64DD} - NEventStore.Persistence.MongoDB - - - {03946843-F343-419C-88EF-3E446D08DFA6} - NEventStore - - - {3FE594FE-16FF-4405-97D5-5A58FB12520B} - NEventStore.Persistence.AcceptanceTests - - - - - - - - - \ No newline at end of file diff --git a/src/NEventStore.Persistence.MongoDB.Tests/Properties/AssemblyInfo.cs b/src/NEventStore.Persistence.MongoDB.Tests/Properties/AssemblyInfo.cs deleted file mode 100644 index d064aef66..000000000 --- a/src/NEventStore.Persistence.MongoDB.Tests/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,39 +0,0 @@ -using System.Reflection; -using System.Runtime.InteropServices; - -// General Information about an assembly is controlled through the following -// set of attributes. Change these attribute values to modify the information -// associated with an assembly. - -[assembly: AssemblyTitle("NEventStore.Persistence.MongoPersistence.Tests")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("")] -[assembly: AssemblyProduct("NEventStore")] -[assembly: AssemblyCopyright("Copyright © 2013")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] - -// Setting ComVisible to false makes the types in this assembly not visible -// to COM components. If you need to access a type in this assembly from -// COM, set the ComVisible attribute to true on that type. - -[assembly: ComVisible(false)] - -// The following GUID is for the ID of the typelib if this project is exposed to COM - -[assembly: Guid("dd2cf09b-c391-4f93-b60b-2d8522ad73bf")] - -// Version information for an assembly consists of the following four values: -// -// Major Version -// Minor Version -// Build Number -// Revision -// -// You can specify all the values or you can default the Build and Revision Numbers -// by using the '*' as shown below: -// [assembly: AssemblyVersion("1.0.*")] - -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] \ No newline at end of file diff --git a/src/NEventStore.Persistence.MongoDB.Tests/packages.config b/src/NEventStore.Persistence.MongoDB.Tests/packages.config deleted file mode 100644 index 919f9ab8b..000000000 --- a/src/NEventStore.Persistence.MongoDB.Tests/packages.config +++ /dev/null @@ -1,5 +0,0 @@ - - - - - \ No newline at end of file diff --git a/src/NEventStore.Persistence.MongoDB/ExtensionMethods.cs b/src/NEventStore.Persistence.MongoDB/ExtensionMethods.cs deleted file mode 100644 index 3fa8a8bf8..000000000 --- a/src/NEventStore.Persistence.MongoDB/ExtensionMethods.cs +++ /dev/null @@ -1,177 +0,0 @@ -namespace NEventStore.Persistence.MongoDB -{ - using System; - using System.Collections.Generic; - using System.Linq; - using global::MongoDB.Bson; - using global::MongoDB.Bson.IO; - using global::MongoDB.Bson.Serialization.Options; - using global::MongoDB.Bson.Serialization.Serializers; - using global::MongoDB.Driver; - using global::MongoDB.Driver.Builders; - using NEventStore.Serialization; - using BsonSerializer = global::MongoDB.Bson.Serialization.BsonSerializer; - - public static class ExtensionMethods - { - public static Dictionary AsDictionary(this BsonValue bsonValue) - { - using (BsonReader reader = BsonReader.Create(bsonValue.ToJson())) - { - var dictionarySerializer = new DictionarySerializer(); - object result = dictionarySerializer.Deserialize(reader, - typeof(Dictionary), - new DictionarySerializationOptions()); - return (Dictionary)result; - } - } - - public static BsonDocument ToMongoCommit(this CommitAttempt commit, Func getNextCheckpointNumber, IDocumentSerializer serializer) - { - int streamRevision = commit.StreamRevision - (commit.Events.Count - 1); - int streamRevisionStart = streamRevision; - IEnumerable events = commit - .Events - .Select(e => - new BsonDocument - { - {MongoCommitFields.StreamRevision, streamRevision++}, - {MongoCommitFields.Payload, new BsonDocumentWrapper(typeof (EventMessage), serializer.Serialize(e))} - }); - return new BsonDocument - { - {MongoCommitFields.CheckpointNumber, getNextCheckpointNumber()}, - {MongoCommitFields.CommitId, commit.CommitId}, - {MongoCommitFields.CommitStamp, commit.CommitStamp}, - {MongoCommitFields.Headers, BsonDocumentWrapper.Create(commit.Headers)}, - {MongoCommitFields.Events, new BsonArray(events)}, - {MongoCommitFields.Dispatched, false}, - {MongoCommitFields.StreamRevisionFrom, streamRevisionStart}, - {MongoCommitFields.StreamRevisionTo, streamRevision - 1}, - {MongoCommitFields.BucketId, commit.BucketId}, - {MongoCommitFields.StreamId, commit.StreamId}, - {MongoCommitFields.CommitSequence, commit.CommitSequence} - }; - } - - public static ICommit ToCommit(this BsonDocument doc, IDocumentSerializer serializer) - { - if (doc == null) - { - return null; - } - - string bucketId = doc[MongoCommitFields.BucketId].AsString; - string streamId = doc[MongoCommitFields.StreamId].AsString; - int commitSequence = doc[MongoCommitFields.CommitSequence].AsInt32; - - List events = doc[MongoCommitFields.Events] - .AsBsonArray - .Select(e => e.AsBsonDocument[MongoCommitFields.Payload].IsBsonDocument - ? BsonSerializer.Deserialize(e.AsBsonDocument[MongoCommitFields.Payload].ToBsonDocument()) - : serializer.Deserialize(e.AsBsonDocument[MongoCommitFields.Payload].AsByteArray)) - .ToList(); - //int streamRevision = doc[MongoCommitFields.Events].AsBsonArray.Last().AsBsonDocument[MongoCommitFields.StreamRevision].AsInt32; - int streamRevision = doc[MongoCommitFields.StreamRevisionTo].AsInt32; - return new Commit(bucketId, - streamId, - streamRevision, - doc[MongoCommitFields.CommitId].AsGuid, - commitSequence, - doc[MongoCommitFields.CommitStamp].ToUniversalTime(), - new LongCheckpoint(doc[MongoCommitFields.CheckpointNumber].ToInt64()).Value, - doc[MongoCommitFields.Headers].AsDictionary(), - events); - } - - public static BsonDocument ToMongoSnapshot(this ISnapshot snapshot, IDocumentSerializer serializer) - { - return new BsonDocument - { - { MongoShapshotFields.Id, new BsonDocument - { - {MongoShapshotFields.BucketId, snapshot.BucketId}, - {MongoShapshotFields.StreamId, snapshot.StreamId}, - {MongoShapshotFields.StreamRevision, snapshot.StreamRevision} - } - }, - { MongoShapshotFields.Payload, BsonDocumentWrapper.Create(serializer.Serialize(snapshot.Payload)) } - }; - } - - public static Snapshot ToSnapshot(this BsonDocument doc, IDocumentSerializer serializer) - { - if (doc == null) - { - return null; - } - - BsonDocument id = doc[MongoShapshotFields.Id].AsBsonDocument; - string bucketId = id[MongoShapshotFields.BucketId].AsString; - string streamId = id[MongoShapshotFields.StreamId].AsString; - int streamRevision = id[MongoShapshotFields.StreamRevision].AsInt32; - BsonValue bsonPayload = doc[MongoShapshotFields.Payload]; - - object payload; - switch (bsonPayload.BsonType) - { - case BsonType.Binary: - payload = serializer.Deserialize(bsonPayload.AsByteArray); - break; - case BsonType.Document: - payload = BsonSerializer.Deserialize(bsonPayload.AsBsonDocument); - break; - default: - payload = BsonTypeMapper.MapToDotNetValue(bsonPayload); - break; - } - - return new Snapshot(bucketId, streamId, streamRevision, payload); - } - - public static StreamHead ToStreamHead(this BsonDocument doc) - { - BsonDocument id = doc[MongoStreamHeadFields.Id].AsBsonDocument; - string bucketId = id[MongoStreamHeadFields.BucketId].AsString; - string streamId = id[MongoStreamHeadFields.StreamId].AsString; - return new StreamHead(bucketId, streamId, doc[MongoStreamHeadFields.HeadRevision].AsInt32, doc[MongoStreamHeadFields.SnapshotRevision].AsInt32); - } - - public static IMongoQuery ToMongoCommitIdQuery(this CommitAttempt commit) - { - return Query.And( - Query.EQ(MongoCommitFields.BucketId, commit.BucketId), - Query.EQ(MongoCommitFields.StreamId, commit.StreamId), - Query.EQ(MongoCommitFields.CommitSequence, commit.CommitSequence) - ); - } - - public static IMongoQuery ToMongoCommitIdQuery(this ICommit commit) - { - return Query.And( - Query.EQ(MongoCommitFields.BucketId, commit.BucketId), - Query.EQ(MongoCommitFields.StreamId, commit.StreamId), - Query.EQ(MongoCommitFields.CommitSequence, commit.CommitSequence) - ); - } - - public static IMongoQuery GetSnapshotQuery(string bucketId, string streamId, int maxRevision) - { - return - Query.And( - Query.GT(MongoShapshotFields.Id, - Query.And( - Query.EQ(MongoShapshotFields.BucketId, bucketId), - Query.EQ(MongoShapshotFields.StreamId, streamId), - Query.EQ(MongoShapshotFields.StreamRevision, BsonNull.Value) - ).ToBsonDocument()), - Query.LTE(MongoShapshotFields.Id, - Query.And( - Query.EQ(MongoShapshotFields.BucketId, bucketId), - Query.EQ(MongoShapshotFields.StreamId, streamId), - Query.EQ(MongoShapshotFields.StreamRevision, maxRevision) - ).ToBsonDocument()) - ); - } - } -} \ No newline at end of file diff --git a/src/NEventStore.Persistence.MongoDB/Messages.Designer.cs b/src/NEventStore.Persistence.MongoDB/Messages.Designer.cs deleted file mode 100644 index 135ab90c3..000000000 --- a/src/NEventStore.Persistence.MongoDB/Messages.Designer.cs +++ /dev/null @@ -1,252 +0,0 @@ -//------------------------------------------------------------------------------ -// -// This code was generated by a tool. -// Runtime Version:4.0.30319.18051 -// -// Changes to this file may cause incorrect behavior and will be lost if -// the code is regenerated. -// -//------------------------------------------------------------------------------ - -namespace NEventStore.Persistence.MongoDB { - using System; - - - /// - /// A strongly-typed resource class, for looking up localized strings, etc. - /// - // This class was auto-generated by the StronglyTypedResourceBuilder - // class via a tool like ResGen or Visual Studio. - // To add or remove a member, edit your .ResX file then rerun ResGen - // with the /str option, or rebuild your VS project. - [global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "4.0.0.0")] - [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] - [global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()] - internal class Messages { - - private static global::System.Resources.ResourceManager resourceMan; - - private static global::System.Globalization.CultureInfo resourceCulture; - - [global::System.Diagnostics.CodeAnalysis.SuppressMessageAttribute("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")] - internal Messages() { - } - - /// - /// Returns the cached ResourceManager instance used by this class. - /// - [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)] - internal static global::System.Resources.ResourceManager ResourceManager { - get { - if (object.ReferenceEquals(resourceMan, null)) { - global::System.Resources.ResourceManager temp = new global::System.Resources.ResourceManager("NEventStore.Persistence.MongoDB.Messages", typeof(Messages).Assembly); - resourceMan = temp; - } - return resourceMan; - } - } - - /// - /// Overrides the current thread's CurrentUICulture property for all - /// resource lookups using this strongly typed resource class. - /// - [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)] - internal static global::System.Globalization.CultureInfo Culture { - get { - return resourceCulture; - } - set { - resourceCulture = value; - } - } - - /// - /// Looks up a localized string similar to Adding snapshot to stream '{0}' in bucket '{1}' at position {2}.. - /// - internal static string AddingSnapshot { - get { - return ResourceManager.GetString("AddingSnapshot", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Attempting to commit {0} events on stream '{1}' at sequence {2}.. - /// - internal static string AttemptingToCommit { - get { - return ResourceManager.GetString("AttemptingToCommit", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Commit '{0}' persisted.. - /// - internal static string CommitPersisted { - get { - return ResourceManager.GetString("CommitPersisted", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Concurrent write detected.. - /// - internal static string ConcurrentWriteDetected { - get { - return ResourceManager.GetString("ConcurrentWriteDetected", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Deleting stream '{0}' from bucket '{1}'.. - /// - internal static string DeletingStream { - get { - return ResourceManager.GetString("DeletingStream", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Concurrency issue; determining whether attempt was duplicate.. - /// - internal static string DetectingConcurrency { - get { - return ResourceManager.GetString("DetectingConcurrency", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Getting all commits for stream '{0}' in bucket '{1}' between revisions '{2}' and '{3}'.. - /// - internal static string GettingAllCommitsBetween { - get { - return ResourceManager.GetString("GettingAllCommitsBetween", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Getting all commits from '{0}' forward from bucket '{1}'.. - /// - internal static string GettingAllCommitsFrom { - get { - return ResourceManager.GetString("GettingAllCommitsFrom", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Getting all commits since checkpoint '{0}'.. - /// - internal static string GettingAllCommitsFromCheckpoint { - get { - return ResourceManager.GetString("GettingAllCommitsFromCheckpoint", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Getting all commits from '{0}' to '{1}'.. - /// - internal static string GettingAllCommitsFromTo { - get { - return ResourceManager.GetString("GettingAllCommitsFromTo", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Getting snapshot for stream '{0}' on or before revision {1}.. - /// - internal static string GettingRevision { - get { - return ResourceManager.GetString("GettingRevision", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Getting a list of streams to snapshot.. - /// - internal static string GettingStreamsToSnapshot { - get { - return ResourceManager.GetString("GettingStreamsToSnapshot", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Getting the list of all undispatched commits.. - /// - internal static string GettingUndispatchedCommits { - get { - return ResourceManager.GetString("GettingUndispatchedCommits", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Initializing storage engine.. - /// - internal static string InitializingStorage { - get { - return ResourceManager.GetString("InitializingStorage", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Marking commit '{0}' as dispatched.. - /// - internal static string MarkingCommitAsDispatched { - get { - return ResourceManager.GetString("MarkingCommitAsDispatched", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Purging all stored data for bucket '{0}'.. - /// - internal static string PurgingBucket { - get { - return ResourceManager.GetString("PurgingBucket", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Purging all stored data.. - /// - internal static string PurgingStorage { - get { - return ResourceManager.GetString("PurgingStorage", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Shutting down persistence.. - /// - internal static string ShuttingDownPersistence { - get { - return ResourceManager.GetString("ShuttingDownPersistence", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Storage threw exception of type '{0}'.. - /// - internal static string StorageThrewException { - get { - return ResourceManager.GetString("StorageThrewException", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Storage is unavailabe.. - /// - internal static string StorageUnavailable { - get { - return ResourceManager.GetString("StorageUnavailable", resourceCulture); - } - } - - /// - /// Looks up a localized string similar to Unsuppored checkpoint type. Expected {0} but got {1}.. - /// - internal static string UnsupportedCheckpointType { - get { - return ResourceManager.GetString("UnsupportedCheckpointType", resourceCulture); - } - } - } -} diff --git a/src/NEventStore.Persistence.MongoDB/Messages.resx b/src/NEventStore.Persistence.MongoDB/Messages.resx deleted file mode 100644 index 56cc1d32f..000000000 --- a/src/NEventStore.Persistence.MongoDB/Messages.resx +++ /dev/null @@ -1,183 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - text/microsoft-resx - - - 2.0 - - - System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 - - - System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 - - - Adding snapshot to stream '{0}' in bucket '{1}' at position {2}. - - - Attempting to commit {0} events on stream '{1}' at sequence {2}. - - - Commit '{0}' persisted. - - - Concurrent write detected. - - - Concurrency issue; determining whether attempt was duplicate. - - - Getting all commits for stream '{0}' in bucket '{1}' between revisions '{2}' and '{3}'. - - - Getting all commits from '{0}' forward from bucket '{1}'. - - - Getting snapshot for stream '{0}' on or before revision {1}. - - - Getting a list of streams to snapshot. - - - Getting the list of all undispatched commits. - - - Initializing storage engine. - - - Marking commit '{0}' as dispatched. - - - Purging all stored data. - - - Shutting down persistence. - - - Storage is unavailabe. - - - Storage threw exception of type '{0}'. - - - Getting all commits from '{0}' to '{1}'. - - - Getting all commits since checkpoint '{0}'. - - - Deleting stream '{0}' from bucket '{1}'. - - - Purging all stored data for bucket '{0}'. - - - Unsuppored checkpoint type. Expected {0} but got {1}. - - \ No newline at end of file diff --git a/src/NEventStore.Persistence.MongoDB/MongoFields.cs b/src/NEventStore.Persistence.MongoDB/MongoFields.cs deleted file mode 100644 index 18be6fe29..000000000 --- a/src/NEventStore.Persistence.MongoDB/MongoFields.cs +++ /dev/null @@ -1,61 +0,0 @@ -namespace NEventStore.Persistence.MongoDB -{ - public static class MongoSystemBuckets - { - public const string RecycleBin = ":rb"; - } - - public static class MongoStreamHeadFields - { - public const string Id = "_id"; - public const string BucketId = "BucketId"; - public const string StreamId = "StreamId"; - public const string HeadRevision = "HeadRevision"; - public const string SnapshotRevision = "SnapshotRevision"; - public const string Unsnapshotted = "Unsnapshotted"; - public const string FullQualifiedBucketId = Id + "." + BucketId; - } - - - public static class MongoShapshotFields - { - public const string Id = "_id"; - public const string BucketId = "BucketId"; - public const string StreamId = "StreamId"; - public const string Payload = "Payload"; - public const string StreamRevision = "StreamRevision"; - public const string FullQualifiedBucketId = Id + "." + BucketId; - } - - public static class MongoCommitFields - { - public const string CheckpointNumber = "_id"; - - public const string BucketId = "BucketId"; - public const string StreamId = "StreamId"; - public const string StreamRevision = "StreamRevision"; - public const string StreamRevisionFrom = "StreamRevisionFrom"; - public const string StreamRevisionTo = "StreamRevisionTo"; - - public const string CommitId = "CommitId"; - public const string CommitStamp = "CommitStamp"; - public const string CommitSequence = "CommitSequence"; - public const string Events = "Events"; - public const string Headers = "Headers"; - public const string Dispatched = "Dispatched"; - public const string Payload = "Payload"; - } - - public static class MongoCommitIndexes - { - public const string CheckpointNumber = "$_id_"; - public const string CommitStamp = "CommitStamp_Index"; - public const string GetFrom = "GetFrom_Index"; - public const string Dispatched = "Dispatched_Index"; - } - - public static class MongoStreamIndexes - { - public const string Unsnapshotted = "Unsnapshotted_Index"; - } -} \ No newline at end of file diff --git a/src/NEventStore.Persistence.MongoDB/MongoPersistenceEngine.cs b/src/NEventStore.Persistence.MongoDB/MongoPersistenceEngine.cs deleted file mode 100644 index e25ad08ed..000000000 --- a/src/NEventStore.Persistence.MongoDB/MongoPersistenceEngine.cs +++ /dev/null @@ -1,483 +0,0 @@ -namespace NEventStore.Persistence.MongoDB -{ - using System; - using System.Collections.Generic; - using System.Linq; - using System.Threading; - using global::MongoDB.Bson; - using global::MongoDB.Driver; - using global::MongoDB.Driver.Builders; - using NEventStore.Logging; - using NEventStore.Serialization; - - public class MongoPersistenceEngine : IPersistStreams - { - private const string ConcurrencyException = "E1100"; - private static readonly ILog Logger = LogFactory.BuildLogger(typeof (MongoPersistenceEngine)); - private readonly MongoCollectionSettings _commitSettings; - private readonly IDocumentSerializer _serializer; - private readonly MongoCollectionSettings _snapshotSettings; - private readonly MongoDatabase _store; - private readonly MongoCollectionSettings _streamSettings; - private bool _disposed; - private int _initialized; - private readonly Func _getNextCheckpointNumber; - private readonly Func _getLastCheckPointNumber; - private readonly MongoPersistenceOptions _options; - private readonly WriteConcern _insertCommitWriteConcern; - - public MongoPersistenceEngine(MongoDatabase store, IDocumentSerializer serializer, MongoPersistenceOptions options) - { - if (store == null) - { - throw new ArgumentNullException("store"); - } - - if (serializer == null) - { - throw new ArgumentNullException("serializer"); - } - - if (options == null) - { - throw new ArgumentNullException("options"); - } - - _store = store; - _serializer = serializer; - _options = options; - - // set config options - _commitSettings = _options.GetCommitSettings(); - _snapshotSettings = _options.GetSnapshotSettings(); - _streamSettings = _options.GetStreamSettings(); - _insertCommitWriteConcern = _options.GetInsertCommitWriteConcern(); - - _getLastCheckPointNumber = () => TryMongo(() => - { - var max = PersistedCommits - .FindAll() - .SetFields(Fields.Include(MongoCommitFields.CheckpointNumber)) - .SetSortOrder(SortBy.Descending(MongoCommitFields.CheckpointNumber)) - .SetLimit(1) - .FirstOrDefault(); - - return max != null ? max[MongoCommitFields.CheckpointNumber].AsInt64 : 0L; - }); - - _getNextCheckpointNumber = () => _getLastCheckPointNumber() + 1L; - } - - protected virtual MongoCollection PersistedCommits - { - get { return _store.GetCollection("Commits", _commitSettings); } - } - - protected virtual MongoCollection PersistedStreamHeads - { - get { return _store.GetCollection("Streams", _streamSettings); } - } - - protected virtual MongoCollection PersistedSnapshots - { - get { return _store.GetCollection("Snapshots", _snapshotSettings); } - } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - public virtual void Initialize() - { - if (Interlocked.Increment(ref _initialized) > 1) - { - return; - } - - Logger.Debug(Messages.InitializingStorage); - - TryMongo(() => - { - PersistedCommits.EnsureIndex( - IndexKeys - .Ascending(MongoCommitFields.Dispatched) - .Ascending(MongoCommitFields.CommitStamp), - IndexOptions.SetName(MongoCommitIndexes.Dispatched).SetUnique(false) - ); - - PersistedCommits.EnsureIndex( - IndexKeys.Ascending( - MongoCommitFields.BucketId, - MongoCommitFields.StreamId, - MongoCommitFields.StreamRevisionFrom, - MongoCommitFields.StreamRevisionTo - //,MongoCommitFields.FullqualifiedStreamRevision - ), - IndexOptions.SetName(MongoCommitIndexes.GetFrom).SetUnique(true) - ); - - PersistedCommits.EnsureIndex( - IndexKeys.Ascending(MongoCommitFields.CommitStamp), - IndexOptions.SetName(MongoCommitIndexes.CommitStamp).SetUnique(false) - ); - - PersistedStreamHeads.EnsureIndex( - IndexKeys.Ascending(MongoStreamHeadFields.Unsnapshotted), - IndexOptions.SetName(MongoStreamIndexes.Unsnapshotted).SetUnique(false) - ); - - EmptyRecycleBin(); - }); - } - - public virtual IEnumerable GetFrom(string bucketId, string streamId, int minRevision, int maxRevision) - { - Logger.Debug(Messages.GettingAllCommitsBetween, streamId, bucketId, minRevision, maxRevision); - - return TryMongo(() => - { - IMongoQuery query = Query.And( - Query.EQ(MongoCommitFields.BucketId, bucketId), - Query.EQ(MongoCommitFields.StreamId, streamId), - Query.GTE(MongoCommitFields.StreamRevisionTo, minRevision), - Query.LTE(MongoCommitFields.StreamRevisionFrom, maxRevision)); - //Query.GTE(MongoCommitFields.FullqualifiedStreamRevision, minRevision), - //Query.LTE(MongoCommitFields.FullqualifiedStreamRevision, maxRevision)); - - return PersistedCommits - .Find(query) - .SetSortOrder(MongoCommitFields.CheckpointNumber) - //.SetSortOrder(MongoCommitFields.FullqualifiedStreamRevision) - .Select(mc => mc.ToCommit(_serializer)); - }); - } - - public virtual IEnumerable GetFrom(string bucketId, DateTime start) - { - Logger.Debug(Messages.GettingAllCommitsFrom, start, bucketId); - - return TryMongo(() => PersistedCommits - .Find( - Query.And( - Query.EQ(MongoCommitFields.BucketId, bucketId), - Query.GTE(MongoCommitFields.CommitStamp, start) - ) - ) - .SetSortOrder(MongoCommitFields.CheckpointNumber) - .Select(x => x.ToCommit(_serializer))); - } - - public IEnumerable GetFrom(string checkpointToken) - { - var intCheckpoint = LongCheckpoint.Parse(checkpointToken); - Logger.Debug(Messages.GettingAllCommitsFromCheckpoint, intCheckpoint.Value); - - return TryMongo(() => PersistedCommits - .Find( - Query.And( - Query.NE(MongoCommitFields.BucketId, MongoSystemBuckets.RecycleBin), - Query.GT(MongoCommitFields.CheckpointNumber, intCheckpoint.LongValue) - ) - ) - .SetSortOrder(MongoCommitFields.CheckpointNumber) - .Select(x => x.ToCommit(_serializer)) - ); - } - - public ICheckpoint GetCheckpoint(string checkpointToken = null) - { - return LongCheckpoint.Parse(checkpointToken); - } - - public virtual IEnumerable GetFromTo(string bucketId, DateTime start, DateTime end) - { - Logger.Debug(Messages.GettingAllCommitsFromTo, start, end, bucketId); - - return TryMongo(() => PersistedCommits - .Find(Query.And( - Query.EQ(MongoCommitFields.BucketId, bucketId), - Query.GTE(MongoCommitFields.CommitStamp, start), - Query.LT(MongoCommitFields.CommitStamp, end)) - ) - .SetSortOrder(MongoCommitFields.CheckpointNumber) - .Select(x => x.ToCommit(_serializer))); - } - - public virtual ICommit Commit(CommitAttempt attempt) - { - Logger.Debug(Messages.AttemptingToCommit, attempt.Events.Count, attempt.StreamId, attempt.CommitSequence); - - return TryMongo(() => - { - BsonDocument commitDoc = attempt.ToMongoCommit(_getNextCheckpointNumber, _serializer); - bool retry = true; - while (retry) - { - try - { - // for concurrency / duplicate commit detection safe mode is required - PersistedCommits.Insert(commitDoc, _insertCommitWriteConcern); - retry = false; - UpdateStreamHeadAsync(attempt.BucketId, attempt.StreamId, attempt.StreamRevision, attempt.Events.Count); - Logger.Debug(Messages.CommitPersisted, attempt.CommitId); - } - catch (MongoException e) - { - if (!e.Message.Contains(ConcurrencyException)) - { - throw; - } - - // checkpoint index? - if (e.Message.Contains(MongoCommitIndexes.CheckpointNumber)) - { - commitDoc[MongoCommitFields.CheckpointNumber] = _getNextCheckpointNumber(); - } - else - { - ICommit savedCommit = PersistedCommits.FindOne(attempt.ToMongoCommitIdQuery()).ToCommit(_serializer); - - if (savedCommit.CommitId == attempt.CommitId) - { - throw new DuplicateCommitException(); - } - Logger.Debug(Messages.ConcurrentWriteDetected); - throw new ConcurrencyException(); - } - } - } - - return commitDoc.ToCommit(_serializer); - }); - } - - public virtual IEnumerable GetUndispatchedCommits() - { - Logger.Debug(Messages.GettingUndispatchedCommits); - - return TryMongo(() => PersistedCommits - .Find(Query.EQ("Dispatched", false)) - .SetSortOrder(MongoCommitFields.CheckpointNumber) - .Select(mc => mc.ToCommit(_serializer))); - } - - public virtual void MarkCommitAsDispatched(ICommit commit) - { - Logger.Debug(Messages.MarkingCommitAsDispatched, commit.CommitId); - - TryMongo(() => - { - IMongoQuery query = commit.ToMongoCommitIdQuery(); - UpdateBuilder update = Update.Set(MongoCommitFields.Dispatched, true); - PersistedCommits.Update(query, update); - }); - } - - public virtual IEnumerable GetStreamsToSnapshot(string bucketId, int maxThreshold) - { - Logger.Debug(Messages.GettingStreamsToSnapshot); - - return TryMongo(() => - { - IMongoQuery query = Query.GTE(MongoStreamHeadFields.Unsnapshotted, maxThreshold); - return PersistedStreamHeads - .Find(query) - .SetSortOrder(SortBy.Descending(MongoStreamHeadFields.Unsnapshotted)) - .Select(x => x.ToStreamHead()); - }); - } - - public virtual ISnapshot GetSnapshot(string bucketId, string streamId, int maxRevision) - { - Logger.Debug(Messages.GettingRevision, streamId, maxRevision); - - return TryMongo(() =>PersistedSnapshots - .Find(ExtensionMethods.GetSnapshotQuery(bucketId, streamId, maxRevision)) - .SetSortOrder(SortBy.Descending(MongoShapshotFields.Id)) - .SetLimit(1) - .Select(mc => mc.ToSnapshot(_serializer)) - .FirstOrDefault()); - } - - public virtual bool AddSnapshot(ISnapshot snapshot) - { - if (snapshot == null) - { - return false; - } - Logger.Debug(Messages.AddingSnapshot, snapshot.StreamId, snapshot.BucketId, snapshot.StreamRevision); - try - { - BsonDocument mongoSnapshot = snapshot.ToMongoSnapshot(_serializer); - IMongoQuery query = Query.EQ(MongoShapshotFields.Id, mongoSnapshot[MongoShapshotFields.Id]); - UpdateBuilder update = Update.Set(MongoShapshotFields.Payload, mongoSnapshot[MongoShapshotFields.Payload]); - - // Doing an upsert instead of an insert allows us to overwrite an existing snapshot and not get stuck with a - // stream that needs to be snapshotted because the insert fails and the SnapshotRevision isn't being updated. - PersistedSnapshots.Update(query, update, UpdateFlags.Upsert); - - // More commits could have been made between us deciding that a snapshot is required and writing it so just - // resetting the Unsnapshotted count may be a little off. Adding snapshots should be a separate process so - // this is a good chance to make sure the numbers are still in-sync - it only adds a 'read' after all ... - BsonDocument streamHeadId = GetStreamHeadId(snapshot.BucketId, snapshot.StreamId); - StreamHead streamHead = PersistedStreamHeads.FindOneById(streamHeadId).ToStreamHead(); - int unsnapshotted = streamHead.HeadRevision - snapshot.StreamRevision; - PersistedStreamHeads.Update( - Query.EQ(MongoStreamHeadFields.Id, streamHeadId), - Update.Set(MongoStreamHeadFields.SnapshotRevision, snapshot.StreamRevision).Set(MongoStreamHeadFields.Unsnapshotted, unsnapshotted)); - - return true; - } - catch (Exception) - { - return false; - } - } - - public virtual void Purge() - { - Logger.Warn(Messages.PurgingStorage); - PersistedCommits.RemoveAll(); - PersistedStreamHeads.RemoveAll(); - PersistedSnapshots.RemoveAll(); - } - - public void Purge(string bucketId) - { - Logger.Warn(Messages.PurgingBucket, bucketId); - TryMongo(() => - { - PersistedStreamHeads.Remove(Query.EQ(MongoStreamHeadFields.FullQualifiedBucketId, bucketId)); - PersistedSnapshots.Remove(Query.EQ(MongoShapshotFields.FullQualifiedBucketId, bucketId)); - PersistedCommits.Remove(Query.EQ(MongoStreamHeadFields.FullQualifiedBucketId, bucketId)); - }); - - } - - public void Drop() - { - Purge(); - } - - public void DeleteStream(string bucketId, string streamId) - { - Logger.Warn(Messages.DeletingStream, streamId, bucketId); - TryMongo(() => - { - PersistedStreamHeads.Remove( - Query.EQ(MongoStreamHeadFields.Id, new BsonDocument{ - {MongoStreamHeadFields.BucketId, bucketId}, - {MongoStreamHeadFields.StreamId, streamId} - }) - ); - - PersistedSnapshots.Remove( - Query.EQ(MongoShapshotFields.Id, new BsonDocument{ - {MongoShapshotFields.BucketId, bucketId}, - {MongoShapshotFields.StreamId, streamId} - }) - ); - - PersistedCommits.Update( - Query.And( - Query.EQ(MongoCommitFields.BucketId, bucketId), - Query.EQ(MongoCommitFields.StreamId, streamId) - ), - Update.Set(MongoCommitFields.BucketId, MongoSystemBuckets.RecycleBin), - UpdateFlags.Multi - ); - }); - } - - public bool IsDisposed - { - get { return _disposed; } - } - - protected virtual void Dispose(bool disposing) - { - if (!disposing || _disposed) - { - return; - } - - Logger.Debug(Messages.ShuttingDownPersistence); - _disposed = true; - } - - private void UpdateStreamHeadAsync(string bucketId, string streamId, int streamRevision, int eventsCount) - { - ThreadPool.QueueUserWorkItem(x => - TryMongo(() => - { - BsonDocument streamHeadId = GetStreamHeadId(bucketId, streamId); - PersistedStreamHeads.Update( - Query.EQ(MongoStreamHeadFields.Id, streamHeadId), - Update - .Set(MongoStreamHeadFields.HeadRevision, streamRevision) - .Inc(MongoStreamHeadFields.SnapshotRevision, 0) - .Inc(MongoStreamHeadFields.Unsnapshotted, eventsCount), - UpdateFlags.Upsert); - }), null); - } - - protected virtual T TryMongo(Func callback) - { - T results = default(T); - TryMongo(() => { results = callback(); }); - return results; - } - - protected virtual void TryMongo(Action callback) - { - if (_disposed) - { - throw new ObjectDisposedException("Attempt to use storage after it has been disposed."); - } - try - { - callback(); - } - catch (MongoConnectionException e) - { - Logger.Warn(Messages.StorageUnavailable); - throw new StorageUnavailableException(e.Message, e); - } - catch (MongoException e) - { - Logger.Error(Messages.StorageThrewException, e.GetType()); - throw new StorageException(e.Message, e); - } - } - - private static BsonDocument GetStreamHeadId(string bucketId, string streamId) - { - var id = new BsonDocument(); - id[MongoStreamHeadFields.BucketId] = bucketId; - id[MongoStreamHeadFields.StreamId] = streamId; - return id; - } - - public void EmptyRecycleBin() - { - var lastCheckpointNumber = _getLastCheckPointNumber(); - TryMongo(() => - { - PersistedCommits.Remove(Query.And( - Query.EQ(MongoCommitFields.BucketId, MongoSystemBuckets.RecycleBin), - Query.LT(MongoCommitFields.CheckpointNumber, lastCheckpointNumber) - )); - }); - } - - public IEnumerable GetDeletedCommits() - { - return TryMongo(() => PersistedCommits - .Find(Query.EQ(MongoCommitFields.BucketId, MongoSystemBuckets.RecycleBin)) - .SetSortOrder(MongoCommitFields.CheckpointNumber) - .Select(mc => mc.ToCommit(_serializer))); - } - } -} \ No newline at end of file diff --git a/src/NEventStore.Persistence.MongoDB/MongoPersistenceFactory.cs b/src/NEventStore.Persistence.MongoDB/MongoPersistenceFactory.cs deleted file mode 100644 index e080f34cb..000000000 --- a/src/NEventStore.Persistence.MongoDB/MongoPersistenceFactory.cs +++ /dev/null @@ -1,27 +0,0 @@ -namespace NEventStore.Persistence.MongoDB -{ - using System; - using global::MongoDB.Driver; - using NEventStore.Serialization; - - public class MongoPersistenceFactory : IPersistenceFactory - { - private readonly Func _connectionStringProvider; - private readonly IDocumentSerializer _serializer; - private readonly MongoPersistenceOptions _options; - - public MongoPersistenceFactory(Func connectionStringProvider, IDocumentSerializer serializer, MongoPersistenceOptions options = null) - { - _connectionStringProvider = connectionStringProvider; - _serializer = serializer; - _options = options ?? new MongoPersistenceOptions(); - } - - public virtual IPersistStreams Build() - { - string connectionString = _connectionStringProvider(); - MongoDatabase database = _options.ConnectToDatabase(connectionString); - return new MongoPersistenceEngine(database, _serializer, _options); - } - } -} diff --git a/src/NEventStore.Persistence.MongoDB/MongoPersistenceOptions.cs b/src/NEventStore.Persistence.MongoDB/MongoPersistenceOptions.cs deleted file mode 100644 index 578cb8e31..000000000 --- a/src/NEventStore.Persistence.MongoDB/MongoPersistenceOptions.cs +++ /dev/null @@ -1,65 +0,0 @@ -namespace NEventStore.Persistence.MongoDB -{ - using System; - using global::MongoDB.Driver; - using NEventStore.Serialization; - - /// - /// Options for the MongoPersistence engine. - /// http://docs.mongodb.org/manual/core/write-concern/#write-concern - /// - public class MongoPersistenceOptions - { - /// - /// Get the WriteConcern for the commit insert operation. - /// Concurrency / duplicate commit detection require a safe mode so level should be at least Acknowledged - /// - /// the write concern for the commit insert operation - public virtual WriteConcern GetInsertCommitWriteConcern() - { - // for concurrency / duplicate commit detection safe mode is required - // minimum level is Acknowledged - return WriteConcern.Acknowledged; - } - - public virtual MongoCollectionSettings GetCommitSettings() - { - return new MongoCollectionSettings - { - AssignIdOnInsert = false, - WriteConcern = WriteConcern.Acknowledged - }; - } - - public virtual MongoCollectionSettings GetSnapshotSettings() - { - return new MongoCollectionSettings - { - AssignIdOnInsert = false, - WriteConcern = WriteConcern.Unacknowledged - }; - } - - public virtual MongoCollectionSettings GetStreamSettings() - { - return new MongoCollectionSettings - { - AssignIdOnInsert = false, - WriteConcern = WriteConcern.Unacknowledged - }; - } - - - /// - /// Connects to NEvenstore Mongo database - /// - /// Connection string - /// nevenstore mongodatabase store - public virtual MongoDatabase ConnectToDatabase(string connectionString) - { - var builder = new MongoUrlBuilder(connectionString); - MongoDatabase database = (new MongoClient(connectionString)).GetServer().GetDatabase(builder.DatabaseName); - return database; - } - } -} diff --git a/src/NEventStore.Persistence.MongoDB/MongoPersistenceWireup.cs b/src/NEventStore.Persistence.MongoDB/MongoPersistenceWireup.cs deleted file mode 100644 index 30eb164b4..000000000 --- a/src/NEventStore.Persistence.MongoDB/MongoPersistenceWireup.cs +++ /dev/null @@ -1,29 +0,0 @@ -// ReSharper disable CheckNamespace - -namespace NEventStore // ReSharper restore CheckNamespace -{ - using System; - using System.Transactions; - using NEventStore.Logging; - using NEventStore.Persistence.MongoDB; - using NEventStore.Serialization; - - public class MongoPersistenceWireup : PersistenceWireup - { - private static readonly ILog Logger = LogFactory.BuildLogger(typeof (MongoPersistenceWireup)); - - public MongoPersistenceWireup(Wireup inner, Func connectionStringProvider, IDocumentSerializer serializer, MongoPersistenceOptions persistenceOptions) - : base(inner) - { - Logger.Debug("Configuring Mongo persistence engine."); - - var options = Container.Resolve(); - if (options != TransactionScopeOption.Suppress) - { - Logger.Warn("MongoDB does not participate in transactions using TransactionScope."); - } - - Container.Register(c => new MongoPersistenceFactory(connectionStringProvider, serializer, persistenceOptions).Build()); - } - } -} diff --git a/src/NEventStore.Persistence.MongoDB/MongoPersistenceWireupExtensions.cs b/src/NEventStore.Persistence.MongoDB/MongoPersistenceWireupExtensions.cs deleted file mode 100644 index db82679ef..000000000 --- a/src/NEventStore.Persistence.MongoDB/MongoPersistenceWireupExtensions.cs +++ /dev/null @@ -1,21 +0,0 @@ -// ReSharper disable once CheckNamespace -namespace NEventStore -{ - using System; - using System.Configuration; - using NEventStore.Persistence.MongoDB; - using NEventStore.Serialization; - - public static class MongoPersistenceWireupExtensions - { - public static PersistenceWireup UsingMongoPersistence(this Wireup wireup, string connectionName, IDocumentSerializer serializer, MongoPersistenceOptions options = null) - { - return new MongoPersistenceWireup(wireup, () => ConfigurationManager.ConnectionStrings[connectionName].ConnectionString, serializer, options); - } - - public static PersistenceWireup UsingMongoPersistence(this Wireup wireup, Func connectionStringProvider, IDocumentSerializer serializer, MongoPersistenceOptions options = null) - { - return new MongoPersistenceWireup(wireup, connectionStringProvider, serializer, options); - } - } -} \ No newline at end of file diff --git a/src/NEventStore.Persistence.MongoDB/NEventStore.Persistence.MongoDB.csproj b/src/NEventStore.Persistence.MongoDB/NEventStore.Persistence.MongoDB.csproj deleted file mode 100644 index 16b1a7282..000000000 --- a/src/NEventStore.Persistence.MongoDB/NEventStore.Persistence.MongoDB.csproj +++ /dev/null @@ -1,99 +0,0 @@ - - - - Debug - AnyCPU - 8.0.30703 - 2.0 - {32ADD8CE-0F3F-41D8-BFA1-6E5D685E64DD} - Library - Properties - NEventStore.Persistence.MongoDB - NEventStore.Persistence.MongoDB - v4.0 - 512 - false - ..\..\NEventStore.snk - ..\..\..\src\ - true - - - - true - full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 - false - - - pdbonly - true - bin\Release\ - TRACE - prompt - 4 - false - - - - {03946843-F343-419C-88EF-3E446D08DFA6} - NEventStore - - - False - ..\packages\mongocsharpdriver.1.8.1\lib\net35\MongoDB.Bson.dll - - - False - ..\packages\mongocsharpdriver.1.8.1\lib\net35\MongoDB.Driver.dll - - - - - - - - - Properties\GlobalAssemblyInfo.cs - - - Properties\GlobalSuppressions.cs - - - Properties\VersionAssemblyInfo.cs - - - - True - True - Messages.resx - - - - - - - - - - - - Properties\CustomDictionary.xml - - - - - ResXFileCodeGenerator - Messages.Designer.cs - Designer - - - - - - - - \ No newline at end of file diff --git a/src/NEventStore.Persistence.MongoDB/Properties/AssemblyInfo.cs b/src/NEventStore.Persistence.MongoDB/Properties/AssemblyInfo.cs deleted file mode 100644 index 405866ceb..000000000 --- a/src/NEventStore.Persistence.MongoDB/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,6 +0,0 @@ -using System.Reflection; -using System.Runtime.InteropServices; - -[assembly: AssemblyTitle("NEventStore.Persistence.MongoPersistence")] -[assembly: AssemblyDescription("")] -[assembly: Guid("a531c143-8f3a-4123-b8ed-44323eac543c")] \ No newline at end of file diff --git a/src/NEventStore.Persistence.MongoDB/packages.config b/src/NEventStore.Persistence.MongoDB/packages.config deleted file mode 100644 index 430384868..000000000 --- a/src/NEventStore.Persistence.MongoDB/packages.config +++ /dev/null @@ -1,4 +0,0 @@ - - - - \ No newline at end of file diff --git a/src/NEventStore.sln b/src/NEventStore.sln index 5c39ab391..d67d6f1cb 100644 --- a/src/NEventStore.sln +++ b/src/NEventStore.sln @@ -16,8 +16,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".doc", ".doc", "{CD37080A-F ..\readme.markdown = ..\readme.markdown EndProjectSection EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NEventStore.Persistence.MongoDB", "NEventStore.Persistence.MongoDB\NEventStore.Persistence.MongoDB.csproj", "{32ADD8CE-0F3F-41D8-BFA1-6E5D685E64DD}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "serialization", "serialization", "{52F7988F-452D-46C2-A144-D85E0CF371C6}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{08A28C69-B04C-4622-8BF7-C9C4C410E64C}" @@ -32,8 +30,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{08A28C EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NEventStore.Persistence.AcceptanceTests", "NEventStore.Persistence.AcceptanceTests\NEventStore.Persistence.AcceptanceTests.csproj", "{3FE594FE-16FF-4405-97D5-5A58FB12520B}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NEventStore.Persistence.MongoDB.Tests", "NEventStore.Persistence.MongoDB.Tests\NEventStore.Persistence.MongoDB.Tests.csproj", "{49C544AF-02FB-4BE9-BDDF-79EB087E00C3}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NEventStore.Persistence.MsSql.Tests", "NEventStore.Persistence.MsSql.Tests\NEventStore.Persistence.MsSql.Tests.csproj", "{79D255F6-B001-425E-B3BF-CBF1E9A884B9}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NEventStore.Persistence.MySql.Tests", "NEventStore.Persistence.MySql.Tests\NEventStore.Persistence.MySql.Tests.csproj", "{6073E59E-589B-4DC1-8073-0B3E462E825A}" @@ -44,8 +40,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NEventStore.Persistence.Pos EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NEventStore.Persistence.Sqlite.Tests", "NEventStore.Persistence.Sqlite.Tests\NEventStore.Persistence.Sqlite.Tests.csproj", "{DE2428C7-5F47-4541-9727-B3AACB14DCD6}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "persistence.mongo", "persistence.mongo", "{154F4778-2DE9-4A1E-94AF-411FD10BA8E5}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "persistence.sql", "persistence.sql", "{302E416E-456F-49D4-A1A4-42323FCD38AD}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "core", "core", "{D35087BF-941A-49EA-96A8-B10457C4D169}" @@ -76,18 +70,10 @@ Global {03946843-F343-419C-88EF-3E446D08DFA6}.Debug|Any CPU.Build.0 = Debug|Any CPU {03946843-F343-419C-88EF-3E446D08DFA6}.Release|Any CPU.ActiveCfg = Release|Any CPU {03946843-F343-419C-88EF-3E446D08DFA6}.Release|Any CPU.Build.0 = Release|Any CPU - {32ADD8CE-0F3F-41D8-BFA1-6E5D685E64DD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {32ADD8CE-0F3F-41D8-BFA1-6E5D685E64DD}.Debug|Any CPU.Build.0 = Debug|Any CPU - {32ADD8CE-0F3F-41D8-BFA1-6E5D685E64DD}.Release|Any CPU.ActiveCfg = Release|Any CPU - {32ADD8CE-0F3F-41D8-BFA1-6E5D685E64DD}.Release|Any CPU.Build.0 = Release|Any CPU {3FE594FE-16FF-4405-97D5-5A58FB12520B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {3FE594FE-16FF-4405-97D5-5A58FB12520B}.Debug|Any CPU.Build.0 = Debug|Any CPU {3FE594FE-16FF-4405-97D5-5A58FB12520B}.Release|Any CPU.ActiveCfg = Release|Any CPU {3FE594FE-16FF-4405-97D5-5A58FB12520B}.Release|Any CPU.Build.0 = Release|Any CPU - {49C544AF-02FB-4BE9-BDDF-79EB087E00C3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {49C544AF-02FB-4BE9-BDDF-79EB087E00C3}.Debug|Any CPU.Build.0 = Debug|Any CPU - {49C544AF-02FB-4BE9-BDDF-79EB087E00C3}.Release|Any CPU.ActiveCfg = Release|Any CPU - {49C544AF-02FB-4BE9-BDDF-79EB087E00C3}.Release|Any CPU.Build.0 = Release|Any CPU {79D255F6-B001-425E-B3BF-CBF1E9A884B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {79D255F6-B001-425E-B3BF-CBF1E9A884B9}.Debug|Any CPU.Build.0 = Debug|Any CPU {79D255F6-B001-425E-B3BF-CBF1E9A884B9}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -150,8 +136,6 @@ Global {785B7EF4-BD47-4F98-9802-DA1C4A55ECA4} = {D35087BF-941A-49EA-96A8-B10457C4D169} {0143B48E-25AF-4CE0-BD49-A52267D359D3} = {CD37080A-F48D-4E5D-B7C6-B9F86BB05A38} {49705881-56D3-447A-92D8-D11CA6085DBD} = {CD37080A-F48D-4E5D-B7C6-B9F86BB05A38} - {49C544AF-02FB-4BE9-BDDF-79EB087E00C3} = {154F4778-2DE9-4A1E-94AF-411FD10BA8E5} - {32ADD8CE-0F3F-41D8-BFA1-6E5D685E64DD} = {154F4778-2DE9-4A1E-94AF-411FD10BA8E5} {41BD8E8F-F530-48CB-A07B-92E65C8C742E} = {52F7988F-452D-46C2-A144-D85E0CF371C6} {C6BFE4FC-4017-438B-AE99-D4D6F6FC4FE5} = {52F7988F-452D-46C2-A144-D85E0CF371C6} {76554E1F-CAE0-41EB-9C92-A47EC7188714} = {52F7988F-452D-46C2-A144-D85E0CF371C6}