Skip to content

Commit

Permalink
feat: add event notification feature
Browse files Browse the repository at this point in the history
  • Loading branch information
ailtonguitar authored and filipeesch committed Oct 3, 2023
1 parent 356865e commit 87cd205
Show file tree
Hide file tree
Showing 20 changed files with 359 additions and 203 deletions.
16 changes: 6 additions & 10 deletions src/KafkaFlow.Abstractions/Consumers/IWorker.cs
@@ -1,8 +1,5 @@
namespace KafkaFlow
{
using System;
using KafkaFlow.Observer;

/// <summary>
/// Represents the interface of a internal worker
/// </summary>
Expand All @@ -14,19 +11,18 @@ public interface IWorker
int Id { get; }

/// <summary>
/// This handler is called immediately after a worker completes the consumption of a message
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// </summary>
/// <param name="handler"><see cref="Action"/> to be executed</param>
void OnTaskCompleted(Action handler);
IEvent WorkerStopping { get; }

/// <summary>
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppingSubject, VoidObject> WorkerStopping { get; }
IEvent WorkerStopped { get; }

/// <summary>
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// Gets the subject for worker consumption completed events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppedSubject, VoidObject> WorkerStopped { get; }
IEvent<IMessageContext> WorkerProcessingEnded { get; }
}
}
19 changes: 0 additions & 19 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs

This file was deleted.

19 changes: 0 additions & 19 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs

This file was deleted.

32 changes: 32 additions & 0 deletions src/KafkaFlow.Abstractions/IEvent.cs
@@ -0,0 +1,32 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
public interface IEvent
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<Task> handler);
}

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
/// <typeparam name="TArg">The argument expected by the event.</typeparam>
public interface IEvent<TArg>
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<TArg, Task> handler);
}
}
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/IEventSubscription.cs
@@ -0,0 +1,12 @@
namespace KafkaFlow;

/// <summary>
/// Represents an Event subscription.
/// </summary>
public interface IEventSubscription
{
/// <summary>
/// Cancels the subscription to the event.
/// </summary>
void Cancel();
}
17 changes: 0 additions & 17 deletions src/KafkaFlow.Abstractions/Observer/ISubject.cs

This file was deleted.

18 changes: 0 additions & 18 deletions src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs

This file was deleted.

53 changes: 0 additions & 53 deletions src/KafkaFlow.Abstractions/Observer/Subject.cs

This file was deleted.

17 changes: 0 additions & 17 deletions src/KafkaFlow.Abstractions/VoidObject.cs

This file was deleted.

10 changes: 2 additions & 8 deletions src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Expand Up @@ -7,12 +7,8 @@
using System.Threading.Tasks;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;
using KafkaFlow.Observer;

internal class BatchConsumeMiddleware
: IMessageMiddleware,
ISubjectObserver<WorkerStoppedSubject, VoidObject>,
IDisposable
internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable
{
private readonly SemaphoreSlim dispatchSemaphore = new(1, 1);

Expand All @@ -37,7 +33,7 @@ internal class BatchConsumeMiddleware
this.batch = new(batchSize);
this.consumerConfiguration = middlewareContext.Consumer.Configuration;

middlewareContext.Worker.WorkerStopped.Subscribe(this);
middlewareContext.Worker.WorkerStopped.Subscribe(() => this.TriggerDispatchAndWaitAsync());
}

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
Expand Down Expand Up @@ -67,8 +63,6 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
}

public async Task OnNotification(WorkerStoppedSubject subject, VoidObject arg) => await this.TriggerDispatchAndWaitAsync();

public void Dispose()
{
this.dispatchTask?.Dispose();
Expand Down
Expand Up @@ -51,7 +51,7 @@ public void Setup()

workerMock
.SetupGet(x => x.WorkerStopped)
.Returns(new WorkerStoppedSubject(this.logHandlerMock.Object));
.Returns(new Event(this.logHandlerMock.Object));

consumerConfigurationMock
.SetupGet(x => x.AutoMessageCompletion)
Expand Down

0 comments on commit 87cd205

Please sign in to comment.