Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventStore repository to allow connection factory to be passed #107

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private static IEventStoreConnection GetConnection()

public void SetupRepository(IHoldAllConfiguration configuration)
{
repository = new EventStoreRepository<string, IHoldHigherOrder>(configuration, GetConnection());
repository = new EventStoreRepository<string, IHoldHigherOrder>(configuration, GetConnection);
}

[BeforeTestRun]
Expand Down
9 changes: 8 additions & 1 deletion src/BullOak.Repositories.EventStore/EventStoreSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ private async Task<List<ResolvedEvent>> ReadAllEventsFromStream()

protected override void Dispose(bool disposing)
{
if (disposing) { ConsiderSessionDisposed(); }
if (disposing)
{
ConsiderSessionDisposed();
eventStoreConnection.SafeClose();

}
base.Dispose(disposing);
}

Expand Down Expand Up @@ -160,7 +165,9 @@ private ItemWithType GetEventFromEventData(ResolvedEvent resolvedEvent)
switchable.CanEdit = false;
}
else
{
@event = jobject.ToObject(type);
}

return new ItemWithType(@event, type);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace BullOak.Repositories.EventStore
{
using global::EventStore.ClientAPI;

public static class EventstoreConnectionExtensions
{
public static void SafeClose(this IEventStoreConnection connection)
{
try
{
if (connection == null)
{
return;
}

connection.Close();
connection.Dispose();

}
catch
{
// ignored
}
}
}
}
68 changes: 54 additions & 14 deletions src/BullOak.Repositories.EventStore/EventstoreRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,56 @@

public class EventStoreRepository<TId, TState> : IStartSessions<TId, TState>
{
private static readonly Task<bool> falseResult = Task.FromResult(false);
private readonly IHoldAllConfiguration configs;
private readonly IEventStoreConnection connection;
private readonly Func<IEventStoreConnection> connectionFactory;

public EventStoreRepository(IHoldAllConfiguration configs, IEventStoreConnection connection)
public EventStoreRepository(IHoldAllConfiguration configs, Func<IEventStoreConnection> connectionFactory)
{
this.configs = configs ?? throw new ArgumentNullException(nameof(connection));
this.connection = connection ?? throw new ArgumentNullException(nameof(connection));
this.configs = configs ?? throw new ArgumentNullException(nameof(configs));
this.connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
}

public async Task<IManageSessionOf<TState>> BeginSessionFor(TId id, bool throwIfNotExists = false)
{
if (throwIfNotExists && !(await Contains(id)))
throw new StreamNotFoundException(id.ToString());
IEventStoreConnection connection = null;
EventStoreSession<TState> session;
try
{
try
{
connection = connectionFactory();
}
catch (Exception ex)
{
throw new RepositoryUnavailableException(
"Couldn't connect to the EvenStore repository. See InnerException for details", ex);
}

if (connection == null)
{
throw new RepositoryUnavailableException(
"Couldn't connect to the EvenStore repository. Connection object is null.");
}

var session = new EventStoreSession<TState>(configs, connection, id.ToString());
await session.Initialize();
if (throwIfNotExists && !await Contains(id, connection))
{
throw new StreamNotFoundException(id.ToString());
}

session = new EventStoreSession<TState>(configs, connection, id.ToString());
await session.Initialize();

}
catch
{
connection.SafeClose();
throw;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw at L40 will leave the connection open.


return session;
}

public async Task<bool> Contains(TId selector)
private async Task<bool> Contains(TId selector, IEventStoreConnection connection)
{
try
{
Expand All @@ -44,12 +72,24 @@ public async Task<bool> Contains(TId selector)
}
}

public Task<bool> Contains(TId selector)
{
using (var connection = connectionFactory())
{
return Contains(selector, connection);
}
}


public async Task Delete(TId selector)
{
var id = selector.ToString();
var eventsTail = await connection.ReadStreamEventsBackwardAsync(id, 0, 1, false);
var expectedVersion = eventsTail.LastEventNumber;
await connection.DeleteStreamAsync(id, expectedVersion);
using (var connection = connectionFactory())
{
var id = selector.ToString();
var eventsTail = await connection.ReadStreamEventsBackwardAsync(id, 0, 1, false);
var expectedVersion = eventsTail.LastEventNumber;
await connection.DeleteStreamAsync(id, expectedVersion);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace BullOak.Repositories.Exceptions
{
using System;

public class RepositoryUnavailableException : Exception
{
public RepositoryUnavailableException(string message) : base(message)
{

}


public RepositoryUnavailableException(string message, Exception innerException) : base(message, innerException)
{

}


}
}