Skip to content

Commit

Permalink
Make IMessage serializer overridable
Browse files Browse the repository at this point in the history
  • Loading branch information
mythz committed Dec 5, 2022
1 parent 17a70e8 commit 8169399
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 18 deletions.
2 changes: 1 addition & 1 deletion ServiceStack/src/ServiceStack.Client/MessageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ internal static class MessageExtensions<T>
public static IMessage ConvertToMessage(object oBytes)
{
var bytes = (byte[]) oBytes;
return bytes.ToMessage<T>();
return MessageSerializer.Instance.ToMessage<T>(bytes);
}
}
}
21 changes: 21 additions & 0 deletions ServiceStack/src/ServiceStack.Client/MessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#nullable enable
using System;
using ServiceStack.Messaging;

namespace ServiceStack;

public class MessageSerializer
{
public static IMessageSerializer Instance { get; set; } = new ServiceStackMessageSerializer();
}

public class ServiceStackMessageSerializer : IMessageSerializer
{
public byte[] ToBytes(IMessage message) => message.ToBytes();

public byte[] ToBytes<T>(IMessage<T> message) => message.ToBytes();

public IMessage ToMessage(byte[] bytes, Type ofType) => bytes.ToMessage(ofType);

public Message<T> ToMessage<T>(byte[] bytes) => bytes.ToMessage<T>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void Publish<T>(IMessage<T> message)

public void Publish(string queueName, IMessage message)
{
var messageBytes = message.ToBytes();
var messageBytes = MessageSerializer.Instance.ToBytes(message);
factory.PublishMessage(queueName, messageBytes);
}

Expand All @@ -60,7 +60,7 @@ public void SendAllOneWay(IEnumerable<object> requests)

public void Notify(string queueName, IMessage message)
{
var messageBytes = message.ToBytes();
var messageBytes = MessageSerializer.Instance.ToBytes(message);
factory.PublishMessage(queueName, messageBytes);
}

Expand All @@ -80,8 +80,7 @@ public IMessage<T> Get<T>(string queueName, TimeSpan? timeOut = null)

public IMessage<T> GetAsync<T>(string queueName)
{
return factory.GetMessageAsync(queueName)
.ToMessage<T>();
return MessageSerializer.Instance.ToMessage<T>(factory.GetMessageAsync(queueName));
}

public void Ack(IMessage message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ void InvokeMessageReceived(EventArgs e)
received?.Invoke(this, e);
}

private readonly Dictionary<string, Queue<byte[]>> queueMessageBytesMap
= new Dictionary<string, Queue<byte[]>>();
private readonly Dictionary<string, Queue<byte[]>> queueMessageBytesMap = new();

public void PublishMessage<T>(string queueName, IMessage<T> message)
{
PublishMessage(queueName, message.ToBytes());
PublishMessage(queueName, MessageSerializer.Instance.ToBytes(message));
}

public void PublishMessage(string queueName, byte[] messageBytes)
Expand All @@ -45,7 +44,7 @@ public void PublishMessage(string queueName, byte[] messageBytes)
bytesQueue.Enqueue(messageBytes);
}

InvokeMessageReceived(new EventArgs());
InvokeMessageReceived(EventArgs.Empty);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void SendAllOneWay(IEnumerable<object> requests)

public void Publish(string queueName, IMessage message)
{
var messageBytes = message.ToBytes();
var messageBytes = MessageSerializer.Instance.ToBytes(message);
this.ReadWriteClient.LPush(queueName, messageBytes);
this.ReadWriteClient.Publish(QueueNames.TopicIn, queueName.ToUtf8Bytes());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void SendAllOneWay(IEnumerable<object> requests)

public void Publish(string queueName, IMessage message)
{
var messageBytes = message.ToBytes();
var messageBytes = MessageSerializer.Instance.ToBytes(message);
this.ReadWriteClient.LPush(queueName, messageBytes);
this.ReadWriteClient.Publish(QueueNames.TopicIn, queueName.ToUtf8Bytes());

Expand All @@ -80,7 +80,7 @@ public void Publish(string queueName, IMessage message)

public void Notify(string queueName, IMessage message)
{
var messageBytes = message.ToBytes();
var messageBytes = MessageSerializer.Instance.ToBytes(message);
this.ReadWriteClient.LPush(queueName, messageBytes);
this.ReadWriteClient.LTrim(queueName, 0, this.MaxSuccessQueueSize);
this.ReadWriteClient.Publish(QueueNames.TopicOut, queueName.ToUtf8Bytes());
Expand All @@ -93,13 +93,13 @@ public IMessage<T> Get<T>(string queueName, TimeSpan? timeOut = null)
? null
: unblockingKeyAndValue[1];

return messageBytes.ToMessage<T>();
return MessageSerializer.Instance.ToMessage<T>(messageBytes);
}

public IMessage<T> GetAsync<T>(string queueName)
{
var messageBytes = this.ReadWriteClient.RPop(queueName);
return messageBytes.ToMessage<T>();
return MessageSerializer.Instance.ToMessage<T>(messageBytes);
}

public void Ack(IMessage message)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace ServiceStack.Messaging;

public interface IMessageSerializer
{
byte[] ToBytes(IMessage message);

byte[] ToBytes<T>(IMessage<T> message);

IMessage ToMessage(byte[] bytes, Type ofType);

Message<T> ToMessage<T>(byte[] bytes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void Publish<T>(IMessage<T> message)
public void Publish(string queueName, IMessage message)
{
this.parent.transientMessageService.MessageQueueFactory
.PublishMessage(queueName, message.ToBytes());
.PublishMessage(queueName, MessageSerializer.Instance.ToBytes(message));
}

public void SendOneWay(object requestDto)
Expand All @@ -95,7 +95,6 @@ public void Dispose()
{
}
}

}
}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public void Can_serialize_IMessage_ToBytes_into_typed_Message()
{
var dto = new Incr { Value = 1 };
var iMsg = MessageFactory.Create(dto);
var bytes = iMsg.ToBytes();
var typedMessage = bytes.ToMessage<Incr>();
var bytes = MessageSerializer.Instance.ToBytes(iMsg);
var typedMessage = MessageSerializer.Instance.ToMessage<Incr>(bytes);

Assert.That(typedMessage.GetBody().Value, Is.EqualTo(dto.Value));
}
Expand Down

0 comments on commit 8169399

Please sign in to comment.