Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 73 additions & 76 deletions Driver/Program.cs
Original file line number Diff line number Diff line change
@@ -1,90 +1,87 @@
using System.Diagnostics;

namespace Driver
namespace Driver;

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client;
using Shared.DomainDrivenDesign.EventSourcing;
using Shared.EventStore.Aggregate;
using Shared.EventStore.EventHandling;
using Shared.EventStore.EventStore;
using Shared.EventStore.SubscriptionWorker;
using Shared.IntegrationTesting;
using Shared.Logger;

internal class Program
{
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client;
using Shared.DomainDrivenDesign.EventSourcing;
using Shared.EventStore.Aggregate;
using Shared.EventStore.EventHandling;
using Shared.EventStore.EventStore;
using Shared.EventStore.SubscriptionWorker;
using Shared.IntegrationTesting;
using Shared.Logger;

internal class Program
{
#region Methods

internal static EventStoreClientSettings ConfigureEventStoreSettings() {
EventStoreClientSettings settings = new();

settings.CreateHttpMessageHandler = () => new SocketsHttpHandler {
SslOptions = {
RemoteCertificateValidationCallback = (sender,
certificate,
chain,
errors) => true,
}
};

settings.ConnectivitySettings = EventStoreClientConnectivitySettings.Default;
settings.ConnectivitySettings.Insecure = false;
settings.DefaultCredentials = new UserCredentials("admin", "changeit");
settings.ConnectivitySettings.Address = new Uri("esdb://192.168.0.133:2113?tls=true&tlsVerifyCert=false");

return settings;
}

private static async Task Main(String[] args) {
Logger.Initialise(NullLogger.Instance);
await Program.SubscriptionsTest();

Console.ReadKey();
}

private static async Task QueryTest(){
String query = "fromStream('$et-EstateCreatedEvent')\r\n .when({\r\n $init: function (s, e)\r\n {\r\n return {\r\n estates:[]\r\n };\r\n },\r\n \"EstateCreatedEvent\": function(s,e){\r\n s.estates.push(e.data.estateName);\r\n }\r\n });";
EventStoreClient client = new(Program.ConfigureEventStoreSettings());
EventStoreProjectionManagementClient projection = new(Program.ConfigureEventStoreSettings());
EventStoreContext context = new(client, projection);

var result = await context.RunTransientQuery(query, CancellationToken.None);
Console.WriteLine(result);
}

private static async Task SubscriptionsTest() {
String eventStoreConnectionString = "esdb://127.0.0.1:2113?tls=false";
Int32 inflightMessages = 50;
Int32 persistentSubscriptionPollingInSeconds = 10;
Int32 cacheDuration = 0;

ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString, cacheDuration);
#region Methods

internal static EventStoreClientSettings ConfigureEventStoreSettings() {
EventStoreClientSettings settings = new();

settings.CreateHttpMessageHandler = () => new SocketsHttpHandler {
SslOptions = {
RemoteCertificateValidationCallback = (sender,
certificate,
chain,
errors) => true,
}
};

settings.ConnectivitySettings = EventStoreClientConnectivitySettings.Default;
settings.ConnectivitySettings.Insecure = false;
settings.DefaultCredentials = new UserCredentials("admin", "changeit");
settings.ConnectivitySettings.Address = new Uri("esdb://192.168.0.133:2113?tls=true&tlsVerifyCert=false");

return settings;
}

private static async Task Main(String[] args) {
Logger.Initialise(NullLogger.Instance);
await Program.SubscriptionsTest();

Console.ReadKey();
}

// init our SubscriptionRepository
subscriptionRepository.PreWarm(CancellationToken.None).Wait();
private static async Task QueryTest(){
String query = "fromStream('$et-EstateCreatedEvent')\r\n .when({\r\n $init: function (s, e)\r\n {\r\n return {\r\n estates:[]\r\n };\r\n },\r\n \"EstateCreatedEvent\": function(s,e){\r\n s.estates.push(e.data.estateName);\r\n }\r\n });";
EventStoreClient client = new(Program.ConfigureEventStoreSettings());
EventStoreProjectionManagementClient projection = new(Program.ConfigureEventStoreSettings());
EventStoreContext context = new(client, projection);

var result = await context.RunTransientQuery(query, CancellationToken.None);
Console.WriteLine(result);
}

private static async Task SubscriptionsTest() {
String eventStoreConnectionString = "esdb://127.0.0.1:2113?tls=false";
Int32 inflightMessages = 50;
Int32 persistentSubscriptionPollingInSeconds = 10;
Int32 cacheDuration = 0;

ISubscriptionRepository subscriptionRepository = SubscriptionRepository.Create(eventStoreConnectionString, cacheDuration);

IDomainEventHandlerResolver eventHandlerResolver = new DomainEventHandlerResolver(new Dictionary<String, String[]>(), null);
// init our SubscriptionRepository
subscriptionRepository.PreWarm(CancellationToken.None).Wait();

SubscriptionWorker concurrentSubscriptions = SubscriptionWorker.CreateSubscriptionWorker(
eventStoreConnectionString, eventHandlerResolver,
subscriptionRepository, inflightMessages, persistentSubscriptionPollingInSeconds);
IDomainEventHandlerResolver eventHandlerResolver = new DomainEventHandlerResolver(new Dictionary<String, String[]>(), null);

concurrentSubscriptions.Trace += (_, args) => Console.WriteLine($"{TraceEventType.Information}|{args.Message}");
concurrentSubscriptions.Warning += (_, args) => Console.WriteLine($"{TraceEventType.Warning}|{args.Message}");
concurrentSubscriptions.Error += (_, args) => Console.WriteLine($"{TraceEventType.Error}|{args.Message}");
SubscriptionWorker concurrentSubscriptions = SubscriptionWorker.CreateSubscriptionWorker(
eventStoreConnectionString, eventHandlerResolver,
subscriptionRepository, inflightMessages, persistentSubscriptionPollingInSeconds);

concurrentSubscriptions.Trace += (_, args) => Console.WriteLine($"{TraceEventType.Information}|{args.Message}");
concurrentSubscriptions.Warning += (_, args) => Console.WriteLine($"{TraceEventType.Warning}|{args.Message}");
concurrentSubscriptions.Error += (_, args) => Console.WriteLine($"{TraceEventType.Error}|{args.Message}");


concurrentSubscriptions.StartAsync(CancellationToken.None).Wait();
}

#endregion
concurrentSubscriptions.StartAsync(CancellationToken.None).Wait();
}


#endregion
}
105 changes: 52 additions & 53 deletions Driver/TestAggregate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,72 +2,71 @@
using System.Collections.Generic;
using System.Text;

