Skip to content

Commit

Permalink
Merge pull request #95 from twino-framework/v3
Browse files Browse the repository at this point in the history
3.6.10
  • Loading branch information
mhelvacikoylu committed Aug 13, 2020
2 parents 5ff5508 + ad85f04 commit e3fb0f5
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 40 deletions.
36 changes: 36 additions & 0 deletions src/Samples/Sample.Route.Consumer1/ProducerQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Sample.Route.Models;
using Twino.Client.TMQ;
using Twino.Client.TMQ.Annotations;
using Twino.Client.TMQ.Bus;
using Twino.Protocols.TMQ;


namespace Sample.Route.Consumer1
{
[AutoAck]
[AutoNack]
public class ProducerQueue: IQueueConsumer<ProduceRequestA>
{
private readonly ITwinoRouteBus _bus;

public ProducerQueue(ITwinoRouteBus bus)
{
_bus = bus;
}

public async Task Consume(TmqMessage message, ProduceRequestA model, TmqClient client)
{
var request = new SampleARequest
{
Name = "A-REQUEST",
Guid = Guid.NewGuid()
};
Console.WriteLine("Received");
await Task.Delay(5000);
// var result = await _bus.Execute<SampleARequest, List<SampleResult>>(request);
}
}
}
2 changes: 1 addition & 1 deletion src/Samples/Sample.Route.Consumer1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ static void Main(string[] args)
tmq.SetClientType("sample-a-consumer");
tmq.SetClientId("consumer1");
tmq.AddTransientConsumers(typeof(Program));
tmq.EnhanceConnection(c => c.ResponseTimeout = TimeSpan.FromSeconds(555));
tmq.EnhanceConnection(c => c.ResponseTimeout = TimeSpan.FromSeconds(5));
tmq.OnConnected(connector => Console.WriteLine($"CONNECTED => sample-a-consumer"));
});

Expand Down
11 changes: 11 additions & 0 deletions src/Samples/Sample.Route.Models/ProduceRequestA.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Twino.Client.TMQ.Annotations;
using Twino.Client.TMQ.Models;

namespace Sample.Route.Models
{
[QueueId(1001)]
[ChannelName("PRODUCER_CH")]
[WaitForAcknowledge]
[QueueStatus(MessagingQueueStatus.Push)]
public class ProduceRequestA { }
}
17 changes: 8 additions & 9 deletions src/Samples/Sample.Route.Producer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,18 @@ static async Task Main(string[] args)
var provider = services.BuildServiceProvider();
provider.UseTwinoBus();

var bus = provider.GetService<ITwinoRouteBus>();
var bus = provider.GetService<ITwinoQueueBus>();

while (true)
int messageCount = 0;
while (messageCount < 4)
{
if (!bus.GetClient().IsConnected) continue;
var request = new SampleARequest
{
Name = "A-REQUEST",
Guid = Guid.NewGuid()
};
var result = await bus.Execute<SampleARequest, List<SampleResult>>(request);
Thread.Sleep(1000);
var pushed = await bus.PushJson(new ProduceRequestA());
messageCount++;
}

