Skip to content

Commit

Permalink
Test Utilities (#21)
Browse files Browse the repository at this point in the history
* Clone event store

* Copy streams

* In-memory store snapshots

* Json event store
  • Loading branch information
benfoster committed Jan 12, 2024
1 parent a854337 commit cd9cadc
Show file tree
Hide file tree
Showing 15 changed files with 490 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ insert_final_newline = false
[*.js]
indent_size = 2

[*.json]
[*.{json,jsonc}]
indent_size = 2

[*.{yml,yaml}]
Expand Down
32 changes: 32 additions & 0 deletions src/Odyssey/EventStoreExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using OneOf;

namespace Odyssey;

using AppendResult = OneOf<Success, UnexpectedStreamState>;

public static class EventStoreExtensions
{
/// <summary>
/// Copies events from one stream to another within the same event store instance
/// </summary>
/// <param name="sourceStore">The event store to copy streams from and to</param>
/// <param name="sourceStreamId">The source stream identifier</param>
/// <param name="destinationStreamId">The destination stream identifier</param>
/// <returns>The append result of the write</returns>
public static Task<AppendResult> CopyStream(this IEventStore sourceStore, string sourceStreamId, string destinationStreamId)
=> CopyStream(sourceStore, sourceStreamId, sourceStore, destinationStreamId);

/// <summary>
/// Copies events from one stream in the source store to another in the destination store
/// </summary>
/// <param name="store">The event store to copy streams from and to</param>
/// <param name="sourceStreamId">The source stream identifier</param>
/// <param name="destinationStore">The</param>
/// <param name="destinationStreamId">The destination stream identifier. If not specified the <paramref name="sourceStreamId"/> will be used</param>
/// <returns>The append result of the write</returns>
public static async Task<AppendResult> CopyStream(this IEventStore sourceStore, string sourceStreamId, IEventStore destinationStore, string? destinationStreamId = null)
{
var eventsToCopy = await sourceStore.ReadStream(sourceStreamId, ReadDirection.Forwards, StreamPosition.Start);
return await destinationStore.AppendToStream(destinationStreamId ?? sourceStreamId, eventsToCopy.ToList(), StreamState.Any);
}
}
14 changes: 14 additions & 0 deletions src/Odyssey/ICloneable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Odyssey;

/// <summary>
/// Interface for cloneable event stores (those that can be copied)
/// </summary>
public interface ICloneable
{
/// <summary>
/// Copies all of the event streams from the event store instance to the target
/// </summary>
/// <param name="target">The target event store to write to</param>
/// <returns></returns>
Task CopyTo(IEventStore target, CancellationToken cancellationToken = default);
}
79 changes: 77 additions & 2 deletions src/Odyssey/InMemoryEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ namespace Odyssey;
using OneOf.Types;
using AppendResult = OneOf.OneOf<Success, UnexpectedStreamState>;

public class InMemoryEventStore : IEventStore
public sealed class InMemoryEventStore : IEventStore, ICloneable
{
private static readonly IReadOnlyCollection<EventData> EmptyStream = Array.Empty<EventData>();
private readonly ConcurrentDictionary<string, List<EventData>> _streams = new();
private ConcurrentDictionary<string, List<EventData>> _streams = new();

// We could probably use dictionaries rather than InMemoryEventStore but this avoids
// having to a deep clone of the streams
private readonly ConcurrentDictionary<string, InMemoryEventStore> _snapshots = new();
private SemaphoreSlim _snapshotLock = new SemaphoreSlim(1);

public Task Initialize(CancellationToken cancellationToken) => Task.CompletedTask;

Expand Down Expand Up @@ -116,11 +121,81 @@ public bool DeleteStream(string streamId)
return _streams.TryRemove(streamId, out _);
}

/// <summary>
/// Copies in-memory streams to the target event store
/// </summary>
/// <param name="target">The target event store to write to</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task CopyTo(IEventStore target, CancellationToken cancellationToken = default)
{
target.NotNull();

foreach ((string streamId, List<EventData> events) in _streams)
{
if (!cancellationToken.IsCancellationRequested)
{
await target.AppendToStream(streamId, events, StreamState.NoStream, cancellationToken);
}
};
}

/// <summary>
/// Resets the entire in-memory store, clearing all streams
/// </summary>
public void Reset()
{
_streams.Clear();
}

/// <summary>
/// Creates a snapshot of all streams in the event store
/// </summary>
/// <returns>The snapshot identifier</returns>
public async Task<string> CreateSnapshot(CancellationToken cancellationToken = default)
{
try
{
// This only adds thread safety around the action of creating actions
// It's still possible that writes are happening whilst a snapshot is occurring
// Probably okay for the in-memory case but perhaps a good reason to move this out to a snapshottable wrapper
await _snapshotLock.WaitAsync(cancellationToken);
string snapShotId = Guid.NewGuid().ToString();

// Since events *should* be immutable, copying individual events by reference shouldn't be an issue
var snapshot = new InMemoryEventStore();
await CopyTo(snapshot);
_snapshots.TryAdd(snapShotId, snapshot);

return snapShotId;
}
finally
{
_snapshotLock.Release();
}
}

/// <summary>
/// Restores the event store from a previously taken snapshot
/// </summary>
/// <param name="snapshotId">The snapshot identifier</param>
/// <param name="deleteSnapshotOnRestore">Whether to remove the snapshot once it has been restored</param>
/// <returns></returns>
public async Task RestoreFromSnapshot(string snapshotId, bool deleteSnapshotOnRestore, CancellationToken cancellationToken = default)
{
try
{
await _snapshotLock.WaitAsync(cancellationToken);
_streams = _snapshots[snapshotId]._streams;

if (deleteSnapshotOnRestore)
{
_snapshots.TryRemove(snapshotId, out _);
}
}
finally
{
_snapshotLock.Release();
}
}
}
163 changes: 163 additions & 0 deletions src/Odyssey/JsonFileEventStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
namespace Odyssey;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using O9d.Guard;
using OneOf;
using OneOf.Types;

