Skip to content

Commit

Permalink
0.3-beta
Browse files Browse the repository at this point in the history
Add event bus event system
  • Loading branch information
decembrist-revolt committed May 20, 2021
1 parent 16de61d commit addb74d
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 104 deletions.
3 changes: 0 additions & 3 deletions Decembrist Plugin.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,4 @@
<TargetFramework>netcoreapp3.1</TargetFramework>
<RootNamespace>Decembrist</RootNamespace>
</PropertyGroup>
<ItemGroup>
<Content Include="README.md" />
</ItemGroup>
</Project>
4 changes: 1 addition & 3 deletions Example/DiTestNode.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System;
using Godot;
using Decembrist.Di;
using Decembrist.Example.Service;
using Decembrist.Utils;
using Godot;
using static Decembrist.Example.Assertions;

namespace Decembrist.Example
Expand Down
42 changes: 42 additions & 0 deletions Example/EventBusEvents/EventTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using Decembrist.Events;
using Godot;

namespace Decembrist.Example.EventBusEvents
{
public class EventTest : Node2D
{

public override void _Ready()
{
var messageCount = 0;

void IncrementCountCallback()
{
messageCount++;
}
GD.Print("EventBus event test started for........................................");
var subscription1 = this.EventListener("event1", IncrementCountCallback);
var subscription2 = this.EventListener("event1", IncrementCountCallback);
this.FireEvent("event1");
subscription2.Stop();
subscription1.Stop();
this.FireEvent("event1");
Assertions.AssertTrue(messageCount == 2, "Event without params test");

messageCount = 0;

void PlusCountCallback(int input)
{
messageCount += input;
}
subscription1 = this.EventListener<int>("event2", PlusCountCallback);
subscription2 = this.EventListener<int>("event2", PlusCountCallback);
this.FireEvent("event2", 2);
subscription2.Stop();
subscription1.Stop();
this.FireEvent("event2", 2);
Assertions.AssertTrue(messageCount == 4, "Event with params test");
GD.Print("EventBus event test stopped...........................................");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using Decembrist.Events;
using Godot;

namespace Decembrist.Example.EventBusTest
namespace Decembrist.Example.EventBusMessages
{
public class Consumer : Node2D
{
Expand All @@ -16,13 +16,13 @@ public class Consumer : Node2D
public override void _Ready()
{
var messageCount1 = 0;
_subscription1 = this.Consumer<int, int>(ConsumerAddress1, message =>
_subscription1 = this.MessageEndpoint<int, int>(ConsumerAddress1, message =>
{
messageCount1++;
HandleMessage(_subscription1, message, messageCount1);
});
var messageCount2 = 0;
_subscription2 = this.Consumer<int, int>(ConsumerAddress2, message =>
_subscription2 = this.MessageEndpoint<int, int>(ConsumerAddress2, message =>
{
messageCount2++;
HandleMessage(_subscription2, message, messageCount2);
Expand All @@ -31,18 +31,18 @@ public override void _Ready()

private void HandleMessage(
EventBusSubscription eventBusSubscription,
ReplyEventMessage<int, int> message,
ReplyEventBusRequest<int, int> busRequest,
int messageCount)
{
Assertions.AssertTrue(!message.IsError(), "not error message");
Assertions.AssertTrue(!busRequest.IsError(), "not error message");
if (messageCount > 5)
{
message.ErrorReply("test error", 2);
busRequest.ErrorReply("test error", 2);
eventBusSubscription.Stop();
}
else
{
message.Reply(message.Content + 1);
busRequest.Reply(busRequest.Content + 1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Decembrist.Events;
using Godot;
using static Decembrist.Example.Assertions;

namespace Decembrist.Example.EventBusTest
namespace Decembrist.Example.EventBusMessages
{
[Tool]
public class Producer : Node2D
Expand All @@ -21,7 +20,7 @@ public class Producer : Node2D
public override async void _Ready()
{
await Task.Delay(1000);
GD.Print($"EventBus test started for {_messageAddress}....................");
GD.Print($"EventBus message test started for {_messageAddress}............");
const int messageCount = 6;
ProduceMessages(messageCount);
await Task.Delay(1000);
Expand All @@ -33,7 +32,7 @@ public override async void _Ready()

AssertTrue(_errorMessage == Consumer.TestError, "event bus error message ok");
AssertTrue(_errorCode == Consumer.TestErrorCode, "event bus error code ok");
GD.Print("EventBus test stopped...........................................");
GD.Print("EventBus message test stopped...................................");
}

private async void ProduceMessages(int messageCount)
Expand All @@ -42,7 +41,7 @@ private async void ProduceMessages(int messageCount)
{
try
{
var response = await this.SendMessageAsync<int, int>(_messageAddress, i);
var response = await this.SendMessageRequestAsync<int, int>(_messageAddress, i);
_responses.Add(response);
}
catch (SendEventException ex)
Expand Down
22 changes: 12 additions & 10 deletions Example/TestScene.tscn
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
[gd_scene load_steps=4 format=2]
[gd_scene load_steps=5 format=2]

[ext_resource path="res://Example/DiTestNode.cs" type="Script" id=1]
[ext_resource path="res://Example/EventBusTest/Producer.cs" type="Script" id=2]
[ext_resource path="res://Example/EventBusTest/Consumer.cs" type="Script" id=3]
[ext_resource path="res://Example/EventBusMessages/Producer.cs" type="Script" id=2]
[ext_resource path="res://Example/EventBusMessages/Consumer.cs" type="Script" id=3]
[ext_resource path="res://Example/EventBusEvents/EventTest.cs" type="Script" id=4]

[node name="Node2D" type="Node2D"]

[node name="DiTestNode" type="Node2D" parent="."]
script = ExtResource( 1 )

[node name="EventBusTestNode" type="Node2D" parent="."]
[node name="EventBusMessages" type="Node2D" parent="."]

[node name="Producer1" type="Node2D" parent="EventBusTestNode"]
[node name="Producer1" type="Node2D" parent="EventBusMessages"]
script = ExtResource( 2 )
_messageAddress = "consumer-address1"

[node name="Producer2" type="Node2D" parent="EventBusTestNode"]
script = ExtResource( 2 )
_messageAddress = "consumer-address2"

[node name="Consumer" type="Node2D" parent="EventBusTestNode"]
[node name="Consumer" type="Node2D" parent="EventBusMessages"]
script = ExtResource( 3 )

[node name="EventBusEvents" type="Node2D" parent="."]

[node name="EventTest" type="Node2D" parent="EventBusEvents"]
script = ExtResource( 4 )
75 changes: 62 additions & 13 deletions addons/decembrist_plugin/Autoload/EventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading.Tasks;
using Decembrist.Di;
using Decembrist.Events;
using Decembrist.Utils;
using Decembrist.Utils.Callback;
using Decembrist.Utils.Task;
using Godot;
Expand All @@ -17,12 +18,16 @@ public class EventBus : Node

[Signal]
private delegate void MessageSignal(string source, string messageId);

[Signal]
private new delegate void EmitSignal(string address, string messageId);

[Signal]
internal delegate void ReplySignal(string replyMessageId);

private ConcurrentDictionary<string, Object> _messages = new();
private ConcurrentDictionary<string, Object> _replies = new();
private ConcurrentDictionary<string, object?> _eventMessages = new();
private ConcurrentDictionary<string, AbstractCallback> _replyCallbacks = new();

public EventBus()
Expand All @@ -38,11 +43,11 @@ public override void _Ready()
/// <summary>
/// Add reply event to list
/// </summary>
/// <param name="eventMessage">reply event</param>
/// <param name="eventBusRequest">reply event</param>
/// <typeparam name="TResponse"></typeparam>
internal void AddReplyMessage<TResponse>(EventMessage<TResponse> eventMessage)
internal void AddReplyMessage<TResponse>(EventBusRequest<TResponse> eventBusRequest)
{
_replies[eventMessage.MessageId] = eventMessage;
_replies[eventBusRequest.MessageId] = eventBusRequest;
}

/// <summary>
Expand All @@ -54,28 +59,45 @@ internal void AddReplyMessage<TResponse>(EventMessage<TResponse> eventMessage)
/// <typeparam name="TRequest">Message content type</typeparam>
/// <typeparam name="TResponse">Response message type</typeparam>
public void Send<TRequest, TResponse>(string to, TRequest? message,
Action<EventMessage<TResponse>> replyHandler)
Action<EventBusRequest<TResponse>> replyHandler)
{
var eventMessage = new ReplyEventMessage<TRequest, TResponse>(this, message);
var eventMessage = new ReplyEventBusRequest<TRequest, TResponse>(this, message);
var messageId = eventMessage.MessageId;
var callback = this.Subscribe(nameof(ReplySignal), (string replyMessageId) =>
const string signal = nameof(ReplySignal);
var callback = this.Subscribe(signal, (string replyMessageId) =>
{
if (messageId != replyMessageId) return;

_replies.TryRemove(messageId, out var reply);
if (reply is EventMessage<TResponse> response)
if (reply is EventBusRequest<TResponse> response)
{
replyHandler(response);
this.Unsubscribe(nameof(ReplySignal), _replyCallbacks[messageId]);
this.Unsubscribe(signal, _replyCallbacks[messageId]);
}
});
_replyCallbacks[messageId] = callback;
_messages[messageId] = eventMessage;
EmitSignal(nameof(MessageSignal), to, messageId);
}

/// <summary>
/// Send message through event bus for every registered consumer
/// </summary>
/// <param name="to">Message address</param>
/// <param name="message">Message content</param>
/// <param name="replyHandler">Consumer response handler</param>
/// <typeparam name="TRequest">Message content type</typeparam>
/// <typeparam name="TResponse">Response message type</typeparam>
public void Emit(string @event, object? body)
{
var messageId = Uuid.Get();
_eventMessages[messageId] = body;
EmitSignal(nameof(EmitSignal), @event, messageId);
_eventMessages.TryRemove(messageId, out var eventBody);
}

/// <summary>
/// Async version for <see cref="Send{T,TR}(string,T?,System.Action{Decembrist.Events.EventMessage{TR}})"/>
/// Async version for <see cref="Send{T,TR}(string,T?,System.Action{EventBusRequest{T}})"/>
/// </summary>
/// <param name="to">Message address</param>
/// <param name="message">Message content</param>
Expand Down Expand Up @@ -108,24 +130,51 @@ public Task<TResponse> Send<TRequest, TResponse>(string to, TRequest? message =
/// <typeparam name="TRequest">Message content type</typeparam>
/// <typeparam name="TResponse">Response message type</typeparam>
/// <returns>Subscription</returns>
public EventBusSubscription Consumer<TRequest, TResponse>(
public EventBusSubscription MessageEndpoint<TRequest, TResponse>(
string from,
Action<ReplyEventMessage<TRequest, TResponse>> messageHandler)
Action<ReplyEventBusRequest<TRequest, TResponse>> messageHandler)
{
const string signal = nameof(MessageSignal);
var callback = this.Subscribe(signal, (string source, string messageId) =>
{
if (source != from) return;

_messages.TryRemove(messageId, out var message);
if (message is ReplyEventMessage<TRequest, TResponse> eventMessage)
if (message is not ReplyEventBusRequest<TRequest, TResponse> eventMessage) return;

messageHandler(eventMessage);
if (!eventMessage.Replied)
{
messageHandler(eventMessage);
eventMessage.EmptyReply();
}
}
);

return new EventBusSubscription(this, callback, signal);
}

/// <summary>
/// Subscribes on event emits
/// </summary>
/// <param name="event">Event name</param>
/// <param name="eventHandler">Handler callback</param>
/// <typeparam name="TRequest">Event body type</typeparam>
/// <returns>Subscription</returns>
public EventBusSubscription EventListener<TRequest>(string @event, Action<TRequest> eventHandler)
{
const string signal = nameof(EmitSignal);
var callback = this.Subscribe(signal, (string address, string messageId) =>
{
if (@event != address) return;

var body = _eventMessages[messageId];
if (body is TRequest or null)
{
eventHandler((TRequest) body);
}
});

return new EventBusSubscription(this, callback, signal);
}
}
}
Loading

0 comments on commit addb74d

Please sign in to comment.