while (true)
await Task.Delay(500);
}
}
}
30 changes: 17 additions & 13 deletions src/Twino.Client.TMQ/Internal/RequestHandlerExecuter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ public RequestHandlerExecuter(Type handlerType, ITwinoRequestHandler<TRequest, T

public override async Task Execute(TmqClient client, TmqMessage message, object model)
{
TRequest requestModel = (TRequest) model;
Exception exception = null;
IConsumerFactory consumerFactory = null;
bool respond = false;

try
{
TRequest requestModel = (TRequest) model;
ITwinoRequestHandler<TRequest, TResponse> handler;

if (_handler != null)
Expand All @@ -48,22 +49,12 @@ public override async Task Execute(TmqClient client, TmqMessage message, object
if (responseModel != null)
responseMessage.Serialize(responseModel, client.JsonSerializer);

respond = true;
await client.SendAsync(responseMessage);
}
catch (Exception e)
{
ErrorResponse errorModel;
try
{
errorModel = await handler.OnError(e, requestModel, message, client);
}
catch
{
errorModel = new ErrorResponse
{
ResultCode = TwinoResultCode.InternalServerError
};
}
ErrorResponse errorModel = await handler.OnError(e, requestModel, message, client);

if (errorModel.ResultCode == TwinoResultCode.Ok)
errorModel.ResultCode = TwinoResultCode.Failed;
Expand All @@ -73,12 +64,25 @@ public override async Task Execute(TmqClient client, TmqMessage message, object
if (!string.IsNullOrEmpty(errorModel.Reason))
responseMessage.SetStringContent(errorModel.Reason);

respond = true;
await client.SendAsync(responseMessage);
throw;
}
}
catch (Exception e)
{
if (!respond)
{
try
{
TmqMessage response = message.CreateResponse(TwinoResultCode.InternalServerError);
await client.SendAsync(response);
}
catch
{
}
}

await SendExceptions(client, e);
exception = e;
throw;
Expand Down
6 changes: 3 additions & 3 deletions src/Twino.Client.TMQ/Twino.Client.TMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
<Product>Twino.Client.TMQ</Product>
<Description>Twino Messaging Queue Client to connect all TMQ Servers</Description>
<PackageTags>twino,tmq,client,mq,messaging,queue</PackageTags>
<AssemblyVersion>3.6.8</AssemblyVersion>
<FileVersion>3.6.8</FileVersion>
<PackageVersion>3.6.8</PackageVersion>
<AssemblyVersion>3.6.10</AssemblyVersion>
<FileVersion>3.6.10</FileVersion>
<PackageVersion>3.6.10</PackageVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Authors>Mehmet Helvacıköylü;Emre Hızlı</Authors>
<PackageProjectUrl>https://github.com/twino-framework/twino-mq</PackageProjectUrl>
Expand Down
6 changes: 3 additions & 3 deletions src/Twino.MQ.Data/Twino.MQ.Data.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
<Product>Twino.MQ.Data</Product>
<Description>Persistant queue message data library for Twino MQ</Description>
<PackageTags>twino,server,messaging,queue,mq,persistent,database,db</PackageTags>
<AssemblyVersion>3.6.8</AssemblyVersion>
<FileVersion>3.6.8</FileVersion>
<PackageVersion>3.6.8</PackageVersion>
<AssemblyVersion>3.6.10</AssemblyVersion>
<FileVersion>3.6.10</FileVersion>
<PackageVersion>3.6.10</PackageVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Authors>Mehmet Helvacıköylü;Emre Hızlı</Authors>
<PackageProjectUrl>https://github.com/twino-framework/twino-mq</PackageProjectUrl>
Expand Down
7 changes: 3 additions & 4 deletions src/Twino.MQ/Queues/ChannelQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -924,17 +924,16 @@ internal async Task WaitForAcknowledge(QueueMessage message)
if (!message.Message.PendingAcknowledge)
message.Message.PendingAcknowledge = true;

if (_acknowledgeCallback == null)
return;

//lock the object, because pending ack message should be queued
if (_ackSync == null)
_ackSync = new SemaphoreSlim(1, 1);

await _ackSync.WaitAsync();
try
{
await _acknowledgeCallback.Task;
if (_acknowledgeCallback != null)
await _acknowledgeCallback.Task;

_acknowledgeCallback = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}
finally
Expand Down
2 changes: 1 addition & 1 deletion src/Twino.MQ/Queues/States/PushQueueState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private async Task<PushResult> ProcessMessage(QueueMessage message)

//if to process next message is requires previous message acknowledge, wait here
if (_queue.Options.RequestAcknowledge && _queue.Options.WaitForAcknowledge)
await _queue.WaitForAcknowledge(message);
await _queue.WaitForAcknowledge(message);

message.Decision = await _queue.DeliveryHandler.BeginSend(_queue, message);
if (!await _queue.ApplyDecision(message.Decision, message))
Expand Down
6 changes: 3 additions & 3 deletions src/Twino.MQ/Twino.MQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
<Product>Twino.MQ</Product>
<Description>Messaging Queue Server library with TMQ Protocol via Twino Server</Description>
<PackageTags>twino,server,tmq,messaging,queue,mq</PackageTags>
<AssemblyVersion>3.6.8</AssemblyVersion>
<FileVersion>3.6.8</FileVersion>
<PackageVersion>3.6.8</PackageVersion>
<AssemblyVersion>3.6.10</AssemblyVersion>
<FileVersion>3.6.10</FileVersion>
<PackageVersion>3.6.10</PackageVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Authors>Mehmet Helvacıköylü;Emre Hızlı</Authors>
<PackageProjectUrl>https://github.com/twino-framework/twino-mq</PackageProjectUrl>
Expand Down
6 changes: 3 additions & 3 deletions src/Twino.Protocols.TMQ/Twino.Protocols.TMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
<Product>Twino.Protocols.TMQ</Product>
<Description>Twino Messaging Queue (TMQ) Protocol library and server extension for Twino Server</Description>
<PackageTags>twino,tcp,server,http,messaging,queue,tmq,mq,protocol</PackageTags>
<AssemblyVersion>3.6.8</AssemblyVersion>
<FileVersion>3.6.8</FileVersion>
<PackageVersion>3.6.8</PackageVersion>
<AssemblyVersion>3.6.10</AssemblyVersion>
<FileVersion>3.6.10</FileVersion>
<PackageVersion>3.6.10</PackageVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Authors>Mehmet Helvacıköylü;Emre Hızlı</Authors>
<PackageProjectUrl>https://github.com/twino-framework/twino-mq</PackageProjectUrl>
Expand Down

0 comments on commit e3fb0f5

Please sign in to comment.