Skip to content

Commit

Permalink
test(worker): add test for job worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Zelldon committed Nov 29, 2018
1 parent b491f81 commit e4f5b60
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 34 deletions.
12 changes: 11 additions & 1 deletion Client-Example/Client-Example.csproj
Expand Up @@ -8,7 +8,7 @@
<RootNamespace>ClientExample</RootNamespace>
<AssemblyName>Client-Example</AssemblyName>
<TargetFrameworkVersion>v4.7</TargetFrameworkVersion>
<AutoGenerateBindingRedirects>false</AutoGenerateBindingRedirects>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">
<DebugSymbols>true</DebugSymbols>
Expand All @@ -30,13 +30,23 @@
<PlatformTarget>x86</PlatformTarget>
</PropertyGroup>
<ItemGroup>
<Reference Include="Google.Protobuf, Version=3.6.1.0, Culture=neutral, PublicKeyToken=a7d26565bac4d604">
<HintPath>..\packages\Google.Protobuf.3.6.1\lib\net45\Google.Protobuf.dll</HintPath>
</Reference>
<Reference Include="Grpc.Core, Version=1.0.0.0, Culture=neutral, PublicKeyToken=d754f35622e28bad">
<HintPath>..\packages\Grpc.Core.1.16.0\lib\net45\Grpc.Core.dll</HintPath>
</Reference>
<Reference Include="System" />
</ItemGroup>
<ItemGroup>
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Client-test\Client-test.csproj">
<Project>{98fb2560-2c8f-4a55-ad83-5d004f6833dc}</Project>
<Name>Client-test</Name>
</ProjectReference>
<ProjectReference Include="..\Client\Client.csproj">
<Project>{86325E12-E104-4360-B4F4-B21B30754EF3}</Project>
<Name>Client</Name>
Expand Down
33 changes: 32 additions & 1 deletion Client-Example/Program.cs
Expand Up @@ -13,7 +13,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using GatewayProtocol;
using Google.Protobuf.Collections;
using Grpc.Core;
using Zeebe.Client;
using Zeebe.Client.Impl;

