diff --git a/src/BullOak.Repositories.EventStore.Test.Integration/Contexts/InProcEventStoreIntegrationContext.cs b/src/BullOak.Repositories.EventStore.Test.Integration/Contexts/InProcEventStoreIntegrationContext.cs index 4f7563d..224b50c 100644 --- a/src/BullOak.Repositories.EventStore.Test.Integration/Contexts/InProcEventStoreIntegrationContext.cs +++ b/src/BullOak.Repositories.EventStore.Test.Integration/Contexts/InProcEventStoreIntegrationContext.cs @@ -43,7 +43,7 @@ private static IEventStoreConnection GetConnection() public void SetupRepository(IHoldAllConfiguration configuration) { - repository = new EventStoreRepository(configuration, GetConnection()); + repository = new EventStoreRepository(configuration, GetConnection); } [BeforeTestRun] diff --git a/src/BullOak.Repositories.EventStore/EventStoreSession.cs b/src/BullOak.Repositories.EventStore/EventStoreSession.cs index 249e431..096902c 100644 --- a/src/BullOak.Repositories.EventStore/EventStoreSession.cs +++ b/src/BullOak.Repositories.EventStore/EventStoreSession.cs @@ -73,7 +73,12 @@ private async Task> ReadAllEventsFromStream() protected override void Dispose(bool disposing) { - if (disposing) { ConsiderSessionDisposed(); } + if (disposing) + { + ConsiderSessionDisposed(); + eventStoreConnection.SafeClose(); + + } base.Dispose(disposing); } @@ -160,7 +165,9 @@ private ItemWithType GetEventFromEventData(ResolvedEvent resolvedEvent) switchable.CanEdit = false; } else + { @event = jobject.ToObject(type); + } return new ItemWithType(@event, type); } diff --git a/src/BullOak.Repositories.EventStore/EventstoreConnectionExtensions.cs b/src/BullOak.Repositories.EventStore/EventstoreConnectionExtensions.cs new file mode 100644 index 0000000..97b46f7 --- /dev/null +++ b/src/BullOak.Repositories.EventStore/EventstoreConnectionExtensions.cs @@ -0,0 +1,26 @@ +namespace BullOak.Repositories.EventStore +{ + using global::EventStore.ClientAPI; + + public static class EventstoreConnectionExtensions + { + public static void SafeClose(this IEventStoreConnection connection) + { + try + { + if (connection == null) + { + return; + } + + connection.Close(); + connection.Dispose(); + + } + catch + { + // ignored + } + } + } +} \ No newline at end of file diff --git a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs index 80e02b3..8d3d9ca 100644 --- a/src/BullOak.Repositories.EventStore/EventstoreRepository.cs +++ b/src/BullOak.Repositories.EventStore/EventstoreRepository.cs @@ -9,28 +9,56 @@ public class EventStoreRepository : IStartSessions { - private static readonly Task falseResult = Task.FromResult(false); private readonly IHoldAllConfiguration configs; - private readonly IEventStoreConnection connection; + private readonly Func connectionFactory; - public EventStoreRepository(IHoldAllConfiguration configs, IEventStoreConnection connection) + public EventStoreRepository(IHoldAllConfiguration configs, Func connectionFactory) { - this.configs = configs ?? throw new ArgumentNullException(nameof(connection)); - this.connection = connection ?? throw new ArgumentNullException(nameof(connection)); + this.configs = configs ?? throw new ArgumentNullException(nameof(configs)); + this.connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); } public async Task> BeginSessionFor(TId id, bool throwIfNotExists = false) { - if (throwIfNotExists && !(await Contains(id))) - throw new StreamNotFoundException(id.ToString()); + IEventStoreConnection connection = null; + EventStoreSession session; + try + { + try + { + connection = connectionFactory(); + } + catch (Exception ex) + { + throw new RepositoryUnavailableException( + "Couldn't connect to the EvenStore repository. See InnerException for details", ex); + } + + if (connection == null) + { + throw new RepositoryUnavailableException( + "Couldn't connect to the EvenStore repository. Connection object is null."); + } - var session = new EventStoreSession(configs, connection, id.ToString()); - await session.Initialize(); + if (throwIfNotExists && !await Contains(id, connection)) + { + throw new StreamNotFoundException(id.ToString()); + } + + session = new EventStoreSession(configs, connection, id.ToString()); + await session.Initialize(); + + } + catch + { + connection.SafeClose(); + throw; + } return session; } - public async Task Contains(TId selector) + private async Task Contains(TId selector, IEventStoreConnection connection) { try { @@ -44,12 +72,24 @@ public async Task Contains(TId selector) } } + public Task Contains(TId selector) + { + using (var connection = connectionFactory()) + { + return Contains(selector, connection); + } + } + + public async Task Delete(TId selector) { - var id = selector.ToString(); - var eventsTail = await connection.ReadStreamEventsBackwardAsync(id, 0, 1, false); - var expectedVersion = eventsTail.LastEventNumber; - await connection.DeleteStreamAsync(id, expectedVersion); + using (var connection = connectionFactory()) + { + var id = selector.ToString(); + var eventsTail = await connection.ReadStreamEventsBackwardAsync(id, 0, 1, false); + var expectedVersion = eventsTail.LastEventNumber; + await connection.DeleteStreamAsync(id, expectedVersion); + } } } } diff --git a/src/BullOak.Repositories/Exceptions/RepositoryUnavailableException.cs b/src/BullOak.Repositories/Exceptions/RepositoryUnavailableException.cs new file mode 100644 index 0000000..8066d6c --- /dev/null +++ b/src/BullOak.Repositories/Exceptions/RepositoryUnavailableException.cs @@ -0,0 +1,20 @@ +namespace BullOak.Repositories.Exceptions +{ + using System; + + public class RepositoryUnavailableException : Exception + { + public RepositoryUnavailableException(string message) : base(message) + { + + } + + + public RepositoryUnavailableException(string message, Exception innerException) : base(message, innerException) + { + + } + + + } +} \ No newline at end of file