diff --git a/src/Samples/Sample.Route.Server/Program.cs b/src/Samples/Sample.Route.Server/Program.cs index d7ebbbec..016cf9e4 100644 --- a/src/Samples/Sample.Route.Server/Program.cs +++ b/src/Samples/Sample.Route.Server/Program.cs @@ -5,6 +5,7 @@ using Twino.MQ.Data; using Twino.MQ.Events; using Twino.MQ.Routing; +using Twino.Protocols.TMQ; using Twino.Server; namespace Sample.Route.Server diff --git a/src/Samples/Sample.Server/Program.cs b/src/Samples/Sample.Server/Program.cs index 2da26d1c..e77847ea 100644 --- a/src/Samples/Sample.Server/Program.cs +++ b/src/Samples/Sample.Server/Program.cs @@ -3,6 +3,7 @@ using Twino.MQ.Data; using Twino.MQ.Queues; using Twino.MQ.Routing; +using Twino.Protocols.TMQ; using Twino.Server; namespace Sample.Server diff --git a/src/Twino.Client.TMQ/Annotations/QueueTagAttribute.cs b/src/Twino.Client.TMQ/Annotations/QueueTagAttribute.cs new file mode 100644 index 00000000..4f8e0d23 --- /dev/null +++ b/src/Twino.Client.TMQ/Annotations/QueueTagAttribute.cs @@ -0,0 +1,24 @@ +using System; + +namespace Twino.Client.TMQ.Annotations +{ + /// + /// Used when queue is created with first push + /// + [AttributeUsage(AttributeTargets.Class)] + public class QueueTagAttribute : Attribute + { + /// + /// The Queue Tag for the type + /// + public string Tag { get; } + + /// + /// Creates new Queue Tag attribute + /// + public QueueTagAttribute(string tag) + { + Tag = tag; + } + } +} \ No newline at end of file diff --git a/src/Twino.Client.TMQ/Annotations/Resolvers/TypeDeliveryDescriptor.cs b/src/Twino.Client.TMQ/Annotations/Resolvers/TypeDeliveryDescriptor.cs index 901bda01..b0b8a0c4 100644 --- a/src/Twino.Client.TMQ/Annotations/Resolvers/TypeDeliveryDescriptor.cs +++ b/src/Twino.Client.TMQ/Annotations/Resolvers/TypeDeliveryDescriptor.cs @@ -36,6 +36,11 @@ public class TypeDeliveryDescriptor /// public MessagingQueueStatus? QueueStatus { get; set; } + /// + /// If queue is created with a message push and that value is not null, queue will be created with that Tag + /// + public string Tag { get; set; } + /// /// Headers for delivery descriptor of type /// @@ -157,6 +162,9 @@ public TmqMessage CreateMessage(MessageType type, string overrideTargetName, ush if (QueueStatus.HasValue) message.AddHeader(TmqHeaders.QUEUE_STATUS, QueueStatus.Value.ToString().ToLower()); + if (!string.IsNullOrEmpty(Tag)) + message.AddHeader(TmqHeaders.QUEUE_TAG, Tag); + foreach (KeyValuePair pair in Headers) message.AddHeader(pair.Key, pair.Value); diff --git a/src/Twino.Client.TMQ/Annotations/Resolvers/TypeDeliveryResolver.cs b/src/Twino.Client.TMQ/Annotations/Resolvers/TypeDeliveryResolver.cs index 81786d89..c6bdf2db 100644 --- a/src/Twino.Client.TMQ/Annotations/Resolvers/TypeDeliveryResolver.cs +++ b/src/Twino.Client.TMQ/Annotations/Resolvers/TypeDeliveryResolver.cs @@ -125,6 +125,10 @@ private void ResolveBase(Type type, TypeDeliveryDescriptor descriptor) if (qsa != null) descriptor.QueueStatus = qsa.Status; + QueueTagAttribute qta = type.GetCustomAttribute(true); + if (qta != null) + descriptor.Tag = qta.Tag; + IEnumerable headerAttributes = type.GetCustomAttributes(true); foreach (MessageHeaderAttribute headerAttribute in headerAttributes) descriptor.Headers.Add(new KeyValuePair(headerAttribute.Key, headerAttribute.Value)); diff --git a/src/Twino.Client.TMQ/Operators/RouterOperator.cs b/src/Twino.Client.TMQ/Operators/RouterOperator.cs index d3c9eb2c..40f46189 100644 --- a/src/Twino.Client.TMQ/Operators/RouterOperator.cs +++ b/src/Twino.Client.TMQ/Operators/RouterOperator.cs @@ -1,166 +1,271 @@ +using System; using System.Collections.Generic; using System.IO; using System.Text; using System.Threading.Tasks; using Twino.Client.TMQ.Annotations.Resolvers; +using Twino.Client.TMQ.Models; using Twino.Protocols.TMQ; +using Twino.Protocols.TMQ.Models; namespace Twino.Client.TMQ.Operators { - /// - /// Router manager object for tmq client - /// - public class RouterOperator - { - private readonly TmqClient _client; - - internal RouterOperator(TmqClient client) - { - _client = client; - } - - #region Actions - - //todo: create - //todo: list - //todo: remove - - //todo: add binding - //todo: get bindings - //todo: remove binding - - #endregion - - #region Publish - - /// - /// Publishes a string message to a router - /// - public async Task Publish(string routerName, - string message, - bool waitForAcknowledge = false, - ushort contentType = 0, - IEnumerable> messageHeaders = null) - { - TmqMessage msg = new TmqMessage(MessageType.Router, routerName, contentType); - msg.PendingAcknowledge = waitForAcknowledge; - msg.SetMessageId(_client.UniqueIdGenerator.Create()); - msg.Content = new MemoryStream(Encoding.UTF8.GetBytes(message)); - - if (messageHeaders != null) - foreach (KeyValuePair pair in messageHeaders) - msg.AddHeader(pair.Key, pair.Value); - - return await _client.SendAndWaitForAcknowledge(msg, waitForAcknowledge); - } - - /// - /// Publishes a byte array data to a router - /// - public async Task Publish(string routerName, - byte[] data, - bool waitForAcknowledge = false, - ushort contentType = 0, - IEnumerable> messageHeaders = null) - { - TmqMessage msg = new TmqMessage(MessageType.Router, routerName, contentType); - msg.PendingAcknowledge = waitForAcknowledge; - msg.SetMessageId(_client.UniqueIdGenerator.Create()); - msg.Content = new MemoryStream(data); - - if (messageHeaders != null) - foreach (KeyValuePair pair in messageHeaders) - msg.AddHeader(pair.Key, pair.Value); - - return await _client.SendAndWaitForAcknowledge(msg, waitForAcknowledge); - } - - /// - /// Publishes a JSON object to a router - /// - public Task PublishJson(object model, bool waitForAcknowledge = false, - IEnumerable> messageHeaders = null) - { - return PublishJson(null, model, waitForAcknowledge, null, messageHeaders); - } - - /// - /// Publishes a JSON object to a router - /// - public async Task PublishJson(string routerName, - object model, - bool waitForAcknowledge = false, - ushort? contentType = null, - IEnumerable> messageHeaders = null) - { - TypeDeliveryDescriptor descriptor = _client.DeliveryContainer.GetDescriptor(model.GetType()); - TmqMessage message = descriptor.CreateMessage(MessageType.Router, routerName, contentType); - - message.PendingAcknowledge = waitForAcknowledge; - message.SetMessageId(_client.UniqueIdGenerator.Create()); - message.Serialize(model, _client.JsonSerializer); - - if (messageHeaders != null) - foreach (KeyValuePair pair in messageHeaders) - message.AddHeader(pair.Key, pair.Value); - - return await _client.SendAndWaitForAcknowledge(message, waitForAcknowledge); - } - - /// - /// Sends a string request to router. - /// Waits response from at least one binding. - /// - public async Task PublishRequest(string routerName, string message, ushort contentType = 0, - IEnumerable> messageHeaders = null) - { - TmqMessage msg = new TmqMessage(MessageType.Router, routerName, contentType); - msg.PendingResponse = true; - msg.Content = new MemoryStream(Encoding.UTF8.GetBytes(message)); - - if (messageHeaders != null) - foreach (KeyValuePair pair in messageHeaders) - msg.AddHeader(pair.Key, pair.Value); - - return await _client.Request(msg); - } - - /// - /// Sends a request to router. - /// Waits response from at least one binding. - /// - public Task> PublishRequestJson(TRequest request, - IEnumerable> messageHeaders = null) - { - return PublishRequestJson(null, request, null, messageHeaders); - } - - /// - /// Sends a request to router. - /// Waits response from at least one binding. - /// - public async Task> PublishRequestJson(string routerName, TRequest request, ushort? contentType = null, - IEnumerable> messageHeaders = null) - { - TypeDeliveryDescriptor descriptor = _client.DeliveryContainer.GetDescriptor(request.GetType()); - TmqMessage message = descriptor.CreateMessage(MessageType.Router, routerName, contentType); - message.PendingResponse = true; - message.Serialize(request, _client.JsonSerializer); - - if (messageHeaders != null) - foreach (KeyValuePair pair in messageHeaders) - message.AddHeader(pair.Key, pair.Value); - - TmqMessage responseMessage = await _client.Request(message); - if (responseMessage.ContentType == 0) - { - TResponse response = responseMessage.Deserialize(_client.JsonSerializer); - return new TwinoResult(response, message, TwinoResultCode.Ok); - } - - return new TwinoResult(default, responseMessage, (TwinoResultCode) responseMessage.ContentType); - } - - #endregion - } + /// + /// Router manager object for tmq client + /// + public class RouterOperator + { + private readonly TmqClient _client; + + internal RouterOperator(TmqClient client) + { + _client = client; + } + + #region Actions + + /// + /// Creates new router. + /// Returns success result if router already exists. + /// + public async Task Create(string name, RouteMethod method) + { + TmqMessage message = new TmqMessage(); + message.Type = MessageType.Server; + message.ContentType = KnownContentTypes.CreateRouter; + message.SetTarget(name); + message.PendingResponse = true; + message.AddHeader(TmqHeaders.ROUTE_METHOD, Convert.ToInt32(method).ToString()); + message.SetMessageId(_client.UniqueIdGenerator.Create()); + return await _client.WaitResponse(message, true); + } + + /// + /// Gets information of all routers in server + /// + public async Task>> List() + { + TmqMessage message = new TmqMessage(); + message.Type = MessageType.Server; + message.ContentType = KnownContentTypes.ListRouters; + return await _client.SendAndGetJson>(message); + } + + /// + /// Removes a router. + /// Returns success result if router doesn't exists. + /// + public async Task Remove(string name) + { + TmqMessage message = new TmqMessage(); + message.Type = MessageType.Server; + message.ContentType = KnownContentTypes.RemoveRouter; + message.SetTarget(name); + message.PendingResponse = true; + message.SetMessageId(_client.UniqueIdGenerator.Create()); + return await _client.WaitResponse(message, true); + } + + /// + /// Adds new binding to a router + /// + /// Router name of the binding + /// Binding type + /// Binding name + /// Binding target. Queue name, tag name, direct receiver id, name, type, etc. + /// Binding interaction + /// Binding method is used when multiple receivers available in same binding. It's used for Direct and Tag bindings. + /// Overwritten content type if specified + /// Binding priority + /// + public async Task AddBinding(string routerName, + BindingType type, + string name, + string target, + BindingInteraction interaction, + RouteMethod bindingMethod = RouteMethod.Distribute, + ushort? contentType = null, + int priority = 1) + { + TmqMessage message = new TmqMessage(); + message.Type = MessageType.Server; + message.ContentType = KnownContentTypes.AddBinding; + message.SetTarget(routerName); + message.PendingResponse = true; + message.SetMessageId(_client.UniqueIdGenerator.Create()); + BindingInformation info = new BindingInformation + { + Name = name, + Target = target, + Interaction = interaction, + ContentType = contentType, + Priority = priority, + BindingType = type, + Method = bindingMethod + }; + message.Serialize(info, new NewtonsoftContentSerializer()); + return await _client.WaitResponse(message, true); + } + + /// + /// Gets all bindings of a router + /// + public async Task>> GetBindings(string routerName) + { + TmqMessage message = new TmqMessage(); + message.Type = MessageType.Server; + message.ContentType = KnownContentTypes.ListBindings; + message.SetTarget(routerName); + return await _client.SendAndGetJson>(message); + } + + /// + /// Remove a binding from a router + /// + public async Task RemoveBinding(string routerName, string bindingName) + { + TmqMessage message = new TmqMessage(); + message.Type = MessageType.Server; + message.ContentType = KnownContentTypes.RemoveBinding; + message.SetTarget(routerName); + message.PendingResponse = true; + message.SetMessageId(_client.UniqueIdGenerator.Create()); + message.AddHeader(TmqHeaders.BINDING_NAME, bindingName); + return await _client.WaitResponse(message, true); + } + + #endregion + + #region Publish + + /// + /// Publishes a string message to a router + /// + public async Task Publish(string routerName, + string message, + bool waitForAcknowledge = false, + ushort contentType = 0, + IEnumerable> messageHeaders = null) + { + TmqMessage msg = new TmqMessage(MessageType.Router, routerName, contentType); + msg.PendingAcknowledge = waitForAcknowledge; + msg.SetMessageId(_client.UniqueIdGenerator.Create()); + msg.Content = new MemoryStream(Encoding.UTF8.GetBytes(message)); + + if (messageHeaders != null) + foreach (KeyValuePair pair in messageHeaders) + msg.AddHeader(pair.Key, pair.Value); + + return await _client.SendAndWaitForAcknowledge(msg, waitForAcknowledge); + } + + /// + /// Publishes a byte array data to a router + /// + public async Task Publish(string routerName, + byte[] data, + bool waitForAcknowledge = false, + ushort contentType = 0, + IEnumerable> messageHeaders = null) + { + TmqMessage msg = new TmqMessage(MessageType.Router, routerName, contentType); + msg.PendingAcknowledge = waitForAcknowledge; + msg.SetMessageId(_client.UniqueIdGenerator.Create()); + msg.Content = new MemoryStream(data); + + if (messageHeaders != null) + foreach (KeyValuePair pair in messageHeaders) + msg.AddHeader(pair.Key, pair.Value); + + return await _client.SendAndWaitForAcknowledge(msg, waitForAcknowledge); + } + + /// + /// Publishes a JSON object to a router + /// + public Task PublishJson(object model, bool waitForAcknowledge = false, + IEnumerable> messageHeaders = null) + { + return PublishJson(null, model, waitForAcknowledge, null, messageHeaders); + } + + /// + /// Publishes a JSON object to a router + /// + public async Task PublishJson(string routerName, + object model, + bool waitForAcknowledge = false, + ushort? contentType = null, + IEnumerable> messageHeaders = null) + { + TypeDeliveryDescriptor descriptor = _client.DeliveryContainer.GetDescriptor(model.GetType()); + TmqMessage message = descriptor.CreateMessage(MessageType.Router, routerName, contentType); + + message.PendingAcknowledge = waitForAcknowledge; + message.SetMessageId(_client.UniqueIdGenerator.Create()); + message.Serialize(model, _client.JsonSerializer); + + if (messageHeaders != null) + foreach (KeyValuePair pair in messageHeaders) + message.AddHeader(pair.Key, pair.Value); + + return await _client.SendAndWaitForAcknowledge(message, waitForAcknowledge); + } + + /// + /// Sends a string request to router. + /// Waits response from at least one binding. + /// + public async Task PublishRequest(string routerName, string message, ushort contentType = 0, + IEnumerable> messageHeaders = null) + { + TmqMessage msg = new TmqMessage(MessageType.Router, routerName, contentType); + msg.PendingResponse = true; + msg.Content = new MemoryStream(Encoding.UTF8.GetBytes(message)); + + if (messageHeaders != null) + foreach (KeyValuePair pair in messageHeaders) + msg.AddHeader(pair.Key, pair.Value); + + return await _client.Request(msg); + } + + /// + /// Sends a request to router. + /// Waits response from at least one binding. + /// + public Task> PublishRequestJson(TRequest request, + IEnumerable> messageHeaders = null) + { + return PublishRequestJson(null, request, null, messageHeaders); + } + + /// + /// Sends a request to router. + /// Waits response from at least one binding. + /// + public async Task> PublishRequestJson(string routerName, TRequest request, ushort? contentType = null, + IEnumerable> messageHeaders = null) + { + TypeDeliveryDescriptor descriptor = _client.DeliveryContainer.GetDescriptor(request.GetType()); + TmqMessage message = descriptor.CreateMessage(MessageType.Router, routerName, contentType); + message.PendingResponse = true; + message.Serialize(request, _client.JsonSerializer); + + if (messageHeaders != null) + foreach (KeyValuePair pair in messageHeaders) + message.AddHeader(pair.Key, pair.Value); + + TmqMessage responseMessage = await _client.Request(message); + if (responseMessage.ContentType == 0) + { + TResponse response = responseMessage.Deserialize(_client.JsonSerializer); + return new TwinoResult(response, message, TwinoResultCode.Ok); + } + + return new TwinoResult(default, responseMessage, (TwinoResultCode) responseMessage.ContentType); + } + + #endregion + } } \ No newline at end of file diff --git a/src/Twino.Client.TMQ/Twino.Client.TMQ.csproj b/src/Twino.Client.TMQ/Twino.Client.TMQ.csproj index fe003be3..e01a3934 100644 --- a/src/Twino.Client.TMQ/Twino.Client.TMQ.csproj +++ b/src/Twino.Client.TMQ/Twino.Client.TMQ.csproj @@ -6,9 +6,9 @@ Twino.Client.TMQ Twino Messaging Queue Client to connect all TMQ Servers twino,tmq,client,mq,messaging,queue - 3.6.10 - 3.6.10 - 3.6.10 + 3.7.0 + 3.7.0 + 3.7.0 true Mehmet Helvacıköylü;Emre Hızlı https://github.com/twino-framework/twino-mq diff --git a/src/Twino.MQ.Data/Twino.MQ.Data.csproj b/src/Twino.MQ.Data/Twino.MQ.Data.csproj index cc985b3d..e4b2c230 100644 --- a/src/Twino.MQ.Data/Twino.MQ.Data.csproj +++ b/src/Twino.MQ.Data/Twino.MQ.Data.csproj @@ -6,9 +6,9 @@ Twino.MQ.Data Persistant queue message data library for Twino MQ twino,server,messaging,queue,mq,persistent,database,db - 3.6.10 - 3.6.10 - 3.6.10 + 3.7.0 + 3.7.0 + 3.7.0 true Mehmet Helvacıköylü;Emre Hızlı https://github.com/twino-framework/twino-mq diff --git a/src/Twino.MQ/Channel.cs b/src/Twino.MQ/Channel.cs index d3a5683b..37c95efa 100644 --- a/src/Twino.MQ/Channel.cs +++ b/src/Twino.MQ/Channel.cs @@ -249,6 +249,10 @@ public Task CreateQueue(ushort queueId, ChannelQueueOptions option string queueStatus = requestMessage.FindHeader(TmqHeaders.QUEUE_STATUS); if (queueStatus != null) options.Status = QueueStatusHelper.FindStatus(queueStatus); + + string tag = requestMessage.FindHeader(TmqHeaders.QUEUE_TAG); + if (!string.IsNullOrEmpty(tag)) + options.TagName = tag; } queue = new ChannelQueue(this, queueId, options); diff --git a/src/Twino.MQ/Network/ServerMessageHandler.cs b/src/Twino.MQ/Network/ServerMessageHandler.cs index aab24d8f..b36e4ca9 100644 --- a/src/Twino.MQ/Network/ServerMessageHandler.cs +++ b/src/Twino.MQ/Network/ServerMessageHandler.cs @@ -9,6 +9,7 @@ using Twino.MQ.Helpers; using Twino.MQ.Options; using Twino.MQ.Queues; +using Twino.MQ.Routing; using Twino.Protocols.TMQ; using Twino.Protocols.TMQ.Models; @@ -112,6 +113,30 @@ private Task HandleUnsafe(MqClient client, TmqMessage message) case KnownContentTypes.ClientList: return GetClients(client, message); + //lists all routers + case KnownContentTypes.ListRouters: + return ListRouters(client, message); + + //creates new router + case KnownContentTypes.CreateRouter: + return CreateRouter(client, message); + + //removes a router + case KnownContentTypes.RemoveRouter: + return RemoveRouter(client, message); + + //lists all bindings of a router + case KnownContentTypes.ListBindings: + return ListRouterBindings(client, message); + + //adds new binding to a router + case KnownContentTypes.AddBinding: + return CreateRouterBinding(client, message); + + //removes a binding from a router + case KnownContentTypes.RemoveBinding: + return RemoveRouterBinding(client, message); + //for not-defines content types, use user-defined message handler default: if (_server.ServerMessageHandler != null) @@ -919,5 +944,229 @@ public async Task GetClients(MqClient client, TmqMessage message) } #endregion + + #region Router + + /// + /// Creates new router + /// + private async Task CreateRouter(MqClient client, TmqMessage message) + { + IRouter found = _server.FindRouter(message.Target); + if (found != null) + { + await client.SendAsync(message.CreateResponse(TwinoResultCode.Ok)); + return; + } + + string methodHeader = message.FindHeader(TmqHeaders.ROUTE_METHOD); + RouteMethod method = RouteMethod.Distribute; + if (!string.IsNullOrEmpty(methodHeader)) + method = (RouteMethod) Convert.ToInt32(methodHeader); + + //check create channel access + if (_server.Authorization != null) + { + bool grant = await _server.Authorization.CanCreateRouter(client, message.Target, method); + if (!grant) + { + await client.SendAsync(message.CreateResponse(TwinoResultCode.Unauthorized)); + return; + } + } + + _server.AddRouter(message.Target, method); + await client.SendAsync(message.CreateResponse(TwinoResultCode.Ok)); + } + + /// + /// Removes a router with it's bindings + /// + private async Task RemoveRouter(MqClient client, TmqMessage message) + { + IRouter found = _server.FindRouter(message.Target); + if (found == null) + { + await client.SendAsync(message.CreateResponse(TwinoResultCode.Ok)); + return; + } + + //check create channel access + if (_server.Authorization != null) + { + bool grant = await _server.Authorization.CanRemoveRouter(client, found); + if (!grant) + { + await client.SendAsync(message.CreateResponse(TwinoResultCode.Unauthorized)); + return; + } + } + + _server.RemoveRouter(found); + await client.SendAsync(message.CreateResponse(TwinoResultCode.Ok)); + } + + /// + /// Sends all routers + /// + private async Task ListRouters(MqClient client, TmqMessage message) + { + List items = new List(); + foreach (IRouter router in _server.Routers) + { + RouterInformation info = new RouterInformation + { + Name = router.Name, + IsEnabled = router.IsEnabled + }; + + if (router is Router r) + info.Method = r.Method; + + items.Add(info); + } + + TmqMessage response = message.CreateResponse(TwinoResultCode.Ok); + response.Serialize(items, new NewtonsoftContentSerializer()); + await client.SendAsync(response); + } + + /// + /// Creates new binding for a router + /// + private async Task CreateRouterBinding(MqClient client, TmqMessage message) + { + IRouter router = _server.FindRouter(message.Target); + if (router == null) + { + await client.SendAsync(message.CreateResponse(TwinoResultCode.NotFound)); + return; + } + + BindingInformation info = message.Deserialize(new NewtonsoftContentSerializer()); + + //check create channel access + if (_server.Authorization != null) + { + bool grant = await _server.Authorization.CanCreateBinding(client, router, info); + if (!grant) + { + await client.SendAsync(message.CreateResponse(TwinoResultCode.Unauthorized)); + return; + } + } + + switch (info.BindingType) + { + case BindingType.Direct: + router.AddBinding(new DirectBinding(info.Name, info.Target, info.ContentType, info.Priority, info.Interaction, info.Method)); + break; + + case BindingType.Queue: + router.AddBinding(new QueueBinding(info.Name, info.Target, info.ContentType ?? 0, info.Priority, info.Interaction)); + break; + + case BindingType.Http: + router.AddBinding(new HttpBinding(info.Name, info.Target, (HttpBindingMethod) (info.ContentType ?? 0), info.Priority, info.Interaction)); + break; + + case BindingType.Tag: + router.AddBinding(new TagBinding(info.Name, info.Target, info.ContentType ?? 0, info.Priority, info.Interaction, info.Method)); + break; + } + + await client.SendAsync(message.CreateResponse(TwinoResultCode.Ok)); + } + + /// + /// Removes a router with it's bindings + /// + private async Task RemoveRouterBinding(MqClient client, TmqMessage message) + { + IRouter router = _server.FindRouter(message.Target); + if (router == null) + { + await client.SendAsync(message.CreateResponse(TwinoResultCode.NotFound)); + return; + } + + string name = message.FindHeader(TmqHeaders.BINDING_NAME); + if (string.IsNullOrEmpty(name)) + { + await client.SendAsync(message.CreateResponse(TwinoResultCode.NotFound)); + return; + } + + Binding[] bindings = router.GetBindings(); + Binding binding = bindings.FirstOrDefault(x => x.Name == name); + if (binding == null) + { + await client.SendAsync(message.CreateResponse(TwinoResultCode.NotFound)); + return; + } + + //check create channel access + if (_server.Authorization != null) + { + bool grant = await _server.Authorization.CanRemoveBinding(client, binding); + if (!grant) + { + await client.SendAsync(message.CreateResponse(TwinoResultCode.Unauthorized)); + return; + } + } + + router.RemoveBinding(binding); + await client.SendAsync(message.CreateResponse(TwinoResultCode.Ok)); + } + + /// + /// Sends all bindings of a router + /// + private async Task ListRouterBindings(MqClient client, TmqMessage message) + { + IRouter router = _server.FindRouter(message.Target); + if (router == null) + { + await client.SendAsync(message.CreateResponse(TwinoResultCode.NotFound)); + return; + } + + List items = new List(); + foreach (Binding binding in router.GetBindings()) + { + BindingInformation info = new BindingInformation + { + Name = binding.Name, + Target = binding.Target, + Priority = binding.Priority, + ContentType = binding.ContentType, + Interaction = binding.Interaction + }; + + if (binding is QueueBinding) + info.BindingType = BindingType.Queue; + else if (binding is DirectBinding directBinding) + { + info.Method = directBinding.RouteMethod; + info.BindingType = BindingType.Direct; + } + else if (binding is TagBinding tagBinding) + { + info.Method = tagBinding.RouteMethod; + info.BindingType = BindingType.Tag; + } + else if (binding is HttpBinding) + info.BindingType = BindingType.Http; + + items.Add(info); + } + + TmqMessage response = message.CreateResponse(TwinoResultCode.Ok); + response.Serialize(items, new NewtonsoftContentSerializer()); + await client.SendAsync(response); + } + + #endregion } } \ No newline at end of file diff --git a/src/Twino.MQ/Queues/States/BroadcastQueueState.cs b/src/Twino.MQ/Queues/States/BroadcastQueueState.cs index 487e097e..76a0c2a2 100644 --- a/src/Twino.MQ/Queues/States/BroadcastQueueState.cs +++ b/src/Twino.MQ/Queues/States/BroadcastQueueState.cs @@ -91,39 +91,31 @@ private async Task ProcessMessage(QueueMessage message) delivery.FirstAcquirer = message.Message.FirstAcquirer; //send the message - bool sent = await client.Client.SendAsync(messageData); - - if (sent) - { - messageIsSent = true; - - //adds the delivery to time keeper to check timing up - _queue.TimeKeeper.AddAcknowledgeCheck(delivery); - - //set as sent, if message is sent to it's first acquirer, - //set message first acquirer false and re-create byte array data of the message - bool firstAcquirer = message.Message.FirstAcquirer; - - //mark message is sent - delivery.MarkAsSent(); - - //do after send operations for per message - _queue.Info.AddDelivery(); - Decision d = await _queue.DeliveryHandler.ConsumerReceived(_queue, delivery, client.Client); - final = ChannelQueue.CreateFinalDecision(final, d); - - //if we are sending to only first acquirer, break - if (_queue.Options.SendOnlyFirstAcquirer && firstAcquirer) - break; - - if (firstAcquirer && clients.Count > 1) - messageData = TmqWriter.Create(message.Message); - } - else - { - Decision d = await _queue.DeliveryHandler.ConsumerReceiveFailed(_queue, delivery, client.Client); - final = ChannelQueue.CreateFinalDecision(final, d); - } + _ = client.Client.SendAsync(messageData); + + messageIsSent = true; + + //adds the delivery to time keeper to check timing up + _queue.TimeKeeper.AddAcknowledgeCheck(delivery); + + //set as sent, if message is sent to it's first acquirer, + //set message first acquirer false and re-create byte array data of the message + bool firstAcquirer = message.Message.FirstAcquirer; + + //mark message is sent + delivery.MarkAsSent(); + + //do after send operations for per message + _queue.Info.AddDelivery(); + Decision d = await _queue.DeliveryHandler.ConsumerReceived(_queue, delivery, client.Client); + final = ChannelQueue.CreateFinalDecision(final, d); + + //if we are sending to only first acquirer, break + if (_queue.Options.SendOnlyFirstAcquirer && firstAcquirer) + break; + + if (firstAcquirer && clients.Count > 1) + messageData = TmqWriter.Create(message.Message); } message.Decision = final; diff --git a/src/Twino.MQ/Routing/HttpBindingMethod.cs b/src/Twino.MQ/Routing/HttpBindingMethod.cs index 82a86101..bdace24a 100644 --- a/src/Twino.MQ/Routing/HttpBindingMethod.cs +++ b/src/Twino.MQ/Routing/HttpBindingMethod.cs @@ -8,26 +8,26 @@ public enum HttpBindingMethod : ushort /// /// "GET" /// - Get, + Get = 0, /// /// "POST" /// - Post, + Post = 1, /// /// "PUT" /// - Put, + Put = 2, /// /// "PATCH" /// - Patch, + Patch = 3, /// /// "DELETE" /// - Delete + Delete = 4 } } \ No newline at end of file diff --git a/src/Twino.MQ/Routing/IRouter.cs b/src/Twino.MQ/Routing/IRouter.cs index d89d8cac..d48fe87a 100644 --- a/src/Twino.MQ/Routing/IRouter.cs +++ b/src/Twino.MQ/Routing/IRouter.cs @@ -25,6 +25,11 @@ public interface IRouter /// bool IsEnabled { get; set; } + /// + /// Gets all bindings of router + /// + Binding[] GetBindings(); + /// /// Adds new binding to router /// diff --git a/src/Twino.MQ/Routing/Router.cs b/src/Twino.MQ/Routing/Router.cs index e363b3fc..e53cd9e1 100644 --- a/src/Twino.MQ/Routing/Router.cs +++ b/src/Twino.MQ/Routing/Router.cs @@ -63,6 +63,14 @@ public Router(TwinoMQ server, string name, RouteMethod method) #region Add - Remove + /// + /// Returns all bindings of router + /// + public Binding[] GetBindings() + { + return Bindings; + } + /// /// Adds new binding to router /// @@ -188,7 +196,7 @@ private async Task Distribute(MqClient sender, TmqMessage m { if (binding.Interaction != BindingInteraction.None) result = RouterPublishResult.OkAndWillBeRespond; - + else if (result == RouterPublishResult.NoReceivers) result = RouterPublishResult.OkWillNotRespond; } diff --git a/src/Twino.MQ/Routing/TagBinding.cs b/src/Twino.MQ/Routing/TagBinding.cs new file mode 100644 index 00000000..efb60a15 --- /dev/null +++ b/src/Twino.MQ/Routing/TagBinding.cs @@ -0,0 +1,91 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Twino.MQ.Clients; +using Twino.MQ.Helpers; +using Twino.MQ.Queues; +using Twino.Protocols.TMQ; + +namespace Twino.MQ.Routing +{ + /// + /// Tag binding targets channel queues with tag name. + /// Binding can send message to multiple queues with same tag name. + /// Binding receivers are received messages as QueueMessage. + /// + public class TagBinding : Binding + { + private ChannelQueue[] _targetQueues; + private DateTime _queueUpdateTime; + private readonly TimeSpan _queueCacheDuration = TimeSpan.FromMilliseconds(1000); + private readonly IUniqueIdGenerator _idGenerator = new DefaultUniqueIdGenerator(); + + /// + /// Tag binding routing method + /// + public RouteMethod RouteMethod { get; set; } + + /// + /// Creates new direct binding. + /// Name is the name of the binding. + /// Target is the tag of channels.. + /// Content Type should be Queue Id. + /// Priority for router binding. + /// + public TagBinding(string name, string target, ushort? contentType, int priority, BindingInteraction interaction, + RouteMethod routeMethod = RouteMethod.Distribute) + : base(name, target, contentType, priority, interaction) + { + RouteMethod = routeMethod; + } + + /// + /// Sends the message to binding receivers + /// + public override Task Send(MqClient sender, TmqMessage message) + { + if (DateTime.UtcNow - _queueUpdateTime > _queueCacheDuration) + RefreshQueueCache(); + + message.PendingAcknowledge = false; + message.PendingResponse = false; + if (Interaction == BindingInteraction.Acknowledge) + message.PendingAcknowledge = true; + else if (Interaction == BindingInteraction.Response) + message.PendingResponse = true; + + bool sent = false; + foreach (ChannelQueue queue in _targetQueues) + { + string messageId = sent || Interaction == BindingInteraction.None + ? message.MessageId + : _idGenerator.Create(); + + if (!sent) + sent = true; + + TmqMessage msg = message.Clone(true, true, messageId); + QueueMessage queueMessage = new QueueMessage(msg); + queue.AddMessage(queueMessage); + } + + return Task.FromResult(sent); + } + + private void RefreshQueueCache() + { + _queueUpdateTime = DateTime.UtcNow; + List list = new List(); + + foreach (Channel channel in Router.Server.Channels) + { + IEnumerable queues = channel.QueuesClone.Where(x => x.TagName != null && Filter.CheckMatch(x.TagName, Target)); + foreach (ChannelQueue queue in queues) + list.Add(queue); + } + + _targetQueues = list.ToArray(); + } + } +} \ No newline at end of file diff --git a/src/Twino.MQ/Security/IClientAuthorization.cs b/src/Twino.MQ/Security/IClientAuthorization.cs index b4ef0b67..11bc8564 100644 --- a/src/Twino.MQ/Security/IClientAuthorization.cs +++ b/src/Twino.MQ/Security/IClientAuthorization.cs @@ -1,7 +1,9 @@ using System.Threading.Tasks; using Twino.MQ.Clients; using Twino.MQ.Queues; +using Twino.MQ.Routing; using Twino.Protocols.TMQ; +using Twino.Protocols.TMQ.Models; namespace Twino.MQ.Security { @@ -39,10 +41,30 @@ public interface IClientAuthorization /// Returns true, if client can pull a message from the queue /// Task CanPullFromQueue(ChannelClient client, ChannelQueue queue); - + /// /// Returns true, if client can subscribe to the event /// bool CanSubscribeEvent(MqClient client, string eventName, string channelName, ushort queueId); + + /// + /// Returns true, if client can create a router + /// + Task CanCreateRouter(MqClient client, string routerName, RouteMethod method); + + /// + /// Returns true, if client can remove a router + /// + Task CanRemoveRouter(MqClient client, IRouter router); + + /// + /// Returns true, if client can create a binding in a router + /// + Task CanCreateBinding(MqClient client, IRouter router, BindingInformation binding); + + /// + /// Returns true, if client can remove a binding from a router + /// + Task CanRemoveBinding(MqClient client, Binding binding); } } \ No newline at end of file diff --git a/src/Twino.MQ/Twino.MQ.csproj b/src/Twino.MQ/Twino.MQ.csproj index 311c2544..ce56aeb3 100644 --- a/src/Twino.MQ/Twino.MQ.csproj +++ b/src/Twino.MQ/Twino.MQ.csproj @@ -6,9 +6,9 @@ Twino.MQ Messaging Queue Server library with TMQ Protocol via Twino Server twino,server,tmq,messaging,queue,mq - 3.6.10 - 3.6.10 - 3.6.10 + 3.7.0 + 3.7.0 + 3.7.0 true Mehmet Helvacıköylü;Emre Hızlı https://github.com/twino-framework/twino-mq diff --git a/src/Twino.MQ/Routing/BindingInteraction.cs b/src/Twino.Protocols.TMQ/BindingInteraction.cs similarity index 93% rename from src/Twino.MQ/Routing/BindingInteraction.cs rename to src/Twino.Protocols.TMQ/BindingInteraction.cs index 44af718b..29ce909a 100644 --- a/src/Twino.MQ/Routing/BindingInteraction.cs +++ b/src/Twino.Protocols.TMQ/BindingInteraction.cs @@ -1,4 +1,4 @@ -namespace Twino.MQ.Routing +namespace Twino.Protocols.TMQ { /// /// Options for pending acknowledge or response from binding receiver diff --git a/src/Twino.Protocols.TMQ/BindingType.cs b/src/Twino.Protocols.TMQ/BindingType.cs new file mode 100644 index 00000000..d6b16483 --- /dev/null +++ b/src/Twino.Protocols.TMQ/BindingType.cs @@ -0,0 +1,29 @@ +namespace Twino.Protocols.TMQ +{ + /// + /// Route binding types + /// + public enum BindingType + { + /// + /// Direct message binding. + /// Messages are sent to direct receivers without queue operation. + /// + Direct = 0, + + /// + /// Messages are pushed into queue + /// + Queue = 1, + + /// + /// Messages are sent to an HTTP endpoint as JSON request + /// + Http = 2, + + /// + /// Messages are pushed to queues with specified tag name + /// + Tag = 3 + } +} \ No newline at end of file diff --git a/src/Twino.Protocols.TMQ/KnownContentTypes.cs b/src/Twino.Protocols.TMQ/KnownContentTypes.cs index da619293..df92a923 100644 --- a/src/Twino.Protocols.TMQ/KnownContentTypes.cs +++ b/src/Twino.Protocols.TMQ/KnownContentTypes.cs @@ -140,5 +140,34 @@ public class KnownContentTypes /// public const ushort DecisionOverNode = 641; + /// + /// "651" Gets all rouuters + /// + public const ushort ListRouters = 651; + + /// + /// "652" Creates new router + /// + public const ushort CreateRouter = 652; + + /// + /// "653" Removes a router with it's bindings + /// + public const ushort RemoveRouter = 653; + + /// + /// "661" List all bindings of a router + /// + public const ushort ListBindings = 661; + + /// + /// "662" Creates new binding in a router + /// + public const ushort AddBinding = 662; + + /// + /// "663" Removes a binding from a router + /// + public const ushort RemoveBinding = 663; } } \ No newline at end of file diff --git a/src/Twino.Protocols.TMQ/Models/BindingInformation.cs b/src/Twino.Protocols.TMQ/Models/BindingInformation.cs new file mode 100644 index 00000000..f8cc9a78 --- /dev/null +++ b/src/Twino.Protocols.TMQ/Models/BindingInformation.cs @@ -0,0 +1,63 @@ +using System.Text.Json.Serialization; +using Newtonsoft.Json; + +namespace Twino.Protocols.TMQ.Models +{ + /// + /// Router binding information + /// + public class BindingInformation + { + /// + /// Unique name of the binding + /// + [JsonProperty("Name")] + [JsonPropertyName("Name")] + public string Name { get; set; } + + /// + /// Binding target name. + /// For queue bindings, channel name. + /// For direct bindings client id, type or name. + /// + [JsonProperty("Target")] + [JsonPropertyName("Target")] + public string Target { get; set; } + + /// + /// Binding content type. + /// Null, passes same content type from producer to receiver + /// + [JsonProperty("ContentType")] + [JsonPropertyName("ContentType")] + public ushort? ContentType { get; set; } + + /// + /// Binding priority + /// + [JsonProperty("Priority")] + [JsonPropertyName("Priority")] + public int Priority { get; set; } + + /// + /// Binding interaction type + /// + [JsonProperty("Name")] + [JsonPropertyName("Name")] + public BindingInteraction Interaction { get; set; } + + /// + /// Binding type + /// + [JsonProperty("BindingType")] + [JsonPropertyName("BindingType")] + public BindingType BindingType { get; set; } + + /// + /// Routing method in binding + /// + [JsonProperty("Method")] + [JsonPropertyName("Method")] + public RouteMethod Method { get; set; } + } +} \ No newline at end of file diff --git a/src/Twino.Protocols.TMQ/Models/ChannelInformation.cs b/src/Twino.Protocols.TMQ/Models/ChannelInformation.cs index 0a667bb7..588ec8ff 100644 --- a/src/Twino.Protocols.TMQ/Models/ChannelInformation.cs +++ b/src/Twino.Protocols.TMQ/Models/ChannelInformation.cs @@ -1,4 +1,5 @@ using System.Text.Json.Serialization; +using Newtonsoft.Json; namespace Twino.Protocols.TMQ.Models { @@ -10,12 +11,14 @@ public class ChannelInformation /// /// Channel name /// + [JsonProperty("Name")] [JsonPropertyName("Name")] public string Name { get; set; } /// /// Active queues in the channel /// + [JsonProperty("Queues")] [JsonPropertyName("Queues")] public ushort[] Queues { get; set; } @@ -23,60 +26,70 @@ public class ChannelInformation /// If true, channel can have multiple queues with multiple content type. /// If false, each channel can only have one queue /// + [JsonProperty("AllowMultipleQueues")] [JsonPropertyName("AllowMultipleQueues")] public bool AllowMultipleQueues { get; set; } = true; /// /// Allowed queue id list for the channel /// + [JsonProperty("AllowedQueues")] [JsonPropertyName("AllowedQueues")] public ushort[] AllowedQueues { get; set; } /// /// If true, messages will send to only first acquirers /// + [JsonProperty("OnlyFirstAcquirer")] [JsonPropertyName("OnlyFirstAcquirer")] public bool OnlyFirstAcquirer { get; set; } /// /// If true, messages will request acknowledge from receivers /// + [JsonProperty("RequestAcknowledge")] [JsonPropertyName("RequestAcknowledge")] public bool RequestAcknowledge { get; set; } /// /// When acknowledge is required, maximum duration for waiting acknowledge message /// + [JsonProperty("AcknowledgeTimeout")] [JsonPropertyName("AcknowledgeTimeout")] public int AcknowledgeTimeout { get; set; } /// /// When message queuing is active, maximum time for a message wait /// + [JsonProperty("MessageTimeout")] [JsonPropertyName("MessageTimeout")] public int MessageTimeout { get; set; } /// /// If true, server creates unique id for each message. /// + [JsonProperty("UseMessageId")] [JsonPropertyName("UseMessageId")] public bool UseMessageId { get; set; } = true; /// /// If true, queue does not send next message to receivers until acknowledge message received /// + [JsonProperty("WaitForAcknowledge")] [JsonPropertyName("WaitForAcknowledge")] public bool WaitForAcknowledge { get; set; } /// /// If true, server doesn't send client name to receivers in queueus. /// + [JsonProperty("HideClientNames")] [JsonPropertyName("HideClientNames")] public bool HideClientNames { get; set; } /// /// Online and subscribed clients in the channel /// + [JsonProperty("ActiveClients")] [JsonPropertyName("ActiveClients")] public int ActiveClients { get; set; } @@ -84,6 +97,7 @@ public class ChannelInformation /// Maximum client limit of the channel /// Zero is unlimited /// + [JsonProperty("ClientLimit")] [JsonPropertyName("ClientLimit")] public int ClientLimit { get; set; } @@ -91,12 +105,14 @@ public class ChannelInformation /// Maximum queue limit of the channel /// Zero is unlimited /// + [JsonProperty("QueueLimit")] [JsonPropertyName("QueueLimit")] public int QueueLimit { get; set; } /// /// If true, channel will be destroyed when there are no messages in queues and there are no consumers available /// + [JsonProperty("DestroyWhenEmpty")] [JsonPropertyName("DestroyWhenEmpty")] public bool DestroyWhenEmpty { get; set; } diff --git a/src/Twino.Protocols.TMQ/Models/ClientInformation.cs b/src/Twino.Protocols.TMQ/Models/ClientInformation.cs index 977ed32d..11b3bcfc 100644 --- a/src/Twino.Protocols.TMQ/Models/ClientInformation.cs +++ b/src/Twino.Protocols.TMQ/Models/ClientInformation.cs @@ -1,4 +1,5 @@ using System.Text.Json.Serialization; +using Newtonsoft.Json; namespace Twino.Protocols.TMQ.Models { @@ -11,12 +12,14 @@ public class ClientInformation /// Client's unique id /// If it's null or empty, server will create new unique id for the client. /// + [JsonProperty("Id")] [JsonPropertyName("Id")] public string Id { get; set; } /// /// Client name /// + [JsonProperty("Name")] [JsonPropertyName("Name")] public string Name { get; set; } @@ -24,18 +27,21 @@ public class ClientInformation /// Client type. /// If different type of clients join your server, you can categorize them with this type value /// + [JsonProperty("Type")] [JsonPropertyName("Type")] public string Type { get; set; } /// /// Total online duration of client in milliseconds /// + [JsonProperty("Online")] [JsonPropertyName("Online")] public long Online { get; set; } /// /// If true, client authenticated by server's IClientAuthenticator implementation /// + [JsonProperty("IsAuthenticated")] [JsonPropertyName("IsAuthenticated")] public bool IsAuthenticated { get; set; } } diff --git a/src/Twino.Protocols.TMQ/Models/QueueInformation.cs b/src/Twino.Protocols.TMQ/Models/QueueInformation.cs index d0dc2a97..487f7d40 100644 --- a/src/Twino.Protocols.TMQ/Models/QueueInformation.cs +++ b/src/Twino.Protocols.TMQ/Models/QueueInformation.cs @@ -1,4 +1,5 @@ using System.Text.Json.Serialization; +using Newtonsoft.Json; namespace Twino.Protocols.TMQ.Models { @@ -10,144 +11,168 @@ public class QueueInformation /// /// Queue channel name /// + [JsonProperty("Channel")] [JsonPropertyName("Channel")] public string Channel { get; set; } /// /// Queue tag name /// + [JsonProperty("TagName")] [JsonPropertyName("TagName")] public string TagName { get; set; } /// /// Queue id /// + [JsonProperty("Id")] [JsonPropertyName("Id")] public ushort Id { get; set; } /// /// Pending high priority messages in the queue /// + [JsonProperty("PriorityMessages")] [JsonPropertyName("PriorityMessages")] public int PriorityMessages { get; set; } /// /// Pending regular messages in the queue /// + [JsonProperty("Messages")] [JsonPropertyName("Messages")] public int Messages { get; set; } /// /// Queue current status /// + [JsonProperty("Status")] [JsonPropertyName("Status")] public string Status { get; set; } /// /// If true, messages will send to only first acquirers /// + [JsonProperty("OnlyFirstAcquirer")] [JsonPropertyName("OnlyFirstAcquirer")] public bool OnlyFirstAcquirer { get; set; } /// /// If true, messages will request acknowledge from receivers /// + [JsonProperty("RequestAcknowledge")] [JsonPropertyName("RequestAcknowledge")] public bool RequestAcknowledge { get; set; } /// /// When acknowledge is required, maximum duration for waiting acknowledge message /// + [JsonProperty("AcknowledgeTimeout")] [JsonPropertyName("AcknowledgeTimeout")] public int AcknowledgeTimeout { get; set; } /// /// When message queuing is active, maximum time for a message wait /// + [JsonProperty("MessageTimeout")] [JsonPropertyName("MessageTimeout")] public int MessageTimeout { get; set; } /// /// If true, server creates unique id for each message. /// + [JsonProperty("UseMessageId")] [JsonPropertyName("UseMessageId")] public bool UseMessageId { get; set; } = true; /// /// If true, queue does not send next message to receivers until acknowledge message received /// + [JsonProperty("WaitForAcknowledge")] [JsonPropertyName("WaitForAcknowledge")] public bool WaitForAcknowledge { get; set; } /// /// If true, server doesn't send client name to receivers in queueus. /// + [JsonProperty("HideClientNames")] [JsonPropertyName("HideClientNames")] public bool HideClientNames { get; set; } /// /// Total messages received from producers /// + [JsonProperty("ReceivedMessages")] [JsonPropertyName("ReceivedMessages")] public long ReceivedMessages { get; set; } /// /// Total messages sent to consumers /// + [JsonProperty("SentMessages")] [JsonPropertyName("SentMessages")] public long SentMessages { get; set; } /// /// Total message send operation each message to each consumer /// + [JsonProperty("Deliveries")] [JsonPropertyName("Deliveries")] public long Deliveries { get; set; } /// /// Total unacknowledged messages /// + [JsonProperty("NegativeAcks")] [JsonPropertyName("NegativeAcks")] public long NegativeAcks { get; set; } /// /// Total acknowledged messages /// + [JsonProperty("Acks")] [JsonPropertyName("Acks")] public long Acks { get; set; } /// /// Total timed out messages /// + [JsonProperty("TimeoutMessages")] [JsonPropertyName("TimeoutMessages")] public long TimeoutMessages { get; set; } /// /// Total saved messages /// + [JsonProperty("SavedMessages")] [JsonPropertyName("SavedMessages")] public long SavedMessages { get; set; } /// /// Total removed messages /// + [JsonProperty("RemovedMessages")] [JsonPropertyName("RemovedMessages")] public long RemovedMessages { get; set; } /// /// Total error count /// + [JsonProperty("Errors")] [JsonPropertyName("Errors")] public long Errors { get; set; } /// /// Last message receive date in UNIX milliseconds /// + [JsonProperty("LastMessageReceived")] [JsonPropertyName("LastMessageReceived")] public long LastMessageReceived { get; set; } /// /// Last message send date in UNIX milliseconds /// + [JsonProperty("LastMessageSent")] [JsonPropertyName("LastMessageSent")] public long LastMessageSent { get; set; } @@ -155,6 +180,7 @@ public class QueueInformation /// Maximum message limit of the queue /// Zero is unlimited /// + [JsonProperty("MessageLimit")] [JsonPropertyName("MessageLimit")] public int MessageLimit { get; set; } @@ -162,6 +188,7 @@ public class QueueInformation /// Maximum message size limit /// Zero is unlimited /// + [JsonProperty("MessageSizeLimit")] [JsonPropertyName("MessageSizeLimit")] public ulong MessageSizeLimit { get; set; } } diff --git a/src/Twino.Protocols.TMQ/Models/RouterInformation.cs b/src/Twino.Protocols.TMQ/Models/RouterInformation.cs new file mode 100644 index 00000000..d13a30c7 --- /dev/null +++ b/src/Twino.Protocols.TMQ/Models/RouterInformation.cs @@ -0,0 +1,35 @@ +using System.Text.Json.Serialization; +using Newtonsoft.Json; + +namespace Twino.Protocols.TMQ.Models +{ + /// + /// Router Information + /// + public class RouterInformation + { + /// + /// Route name. + /// Must be unique. + /// Can't include " ", "*" or ";" + /// + [JsonProperty("Name")] + [JsonPropertyName("Name")] + public string Name { get; set; } + + /// + /// If true, messages are routed to bindings. + /// If false, messages are not routed. + /// + [JsonProperty("IsEnabled")] + [JsonPropertyName("IsEnabled")] + public bool IsEnabled { get; set; } + + /// + /// Route method. Defines how messages will be routed. + /// + [JsonProperty("Method")] + [JsonPropertyName("Method")] + public RouteMethod Method { get; set; } + } +} \ No newline at end of file diff --git a/src/Twino.Protocols.TMQ/PredefinedMessages.cs b/src/Twino.Protocols.TMQ/PredefinedMessages.cs index d7ddbdbe..f334b1c4 100644 --- a/src/Twino.Protocols.TMQ/PredefinedMessages.cs +++ b/src/Twino.Protocols.TMQ/PredefinedMessages.cs @@ -23,7 +23,7 @@ public static class PredefinedMessages public static readonly byte[] PING = { 0x89, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; /// - /// PING message for TMQ "0x8A, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00" + /// PONG message for TMQ "0x8A, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00" /// public static readonly byte[] PONG = { 0x8A, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; } diff --git a/src/Twino.MQ/Routing/RouteMethod.cs b/src/Twino.Protocols.TMQ/RouteMethod.cs similarity index 86% rename from src/Twino.MQ/Routing/RouteMethod.cs rename to src/Twino.Protocols.TMQ/RouteMethod.cs index 25d21593..913b979a 100644 --- a/src/Twino.MQ/Routing/RouteMethod.cs +++ b/src/Twino.Protocols.TMQ/RouteMethod.cs @@ -1,4 +1,4 @@ -namespace Twino.MQ.Routing +namespace Twino.Protocols.TMQ { /// /// Describes how messages are routed @@ -8,12 +8,12 @@ public enum RouteMethod /// /// Routes each message to all bindings /// - Distribute, + Distribute = 0, /// /// Routes each message to only one binding /// - RoundRobin, + RoundRobin = 1, /// /// Routes message to only first binding. @@ -21,6 +21,6 @@ public enum RouteMethod /// Messages are sent to only one active queue when it exists. /// When it's removed messages are sent to other queue while it's active. /// - OnlyFirst + OnlyFirst = 2 } } \ No newline at end of file diff --git a/src/Twino.Protocols.TMQ/TmqHeaders.cs b/src/Twino.Protocols.TMQ/TmqHeaders.cs index d73f66b2..4214044a 100644 --- a/src/Twino.Protocols.TMQ/TmqHeaders.cs +++ b/src/Twino.Protocols.TMQ/TmqHeaders.cs @@ -185,5 +185,20 @@ public class TmqHeaders /// public const string QUEUE_STATUS = "Queue-Status"; + /// + /// "Queue-Tag" + /// + public const string QUEUE_TAG = "Queue-Tag"; + + /// + /// "Route-Method" + /// + public const string ROUTE_METHOD = "Route-Method"; + + /// + /// "Binding-Name" + /// + public const string BINDING_NAME = "Binding-Name"; + } } \ No newline at end of file diff --git a/src/Twino.Protocols.TMQ/Twino.Protocols.TMQ.csproj b/src/Twino.Protocols.TMQ/Twino.Protocols.TMQ.csproj index fdacebcf..8d1fcca4 100644 --- a/src/Twino.Protocols.TMQ/Twino.Protocols.TMQ.csproj +++ b/src/Twino.Protocols.TMQ/Twino.Protocols.TMQ.csproj @@ -6,9 +6,9 @@ Twino.Protocols.TMQ Twino Messaging Queue (TMQ) Protocol library and server extension for Twino Server twino,tcp,server,http,messaging,queue,tmq,mq,protocol - 3.6.10 - 3.6.10 - 3.6.10 + 3.7.0 + 3.7.0 + 3.7.0 true Mehmet Helvacıköylü;Emre Hızlı https://github.com/twino-framework/twino-mq diff --git a/src/Twino.Protocols.TMQ/TwinoResultCode.cs b/src/Twino.Protocols.TMQ/TwinoResultCode.cs index ced0c59f..968f19df 100644 --- a/src/Twino.Protocols.TMQ/TwinoResultCode.cs +++ b/src/Twino.Protocols.TMQ/TwinoResultCode.cs @@ -1,7 +1,7 @@ namespace Twino.Protocols.TMQ { /// - /// TmqClient and TmqAdminClient process result enum + /// TmqClient process result enum /// public enum TwinoResultCode : ushort {