namespace ClientExample
{
Expand All @@ -23,8 +27,35 @@ public static void Main(string[] args)
{
Console.WriteLine("Hello World!");

var server = new Server();
server.Ports.Add(new ServerPort("localhost", 26500, ServerCredentials.Insecure));

var testService = new GatewayTestService();
var serviceDefinition = Gateway.BindService(testService);
server.Services.Add(serviceDefinition);
server.Start();

var client = new ZeebeClient("localhost:26500");

// given

var expectedRequest = new ActivateJobsRequest
{
Timeout = 123L, Amount = 1, Type = "foo", Worker = "jobWorker"
};

var expectedResponse = new ActivateJobsResponse
{
Jobs =
{
new ActivatedJob{Key = 1, JobHeaders = new JobHeaders()},
new ActivatedJob{Key = 2, JobHeaders = new JobHeaders()},
new ActivatedJob{Key = 3, JobHeaders = new JobHeaders()}
}
};

testService.AddRequestHandler(typeof(ActivateJobsRequest), request => expectedResponse);

IZeebeClient client = new Zeebe.Client.Impl.ZeebeClient("localhost:26500");

client.JobClient()
.Worker()
Expand Down
9 changes: 5 additions & 4 deletions Client-test/BaseZeebeTest.cs
Expand Up @@ -11,22 +11,23 @@ public class BaseZeebeTest
private GatewayTestService testService;
private IZeebeClient client;

public Server Server { get { return server; }}
public GatewayTestService TestService { get { return testService; }}
public IZeebeClient ZeebeClient { get { return client; }}
public Server Server => server;
public GatewayTestService TestService => testService;
public IZeebeClient ZeebeClient => client;


[SetUp]
public void Init()
{
client = new ZeebeClient("localhost:26500");
server = new Server();
server.Ports.Add(new ServerPort("localhost", 26500, ServerCredentials.Insecure));

testService = new GatewayTestService();
var serviceDefinition = Gateway.BindService(testService);
server.Services.Add(serviceDefinition);
server.Start();

client = new ZeebeClient("localhost:26500");
}

[TearDown]
Expand Down
2 changes: 1 addition & 1 deletion Client-test/Client-test.csproj
Expand Up @@ -42,10 +42,10 @@
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="JobWorkerTest.cs" />
<Compile Include="RequestTopologyTest.cs" />
<Compile Include="GatewayTestService.cs" />
<Compile Include="BaseZeebeTest.cs" />
<Compile Include="ExampleMain.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
Expand Down
19 changes: 13 additions & 6 deletions Client-test/GatewayTestService.cs
Expand Up @@ -2,12 +2,14 @@
using System.Collections.Generic;
using Google.Protobuf;
using System.Threading.Tasks;
using GatewayProtocol;
using Grpc.Core;

namespace Zeebe.Client
{
public delegate IMessage RequestHandler(IMessage request);

public class GatewayTestService : GatewayProtocol.Gateway.GatewayBase
public class GatewayTestService : Gateway.GatewayBase
{

private readonly List<IMessage> requests = new List<IMessage>();
Expand All @@ -23,7 +25,8 @@ public class GatewayTestService : GatewayProtocol.Gateway.GatewayBase

public GatewayTestService()
{
typedRequestHandler.Add(typeof(GatewayProtocol.TopologyRequest), (request) => new GatewayProtocol.TopologyResponse());
typedRequestHandler.Add(typeof(TopologyRequest), request => new TopologyResponse());
typedRequestHandler.Add(typeof(ActivateJobsRequest), request => new ActivateJobsResponse());
}

public void AddRequestHandler(Type requestType, RequestHandler requestHandler) => typedRequestHandler[requestType] = requestHandler;
Expand All @@ -32,17 +35,21 @@ public GatewayTestService()
// overwrite base methods to handle requests
//

public override Task<GatewayProtocol.TopologyResponse> Topology(GatewayProtocol.TopologyRequest request, Grpc.Core.ServerCallContext context)
public override Task<TopologyResponse> Topology(TopologyRequest request, ServerCallContext context)
{
return Task.FromResult((GatewayProtocol.TopologyResponse) HandleRequest(request, context));
return Task.FromResult((TopologyResponse) HandleRequest(request, context));
}

public override async Task ActivateJobs(ActivateJobsRequest request, IServerStreamWriter<ActivateJobsResponse> responseStream, ServerCallContext context)
{
await responseStream.WriteAsync((ActivateJobsResponse) HandleRequest(request, context));
}

private IMessage HandleRequest(IMessage request, Grpc.Core.ServerCallContext context)
private IMessage HandleRequest(IMessage request, ServerCallContext context)
{
requests.Add(request);

RequestHandler handler = typedRequestHandler[request.GetType()];
var handler = typedRequestHandler[request.GetType()];
return handler.Invoke(request);
}
}
Expand Down
62 changes: 62 additions & 0 deletions Client-test/JobWorkerTest.cs
@@ -0,0 +1,62 @@
using System;
using NUnit.Framework;
using GatewayProtocol;
using Zeebe.Client.Impl.Responses;
using Zeebe.Client.Api.Responses;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Zeebe.Client.Api.Subscription;


namespace Zeebe.Client
{
[TestFixture]
public class JobWorkerTest : BaseZeebeTest
{
[Test]
public async Task ShouldSendRequestAsExpected()
{
// given
var expectedRequest = new ActivateJobsRequest
{
Timeout = 123L, Amount = 1, Type = "foo", Worker = "jobWorker"
};

var expectedResponse = new ActivateJobsResponse
{
Jobs =
{
new GatewayProtocol.ActivatedJob{Key = 1, JobHeaders = new GatewayProtocol.JobHeaders()},
new GatewayProtocol.ActivatedJob{Key = 2, JobHeaders = new GatewayProtocol.JobHeaders()},
new GatewayProtocol.ActivatedJob{Key = 3, JobHeaders = new GatewayProtocol.JobHeaders()}
}
};

TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => expectedResponse);

// when
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
using (var jobWorker = ZeebeClient.JobClient()
.Worker()
.JobType("foo")
.Handler((jobClient, job) => { signal.Set(); })
.Limit(1)
.Name("jobWorker")
.Timeout(123L)
.PollInterval(TimeSpan.FromMilliseconds(100))
.Open())
{

// then
Assert.True(jobWorker.IsOpen());

signal.WaitOne();
}

var actualRequest = TestService.Requests[0];
Assert.AreEqual(expectedRequest, actualRequest);
}
}
}
2 changes: 0 additions & 2 deletions Client-test/RequestTopologyTest.cs
@@ -1,8 +1,6 @@
using NUnit.Framework;
using GatewayProtocol;
using Zeebe.Client.Impl.Responses;
using Zeebe.Client.Api.Responses;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;

namespace Zeebe.Client
Expand Down
1 change: 1 addition & 0 deletions Client/Api/Subscription/IJobWorkerBuilderStep1.cs
Expand Up @@ -15,6 +15,7 @@
*/

using System;
using System.Threading.Tasks;
using Zeebe.Client.Api.Clients;
using Zeebe.Client.Api.Responses;

Expand Down
5 changes: 5 additions & 0 deletions Client/Impl/Responses/ActivatedJob.cs
Expand Up @@ -59,5 +59,10 @@ public ActivatedJob(GatewayProtocol.ActivatedJob activatedJob)
public InstanceType PayloadAsType<InstanceType>() {
return JsonConvert.DeserializeObject<InstanceType>(Payload);
}

public override string ToString()
{
return $"{nameof(Key)}: {Key}, {nameof(Type)}: {Type}, {nameof(Headers)}: {Headers}, {nameof(Worker)}: {Worker}, {nameof(Retries)}: {Retries}, {nameof(Deadline)}: {Deadline}, {nameof(Payload)}: {Payload}, {nameof(PayloadAsDictionary)}: {PayloadAsDictionary}";
}
}
}
4 changes: 4 additions & 0 deletions Client/Impl/Responses/JobHeaders.cs
Expand Up @@ -40,5 +40,9 @@ public JobHeaders(GatewayProtocol.JobHeaders jobHeaders)

