From ecdf904314d2d966833e73971825f56717491433 Mon Sep 17 00:00:00 2001 From: Alexey Stolybko Date: Tue, 9 Oct 2018 14:54:12 +0100 Subject: [PATCH 1/7] Proposed change to introduce connection factory method as a dependency instead of connection objectct to allow more flexibility with regards to how connection object is resolved. (e.g. to allow connection polling, reconnection and more monitoring) --- .../InProcEventStoreIntegrationContext.cs | 2 +- .../EventstoreRepository.cs | 41 +++++++++++++++---- .../RepositoryUnavailableException.cs | 20 +++++++++ 3 files changed, 53 insertions(+), 10 deletions(-) create mode 100644 src/BullOak.Repositories/Exceptions/RepositoryUnavailableException.cs diff --git a/src/BullOak.Repositories.EventStore.Test.Integration/Contexts/InProcEventStoreIntegrationContext.cs b/src/BullOak.Repositories.EventStore.Test.Integration/Contexts/InProcEventStoreIntegrationContext.cs index 4f7563d..224b50c 100644 --- a/src/BullOak.Repositories.EventStore.Test.Integration/Contexts/InProcEventStoreIntegrationContext.cs +++ b/src/BullOak.Repositories.EventStore.Test.Integration/Contexts/InProcEventStoreIntegrationContext.cs @@ -43,7 +43,7 @@ private static IEventStoreConnection GetConnection() public void SetupRepository(IHoldAllConfiguration configuration) { - repository = new EventStoreRepository(configuration, GetConnection()); + repository = new EventStoreRepository(configuration, GetConnection); } [BeforeTestRun] diff --git a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs index 80e02b3..d1f1488 100644 --- a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs +++ b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs @@ -11,26 +11,43 @@ public class EventStoreRepository : IStartSessions { private static readonly Task falseResult = Task.FromResult(false); private readonly IHoldAllConfiguration configs; - private readonly IEventStoreConnection connection; + private readonly Func connectionFactory; - public EventStoreRepository(IHoldAllConfiguration configs, IEventStoreConnection connection) + public EventStoreRepository(IHoldAllConfiguration configs, Func connectionFactory) { - this.configs = configs ?? throw new ArgumentNullException(nameof(connection)); - this.connection = connection ?? throw new ArgumentNullException(nameof(connection)); + this.configs = configs ?? throw new ArgumentNullException(nameof(configs)); + this.connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); } public async Task> BeginSessionFor(TId id, bool throwIfNotExists = false) { - if (throwIfNotExists && !(await Contains(id))) + IEventStoreConnection connection; + try + { + connection = connectionFactory(); + } + catch (Exception ex) + { + throw new RepositoryUnavailableException("Couldn't connect to the EvenStore repository. See InnerException for details", ex); + } + + if (connection == null) + { + throw new RepositoryUnavailableException("Couldn't connect to the EvenStore repository. See InnerException for details", new ArgumentNullException(nameof(connection))); + } + + if (throwIfNotExists && !(await Contains(id, connection))) + { throw new StreamNotFoundException(id.ToString()); + } - var session = new EventStoreSession(configs, connection, id.ToString()); + var session = new EventStoreSession(configs, connectionFactory(), id.ToString()); await session.Initialize(); return session; } - public async Task Contains(TId selector) + private async Task Contains(TId selector, IEventStoreConnection connection) { try { @@ -44,12 +61,18 @@ public async Task Contains(TId selector) } } + public Task Contains(TId selector) + { + return Contains(selector, connectionFactory()); + } + + public async Task Delete(TId selector) { var id = selector.ToString(); - var eventsTail = await connection.ReadStreamEventsBackwardAsync(id, 0, 1, false); + var eventsTail = await connectionFactory().ReadStreamEventsBackwardAsync(id, 0, 1, false); var expectedVersion = eventsTail.LastEventNumber; - await connection.DeleteStreamAsync(id, expectedVersion); + await connectionFactory().DeleteStreamAsync(id, expectedVersion); } } } diff --git a/src/BullOak.Repositories/Exceptions/RepositoryUnavailableException.cs b/src/BullOak.Repositories/Exceptions/RepositoryUnavailableException.cs new file mode 100644 index 0000000..8066d6c --- /dev/null +++ b/src/BullOak.Repositories/Exceptions/RepositoryUnavailableException.cs @@ -0,0 +1,20 @@ +namespace BullOak.Repositories.Exceptions +{ + using System; + + public class RepositoryUnavailableException : Exception + { + public RepositoryUnavailableException(string message) : base(message) + { + + } + + + public RepositoryUnavailableException(string message, Exception innerException) : base(message, innerException) + { + + } + + + } +} \ No newline at end of file From 5d0f68ad512db9fed412a59c423749cb8aabace9 Mon Sep 17 00:00:00 2001 From: Alexey Stolybko Date: Tue, 9 Oct 2018 16:11:32 +0100 Subject: [PATCH 2/7] Addressed PR comment --- src/BullOak.Repositories.EventStore/EventstoreRepository.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs index d1f1488..7cce563 100644 --- a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs +++ b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs @@ -33,7 +33,7 @@ public async Task> BeginSessionFor(TId id, bool throwIf if (connection == null) { - throw new RepositoryUnavailableException("Couldn't connect to the EvenStore repository. See InnerException for details", new ArgumentNullException(nameof(connection))); + throw new RepositoryUnavailableException("Couldn't connect to the EvenStore repository. Connection object is null."); } if (throwIfNotExists && !(await Contains(id, connection))) From 120484a50b86738619fb539a29f46e6fba7f5a1f Mon Sep 17 00:00:00 2001 From: Alexey Stolybko Date: Tue, 9 Oct 2018 18:23:23 +0100 Subject: [PATCH 3/7] Addressed PR comments --- .../EventstoreRepository.cs | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs index 7cce563..a9cf76c 100644 --- a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs +++ b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs @@ -9,7 +9,6 @@ public class EventStoreRepository : IStartSessions { - private static readonly Task falseResult = Task.FromResult(false); private readonly IHoldAllConfiguration configs; private readonly Func connectionFactory; @@ -36,15 +35,18 @@ public async Task> BeginSessionFor(TId id, bool throwIf throw new RepositoryUnavailableException("Couldn't connect to the EvenStore repository. Connection object is null."); } - if (throwIfNotExists && !(await Contains(id, connection))) + using (connection) { - throw new StreamNotFoundException(id.ToString()); - } + if (throwIfNotExists && !(await Contains(id, connection))) + { + throw new StreamNotFoundException(id.ToString()); + } - var session = new EventStoreSession(configs, connectionFactory(), id.ToString()); - await session.Initialize(); + var session = new EventStoreSession(configs, connectionFactory(), id.ToString()); + await session.Initialize(); - return session; + return session; + } } private async Task Contains(TId selector, IEventStoreConnection connection) @@ -63,16 +65,22 @@ private async Task Contains(TId selector, IEventStoreConnection connection public Task Contains(TId selector) { - return Contains(selector, connectionFactory()); + using (var connection = connectionFactory()) + { + return Contains(selector, connection); + } } public async Task Delete(TId selector) { - var id = selector.ToString(); - var eventsTail = await connectionFactory().ReadStreamEventsBackwardAsync(id, 0, 1, false); - var expectedVersion = eventsTail.LastEventNumber; - await connectionFactory().DeleteStreamAsync(id, expectedVersion); + using (var connection = connectionFactory()) + { + var id = selector.ToString(); + var eventsTail = await connection.ReadStreamEventsBackwardAsync(id, 0, 1, false); + var expectedVersion = eventsTail.LastEventNumber; + await connection.DeleteStreamAsync(id, expectedVersion); + } } } } From 0d0815969162d2ab7946d8aa77f958f0069607a0 Mon Sep 17 00:00:00 2001 From: Alexey Stolybko Date: Tue, 9 Oct 2018 18:38:57 +0100 Subject: [PATCH 4/7] Addressed PR comment --- .../EventStoreSession.cs | 16 +++++++++++++++- .../EventstoreRepository.cs | 2 +- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/BullOak.Repositories.EventStore/EventStoreSession.cs b/src/BullOak.Repositories.EventStore/EventStoreSession.cs index 249e431..5c05634 100644 --- a/src/BullOak.Repositories.EventStore/EventStoreSession.cs +++ b/src/BullOak.Repositories.EventStore/EventStoreSession.cs @@ -73,7 +73,19 @@ private async Task> ReadAllEventsFromStream() protected override void Dispose(bool disposing) { - if (disposing) { ConsiderSessionDisposed(); } + if (disposing) + { + ConsiderSessionDisposed(); + try + { + eventStoreConnection.Close(); + eventStoreConnection.Dispose(); + } + catch + { + // ignored as connection may already be in disposed state and throw + } + } base.Dispose(disposing); } @@ -160,7 +172,9 @@ private ItemWithType GetEventFromEventData(ResolvedEvent resolvedEvent) switchable.CanEdit = false; } else + { @event = jobject.ToObject(type); + } return new ItemWithType(@event, type); } diff --git a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs index a9cf76c..bc517e4 100644 --- a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs +++ b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs @@ -42,7 +42,7 @@ public async Task> BeginSessionFor(TId id, bool throwIf throw new StreamNotFoundException(id.ToString()); } - var session = new EventStoreSession(configs, connectionFactory(), id.ToString()); + var session = new EventStoreSession(configs, connection, id.ToString()); await session.Initialize(); return session; From eaebabba35ae004d7db324a4d6d4d219577952d7 Mon Sep 17 00:00:00 2001 From: Alexey Stolybko Date: Tue, 9 Oct 2018 18:40:20 +0100 Subject: [PATCH 5/7] Missed change --- .../EventstoreRepository.cs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs index bc517e4..1b47477 100644 --- a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs +++ b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs @@ -35,18 +35,15 @@ public async Task> BeginSessionFor(TId id, bool throwIf throw new RepositoryUnavailableException("Couldn't connect to the EvenStore repository. Connection object is null."); } - using (connection) + if (throwIfNotExists && !(await Contains(id, connection))) { - if (throwIfNotExists && !(await Contains(id, connection))) - { - throw new StreamNotFoundException(id.ToString()); - } + throw new StreamNotFoundException(id.ToString()); + } - var session = new EventStoreSession(configs, connection, id.ToString()); - await session.Initialize(); + var session = new EventStoreSession(configs, connection, id.ToString()); + await session.Initialize(); - return session; - } + return session; } private async Task Contains(TId selector, IEventStoreConnection connection) From e659c9f9828bab14ae3e2ca340e1492b42f2d0e6 Mon Sep 17 00:00:00 2001 From: Alexey Stolybko Date: Wed, 10 Oct 2018 12:24:06 +0100 Subject: [PATCH 6/7] Added some connection handling to the repository --- .../EventstoreRepository.cs | 58 ++++++++++++++----- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs index 1b47477..785f9c9 100644 --- a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs +++ b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs @@ -20,30 +20,60 @@ public EventStoreRepository(IHoldAllConfiguration configs, Func> BeginSessionFor(TId id, bool throwIfNotExists = false) { - IEventStoreConnection connection; + IEventStoreConnection connection = null; + EventStoreSession session; try { - connection = connectionFactory(); + try + { + connection = connectionFactory(); + } + catch (Exception ex) + { + throw new RepositoryUnavailableException( + "Couldn't connect to the EvenStore repository. See InnerException for details", ex); + } + + if (connection == null) + { + throw new RepositoryUnavailableException( + "Couldn't connect to the EvenStore repository. Connection object is null."); + } + + if (throwIfNotExists && !await Contains(id, connection)) + { + throw new StreamNotFoundException(id.ToString()); + } + + session = new EventStoreSession(configs, connection, id.ToString()); + await session.Initialize(); + } - catch (Exception ex) + catch { - throw new RepositoryUnavailableException("Couldn't connect to the EvenStore repository. See InnerException for details", ex); + CleanupConnection(connection); + throw; } - if (connection == null) + return session; + } + + private void CleanupConnection(IEventStoreConnection connection) + { + try { - throw new RepositoryUnavailableException("Couldn't connect to the EvenStore repository. Connection object is null."); - } + if (connection == null) + { + return; + } - if (throwIfNotExists && !(await Contains(id, connection))) + connection.Close(); + connection.Dispose(); + } + catch { - throw new StreamNotFoundException(id.ToString()); + // ignored } - - var session = new EventStoreSession(configs, connection, id.ToString()); - await session.Initialize(); - - return session; } private async Task Contains(TId selector, IEventStoreConnection connection) From 38a7cd2ee4f1970de78164ce0386364c2aefb455 Mon Sep 17 00:00:00 2001 From: Alexey Stolybko Date: Wed, 10 Oct 2018 12:37:52 +0100 Subject: [PATCH 7/7] Addressed PR comments --- .../EventStoreSession.cs | 11 ++------ .../EventstoreConnectionExtensions.cs | 26 +++++++++++++++++++ .../EventstoreRepository.cs | 20 +------------- 3 files changed, 29 insertions(+), 28 deletions(-) create mode 100644 src/BullOak.Repositories.EventStore/EventstoreConnectionExtensions.cs diff --git a/src/BullOak.Repositories.EventStore/EventStoreSession.cs b/src/BullOak.Repositories.EventStore/EventStoreSession.cs index 5c05634..096902c 100644 --- a/src/BullOak.Repositories.EventStore/EventStoreSession.cs +++ b/src/BullOak.Repositories.EventStore/EventStoreSession.cs @@ -76,15 +76,8 @@ protected override void Dispose(bool disposing) if (disposing) { ConsiderSessionDisposed(); - try - { - eventStoreConnection.Close(); - eventStoreConnection.Dispose(); - } - catch - { - // ignored as connection may already be in disposed state and throw - } + eventStoreConnection.SafeClose(); + } base.Dispose(disposing); } diff --git a/src/BullOak.Repositories.EventStore/EventstoreConnectionExtensions.cs b/src/BullOak.Repositories.EventStore/EventstoreConnectionExtensions.cs new file mode 100644 index 0000000..97b46f7 --- /dev/null +++ b/src/BullOak.Repositories.EventStore/EventstoreConnectionExtensions.cs @@ -0,0 +1,26 @@ +namespace BullOak.Repositories.EventStore +{ + using global::EventStore.ClientAPI; + + public static class EventstoreConnectionExtensions + { + public static void SafeClose(this IEventStoreConnection connection) + { + try + { + if (connection == null) + { + return; + } + + connection.Close(); + connection.Dispose(); + + } + catch + { + // ignored + } + } + } +} \ No newline at end of file diff --git a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs index 785f9c9..8d3d9ca 100644 --- a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs +++ b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs @@ -51,31 +51,13 @@ public async Task> BeginSessionFor(TId id, bool throwIf } catch { - CleanupConnection(connection); + connection.SafeClose(); throw; } return session; } - private void CleanupConnection(IEventStoreConnection connection) - { - try - { - if (connection == null) - { - return; - } - - connection.Close(); - connection.Dispose(); - } - catch - { - // ignored - } - } - private async Task Contains(TId selector, IEventStoreConnection connection) { try