namespace Driver
namespace Driver;

using Newtonsoft.Json;
using Shared.DomainDrivenDesign.EventSourcing;
using Shared.EventStore.Aggregate;
using Shared.EventStore.EventStore;

public record TestAggregate1 : Aggregate
{
using Newtonsoft.Json;
using Shared.DomainDrivenDesign.EventSourcing;
using Shared.EventStore.Aggregate;
using Shared.EventStore.EventStore;
public String AggregateName { get; private set; }

public record TestAggregate1 : Aggregate
public static TestAggregate1 Create(Guid aggregateId)
{
public String AggregateName { get; private set; }

public static TestAggregate1 Create(Guid aggregateId)
{
return new TestAggregate1(aggregateId);
}
return new TestAggregate1(aggregateId);
}

public TestAggregate1()
{
public TestAggregate1()
{

}
}

private TestAggregate1(Guid aggregateId)
{
this.AggregateId = aggregateId;
}
private TestAggregate1(Guid aggregateId)
{
this.AggregateId = aggregateId;
}

protected override Object GetMetadata()
{
return null;
}
protected override Object GetMetadata()
{
return null;
}

public override void PlayEvent(IDomainEvent domainEvent)
{
this.PlayEvent((dynamic)domainEvent);
}
public override void PlayEvent(IDomainEvent domainEvent)
{
this.PlayEvent((dynamic)domainEvent);
}

private void PlayEvent(AggregateNameSetEvent domainEvent){
if (AggregateName == "Error")
throw new Exception("Error Aggregate");
this.AggregateName = domainEvent.AggregateName;
}
private void PlayEvent(AggregateNameSetEvent domainEvent){
if (AggregateName == "Error")
throw new Exception("Error Aggregate");
this.AggregateName = domainEvent.AggregateName;
}

public void SetAggregateName(String aggregateName)
{
AggregateNameSetEvent aggregateNameSetEvent = AggregateNameSetEvent.Create(this.AggregateId, aggregateName);
public void SetAggregateName(String aggregateName)
{
AggregateNameSetEvent aggregateNameSetEvent = AggregateNameSetEvent.Create(this.AggregateId, aggregateName);

this.ApplyAndAppend(aggregateNameSetEvent);
}
this.ApplyAndAppend(aggregateNameSetEvent);
}
}

public record AggregateNameSetEvent : DomainEvent
{
[JsonProperty]
public String AggregateName { get; private set; }
public record AggregateNameSetEvent : DomainEvent
{
[JsonProperty]
public String AggregateName { get; private set; }

private AggregateNameSetEvent(Guid aggregateId,
Guid eventId,
String aggregateName) : base(aggregateId, eventId)
{
this.AggregateName = aggregateName;
}
private AggregateNameSetEvent(Guid aggregateId,
Guid eventId,
String aggregateName) : base(aggregateId, eventId)
{
this.AggregateName = aggregateName;
}

public static AggregateNameSetEvent Create(Guid aggregateId,
String aggregateName)
{
return new AggregateNameSetEvent(aggregateId, Guid.NewGuid(), aggregateName);
}
public static AggregateNameSetEvent Create(Guid aggregateId,
String aggregateName)
{
return new AggregateNameSetEvent(aggregateId, Guid.NewGuid(), aggregateName);
}
}
}
Loading
Loading