Skip to content

Commit

Permalink
Json event store
Browse files Browse the repository at this point in the history
  • Loading branch information
benfoster committed Jan 12, 2024
1 parent 08d0ab2 commit 11961b5
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ insert_final_newline = false
[*.js]
indent_size = 2

[*.json]
[*.{json,jsonc}]
indent_size = 2

[*.{yml,yaml}]
Expand Down
163 changes: 163 additions & 0 deletions src/Odyssey/JsonFileEventStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
namespace Odyssey;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using O9d.Guard;
using OneOf;
using OneOf.Types;

/// <summary>
/// Implementation of the event store that reads from files on disk
/// </summary>
public sealed class JsonFileEventStore : IEventStore, ICloneable
{
private static readonly IReadOnlyCollection<EventData> EmptyStream = Array.Empty<EventData>();
private readonly Dictionary<string, List<EventData>> _appendedEvents = new();
private string _storagePath;
private readonly JsonSerializer _serializer;
private readonly TypeResolver _eventTypeResolver;

public JsonFileEventStore(string storagePath)
{
_storagePath = storagePath.NotNull();
_serializer = JsonSerializer.Create(SerializerSettings.Default);
_eventTypeResolver = TypeResolvers.UsingClrQualifiedTypeMetadata;
}

public Task<OneOf<Success, UnexpectedStreamState>> AppendToStream(string streamId, IReadOnlyList<EventData> events, StreamState expectedState, CancellationToken cancellationToken = default)
{
if (expectedState != StreamState.NoStream)
{
throw new InvalidOperationException("Appending to an existing stream is not currently supported");
}

using var sw = new StreamWriter(GetStreamFilePath(streamId));
using var writer = new JsonTextWriter(sw);
_serializer.Serialize(writer, events);
return Task.FromResult<OneOf<Success, UnexpectedStreamState>>(new Success());
}

public Task Initialize(CancellationToken cancellationToken = default)
{
if (!Directory.Exists(_storagePath))
{
Directory.CreateDirectory(_storagePath);
}

return Task.CompletedTask;
}

public async Task<IReadOnlyCollection<EventData>> ReadStream(string streamId, ReadDirection direction, StreamPosition position, CancellationToken cancellationToken = default)
{
// TODO support json or jsonc
string streamFilePath = GetStreamFilePath(streamId);

if (!File.Exists(streamFilePath))
{
return EmptyStream;
}

string fileJson = await File.ReadAllTextAsync(streamFilePath);
var fileEvents = JsonConvert.DeserializeObject<IEnumerable<JsonFileEvent>>(fileJson) ?? Array.Empty<JsonFileEvent>();

var events = new List<EventData>();

foreach (JsonFileEvent fileEvent in fileEvents)
{
EventData eventData = ResolveEvent(fileEvent);
events.Add(eventData);
}

if (_appendedEvents.ContainsKey(streamId))
{
events.AddRange(_appendedEvents[streamId]);
}

return events.AsReadOnly();
}

private string GetStreamFilePath(string streamId) => Path.Combine(_storagePath, $"{streamId}.jsonc");

public async Task<OneOf<EventData, NotFound>> ReadStreamEvent(string streamId, long eventNumber, CancellationToken cancellationToken = default)
{
var events = await ReadStream(streamId, ReadDirection.Forwards, StreamPosition.Start, cancellationToken);

if (eventNumber > events.Count - 1)
{
return new NotFound();
}

return events.ElementAt((int)eventNumber);
}

public void ClearAppendedEvents() => _appendedEvents.Clear();

private EventData ResolveEvent(JsonFileEvent @event)
{
Type? eventType = _eventTypeResolver.Invoke(@event.Id, @event.Metadata);

return eventType is not null
? @event.ToEventData(eventType, _serializer)
: throw new ArgumentException($"The CLR type for event {@event.EventType} cannot be resolved");
}

public async Task CopyTo(IEventStore target, CancellationToken cancellationToken = default)
{
target.NotNull();
foreach (var streamId in GetStreamsFromDirectory())
{
var @events = await ReadStream(streamId, ReadDirection.Forwards, StreamPosition.Start, cancellationToken);
await target.AppendToStream(streamId, @events.ToList(), StreamState.NoStream, cancellationToken);
}
}

private IEnumerable<string> GetStreamsFromDirectory()
{
return Directory.GetFiles(_storagePath)
.Select(path => Path.GetFileNameWithoutExtension(path));
}

public sealed class JsonFileEvent
{
[JsonProperty("id")]
public string Id { get; set; } = null!;

[JsonProperty("stream_id")] // PK
public string StreamId { get; set; } = null!;

[JsonProperty("event_id")]
public Guid EventId { get; set; }

[JsonProperty("event_type")]
public string EventType { get; set; } = null!;

[JsonProperty("data")]
public JObject Data { get; set; } = null!;

[JsonProperty("metadata")]
public Dictionary<string, object> Metadata { get; set; } = null!;

[JsonProperty("event_number")]
public long EventNumber { get; set; }

// https://learn.microsoft.com/en-us/azure/cosmos-db/account-databases-containers-items#properties-of-an-item
[JsonProperty("_ts")] // Unix time
public long? Timestamp { get; set; }

public EventData ToEventData(Type clrType, JsonSerializer serializer)
{
var eventData = new EventData(
EventId,
EventType,
Data.ToObject(clrType, serializer)!,
Metadata
)
{
EventNumber = EventNumber
};

return eventData;
}

public static string GenerateId(long eventNumber, string streamId) => $"{eventNumber}@{streamId}";
}
}
16 changes: 8 additions & 8 deletions src/Odyssey/Odyssey.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="O9d.Guard" Version="0.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="[6.0.0, 8.0.0)" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.31.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="[6.0.0, 8.0.0)" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="[6.0.0, 8.0.0)" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="[6.0.0, 8.0.0)" />
<PackageReference Include="OneOf" Version="[3.0.223, 4.0)" />
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />
<PackageReference Include="O9d.Guard" Version="0.1.0"/>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="[6.0.0, 8.0.0)"/>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.31.2"/>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="[6.0.0, 8.0.0)"/>
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="[6.0.0, 8.0.0)"/>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="[6.0.0, 8.0.0)"/>
<PackageReference Include="OneOf" Version="[3.0.223, 4.0)"/>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All"/>
</ItemGroup>
</Project>
60 changes: 60 additions & 0 deletions test/Odyssey.Tests/JsonFileEventStoreTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
namespace Odyssey.Tests;