/// <summary>
/// Implementation of the event store that reads from files on disk
/// </summary>
public sealed class JsonFileEventStore : IEventStore, ICloneable
{
private static readonly IReadOnlyCollection<EventData> EmptyStream = Array.Empty<EventData>();
private readonly Dictionary<string, List<EventData>> _appendedEvents = new();
private string _storagePath;
private readonly JsonSerializer _serializer;
private readonly TypeResolver _eventTypeResolver;

public JsonFileEventStore(string storagePath)
{
_storagePath = storagePath.NotNull();
_serializer = JsonSerializer.Create(SerializerSettings.Default);
_eventTypeResolver = TypeResolvers.UsingClrQualifiedTypeMetadata;
}

public Task<OneOf<Success, UnexpectedStreamState>> AppendToStream(string streamId, IReadOnlyList<EventData> events, StreamState expectedState, CancellationToken cancellationToken = default)
{
if (expectedState != StreamState.NoStream)
{
throw new InvalidOperationException("Appending to an existing stream is not currently supported");
}

using var sw = new StreamWriter(GetStreamFilePath(streamId));
using var writer = new JsonTextWriter(sw);
_serializer.Serialize(writer, events);
return Task.FromResult<OneOf<Success, UnexpectedStreamState>>(new Success());
}

public Task Initialize(CancellationToken cancellationToken = default)
{
if (!Directory.Exists(_storagePath))
{
Directory.CreateDirectory(_storagePath);
}

return Task.CompletedTask;
}

public async Task<IReadOnlyCollection<EventData>> ReadStream(string streamId, ReadDirection direction, StreamPosition position, CancellationToken cancellationToken = default)
{
// TODO support json or jsonc
string streamFilePath = GetStreamFilePath(streamId);

if (!File.Exists(streamFilePath))
{
return EmptyStream;
}

string fileJson = await File.ReadAllTextAsync(streamFilePath);
var fileEvents = JsonConvert.DeserializeObject<IEnumerable<JsonFileEvent>>(fileJson) ?? Array.Empty<JsonFileEvent>();

var events = new List<EventData>();

foreach (JsonFileEvent fileEvent in fileEvents)
{
EventData eventData = ResolveEvent(fileEvent);
events.Add(eventData);
}

if (_appendedEvents.ContainsKey(streamId))
{
events.AddRange(_appendedEvents[streamId]);
}

return events.AsReadOnly();
}

private string GetStreamFilePath(string streamId) => Path.Combine(_storagePath, $"{streamId}.jsonc");

public async Task<OneOf<EventData, NotFound>> ReadStreamEvent(string streamId, long eventNumber, CancellationToken cancellationToken = default)
{
var events = await ReadStream(streamId, ReadDirection.Forwards, StreamPosition.Start, cancellationToken);

if (eventNumber > events.Count - 1)
{
return new NotFound();
}

return events.ElementAt((int)eventNumber);
}

public void ClearAppendedEvents() => _appendedEvents.Clear();

private EventData ResolveEvent(JsonFileEvent @event)
{
Type? eventType = _eventTypeResolver.Invoke(@event.Id, @event.Metadata);

return eventType is not null
? @event.ToEventData(eventType, _serializer)
: throw new ArgumentException($"The CLR type for event {@event.EventType} cannot be resolved");
}

public async Task CopyTo(IEventStore target, CancellationToken cancellationToken = default)
{
target.NotNull();
foreach (var streamId in GetStreamsFromDirectory())
{
var @events = await ReadStream(streamId, ReadDirection.Forwards, StreamPosition.Start, cancellationToken);
await target.AppendToStream(streamId, @events.ToList(), StreamState.NoStream, cancellationToken);
}
}

private IEnumerable<string> GetStreamsFromDirectory()
{
return Directory.GetFiles(_storagePath)
.Select(path => Path.GetFileNameWithoutExtension(path));
}

public sealed class JsonFileEvent
{
[JsonProperty("id")]
public string Id { get; set; } = null!;

[JsonProperty("stream_id")] // PK
public string StreamId { get; set; } = null!;

[JsonProperty("event_id")]
public Guid EventId { get; set; }

[JsonProperty("event_type")]
public string EventType { get; set; } = null!;

[JsonProperty("data")]
public JObject Data { get; set; } = null!;

[JsonProperty("metadata")]
public Dictionary<string, object> Metadata { get; set; } = null!;

[JsonProperty("event_number")]
public long EventNumber { get; set; }

// https://learn.microsoft.com/en-us/azure/cosmos-db/account-databases-containers-items#properties-of-an-item
[JsonProperty("_ts")] // Unix time
public long? Timestamp { get; set; }

public EventData ToEventData(Type clrType, JsonSerializer serializer)
{
var eventData = new EventData(
EventId,
EventType,
Data.ToObject(clrType, serializer)!,
Metadata
)
{
EventNumber = EventNumber
};

return eventData;
}

public static string GenerateId(long eventNumber, string streamId) => $"{eventNumber}@{streamId}";
}
}
16 changes: 8 additions & 8 deletions src/Odyssey/Odyssey.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="O9d.Guard" Version="0.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="[6.0.0, 8.0.0)" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.31.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="[6.0.0, 8.0.0)" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="[6.0.0, 8.0.0)" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="[6.0.0, 8.0.0)" />
<PackageReference Include="OneOf" Version="[3.0.223, 4.0)" />
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />
<PackageReference Include="O9d.Guard" Version="0.1.0"/>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="[6.0.0, 8.0.0)"/>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.31.2"/>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="[6.0.0, 8.0.0)"/>
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="[6.0.0, 8.0.0)"/>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="[6.0.0, 8.0.0)"/>
<PackageReference Include="OneOf" Version="[3.0.223, 4.0)"/>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All"/>
</ItemGroup>
</Project>
Loading

0 comments on commit cd9cadc

Please sign in to comment.