public long ElementInstanceKey { get; }

public override string ToString()
{
return $"{nameof(WorkflowInstanceKey)}: {WorkflowInstanceKey}, {nameof(BpmnProcessId)}: {BpmnProcessId}, {nameof(WorkflowDefinitionVersion)}: {WorkflowDefinitionVersion}, {nameof(WorkflowKey)}: {WorkflowKey}, {nameof(ElementId)}: {ElementId}, {nameof(ElementInstanceKey)}: {ElementInstanceKey}";
}
}
}
58 changes: 40 additions & 18 deletions Client/Impl/Subscription/JobWorker.cs
Expand Up @@ -13,19 +13,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using Zeebe.Client.Api.Clients;
using Zeebe.Client.Api.Responses;
using Zeebe.Client.Api.Subscription;
using ActivatedJob = Zeebe.Client.Impl.Responses.ActivatedJob;

namespace Zeebe.Client.Impl.Subscription
{
public class JobWorker : IJobWorker
{
private ConcurrentQueue<IJob> workItems = new ConcurrentQueue<IJob>();
private readonly ConcurrentQueue<IJob> workItems = new ConcurrentQueue<IJob>();

private readonly ActivateJobsRequest activeRequest;
private readonly Gateway.GatewayClient client;
Expand All @@ -36,11 +38,12 @@ public class JobWorker : IJobWorker

private bool isRunning;

public JobWorker(Gateway.GatewayClient client, ActivateJobsRequest request, int pollInterval, IJobClient jobClient, JobHandler jobHandler)
internal JobWorker(Gateway.GatewayClient client, ActivateJobsRequest request, int pollInterval,
IJobClient jobClient, JobHandler jobHandler)
{
this.source = new CancellationTokenSource();
source = new CancellationTokenSource();
this.client = client;
this.activeRequest = request;
activeRequest = request;
this.pollInterval = pollInterval;
this.jobClient = jobClient;
this.jobHandler = jobHandler;
Expand All @@ -50,13 +53,21 @@ public JobWorker(Gateway.GatewayClient client, ActivateJobsRequest request, int
internal void Open()
{
isRunning = true;
CancellationToken cancellationToken = source.Token;

TaskFactory taskFactory = new TaskFactory();
var poller = taskFactory.StartNew(async () => await Poll(cancellationToken));
var handler = taskFactory.StartNew(() => HandleActivatedJobs());

Task.WaitAll(poller, handler);
var cancellationToken = source.Token;

var taskFactory = new TaskFactory();

taskFactory.StartNew(async () =>
await Poll(cancellationToken)
.ContinueWith(t => Console.WriteLine(t.Exception.ToString()),
TaskContinuationOptions.OnlyOnFaulted)
).ContinueWith(
t => Console.WriteLine(t.Exception.ToString()),
TaskContinuationOptions.OnlyOnFaulted);

taskFactory.StartNew(() => HandleActivatedJobs())
.ContinueWith(t => Console.WriteLine(t.Exception.ToString()),
TaskContinuationOptions.OnlyOnFaulted);
}

private void HandleActivatedJobs()
Expand All @@ -69,7 +80,15 @@ private void HandleActivatedJobs()

if (success)
{
jobHandler.Invoke(jobClient, activatedJob);
try
{
jobHandler(jobClient, activatedJob);
}
catch (Exception exception)
{
Console.WriteLine("Fail to handle job with values '{0}', job handler throws exception {1}", activatedJob, exception);
// TODO fail job
}
}
}
else
Expand All @@ -90,13 +109,16 @@ private async Task Poll(CancellationToken cancellationToken)

private async Task PollJobs(CancellationToken cancellationToken)
{
var stream = client.ActivateJobs(activeRequest).ResponseStream;
while (await stream.MoveNext(cancellationToken))
using (var stream = client.ActivateJobs(activeRequest))
{
ActivateJobsResponse response = stream.Current;
foreach (ActivatedJob job in response.Jobs)
var responseStream = stream.ResponseStream;
while (await responseStream.MoveNext(cancellationToken))
{
workItems.Enqueue(new Responses.ActivatedJob(job));
var response = responseStream.Current;
foreach (var job in response.Jobs)
{
workItems.Enqueue(new ActivatedJob(job));
}
}
}
}
Expand All @@ -117,4 +139,4 @@ public bool IsClosed()
return !isRunning;
}
}
}
}

0 comments on commit e4f5b60

Please sign in to comment.