using O9d.Guard;
using static Utils;

public class JsonFileEventStoreTests
{
[Fact]
public async Task Can_read_json_file_events()
{
var eventStore = new JsonFileEventStore("event-streams");
var streamId = "test-stream";

var events = await eventStore.ReadStream(streamId, ReadDirection.Forwards, StreamPosition.Start);
events.Count.ShouldBe(1);
var eventData = events.First();

var @event = eventData.Data.ShouldBeOfType<JsonEvent>();

var expected = new JsonEvent("Did the thing");
@event.ShouldBe(expected);
}

[Fact]
public async Task Can_read_event_at_index()
{
var eventStore = new JsonFileEventStore("event-streams");
var streamId = "test-stream";

var result = await eventStore.ReadStreamEvent(streamId, 0);
result.Value.ShouldBeOfType<EventData>().NotNull();
}

[Fact]
public async void Can_clone()
{
var eventStore = new JsonFileEventStore("event-streams");
var inMemStore = new InMemoryEventStore();

await eventStore.CopyTo(inMemStore);
var events = await inMemStore.ReadStream("test-stream", ReadDirection.Backwards, StreamPosition.Start);
events.Count().ShouldBe(1);
}

[Fact]
public async void Can_write_and_read_events()
{
var eventStore = new JsonFileEventStore("temp");
await eventStore.Initialize();

string streamId = CreateStreamId();
var @event = new JsonEvent("some reference");
await eventStore.AppendToStream(streamId, new[] { MapEvent(@event) }, StreamState.NoStream);

var events = await eventStore.ReadStream(streamId, ReadDirection.Backwards, StreamPosition.Start);
events.Count().ShouldBe(1);
}
}

public record JsonEvent(string Reference);
3 changes: 3 additions & 0 deletions test/Odyssey.Tests/Odyssey.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Odyssey\Odyssey.csproj"/>
<None Update="event-streams/*.json*">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>
16 changes: 16 additions & 0 deletions test/Odyssey.Tests/event-streams/test-stream.jsonc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[
{
"id": "0@test-stream",
"stream_id": "test-stream",
"event_id": "3766c7f2-ac53-4f1c-bc51-4c5f28757e26",
"event_type": "json_event",
"data": {
"reference": "Did the thing"
},
"metadata": {
"_clr_type": "Odyssey.Tests.JsonEvent, Odyssey.Tests",
"_clr_type_name": "JsonEvent"
},
"event_number": 0
}
]

0 comments on commit 11961b5

Please sign in to comment.