Skip to content

Commit

Permalink
upgraded to Zeebe 0.14.0 and cleaned up a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
berndruecker committed Dec 12, 2018
1 parent 835721e commit fc77fe3
Show file tree
Hide file tree
Showing 20 changed files with 212 additions and 101 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Expand Up @@ -11,8 +11,8 @@
<dependencies>
<dependency>
<groupId>io.zeebe</groupId>
<artifactId>zeebe-gateway</artifactId>
<version>0.12.0-alpha4</version>
<artifactId>zeebe-client-java</artifactId>
<version>0.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down
28 changes: 0 additions & 28 deletions src/main/java/io/berndruecker/demo/SysoutWorker.java

This file was deleted.

Expand Up @@ -42,7 +42,7 @@ public List<Map<String, String>> taskConfigs(final int maxTasks) {

for (int i = 0; i < maxTasks; i++) {
final Map<String, String> config = new HashMap<>();
config.put(Constants.CONFIG_ZEEBE_BROKER_ADDRESS, zeebeBrokerAddress.toString());
config.put(Constants.CONFIG_ZEEBE_BROKER_ADDRESS, zeebeBrokerAddress);
config.put(Constants.CONFIG_CORRELATION_KEY_JSONPATH, correlationKeyJsonPath);
config.put(Constants.CONFIG_MESSAGE_NAME_JSONPATH, messageNameJsonPath);
config.put(Constants.CONFIG_START_EVENT_MAPPING, startEventMapping);
Expand Down
Expand Up @@ -15,10 +15,9 @@
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;

import io.zeebe.gateway.ZeebeClient;
import io.zeebe.gateway.api.events.MessageEvent;
import io.zeebe.gateway.api.events.WorkflowInstanceEvent;
import io.zeebe.gateway.cmd.ClientCommandRejectedException;
import io.opencensus.trace.MessageEvent;
import io.zeebe.client.ZeebeClient;
import io.zeebe.client.api.events.WorkflowInstanceEvent;

public final class ZeebeSinkTask extends SinkTask {

Expand Down Expand Up @@ -78,17 +77,17 @@ public void put(final Collection<SinkRecord> records) {
LOG.warn("Started workflow instance " + workflowInstanceEvent + " based on record " + record);
} else {
// back to normal behavior
MessageEvent messageEvent = zeebe.workflowClient().newPublishMessageCommand() //
zeebe.workflowClient().newPublishMessageCommand() //
.messageName(messageName) //
.correlationKey(correlationKey) //
.messageId(messageId) //
.payload(payload) //
.send().join();

LOG.warn("Send message " + messageEvent + " to Zeebe based on record " + record);
LOG.warn("Send message to Zeebe based on record " + record);
}
}
catch (ClientCommandRejectedException ex) {
catch (Exception ex) {
// something could not be processed in Zeebe
// Retry will not have any effect
// so ignore it for the moment
Expand Down
@@ -1,7 +1,6 @@
package io.berndruecker.demo.kafka.connect.zeebe;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.LinkedList;
Expand All @@ -15,12 +14,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.zeebe.gateway.ZeebeClient;
import io.zeebe.gateway.ZeebeClientConfiguration;
import io.zeebe.gateway.api.clients.JobClient;
import io.zeebe.gateway.api.events.JobEvent;
import io.zeebe.gateway.api.subscription.JobHandler;
import io.zeebe.gateway.api.subscription.JobWorker;
import io.zeebe.client.ZeebeClient;
import io.zeebe.client.api.clients.JobClient;
import io.zeebe.client.api.response.ActivatedJob;
import io.zeebe.client.api.subscription.JobHandler;
import io.zeebe.client.api.subscription.JobWorker;

public final class ZeebeSourceTask extends SourceTask {

Expand All @@ -38,15 +36,17 @@ public final class ZeebeSourceTask extends SourceTask {
private ZeebeClient zeebe;

private JobWorker subscription;
private ConcurrentLinkedQueue<JobEvent> collectedJobs = new ConcurrentLinkedQueue<>();

private ConcurrentLinkedQueue<CollectedJob> collectedJobs = new ConcurrentLinkedQueue<>();

private static class CollectedJob {
public ActivatedJob job;
// public JobClient jobClient;
}

@Override
public void start(final Map<String, String> props) {

zeebeBrokerAddress = props.get(Constants.CONFIG_ZEEBE_BROKER_ADDRESS);
//
// correlationKeyJsonPath = props.get(Constants.CONFIG_CORRELATION_KEY_JSONPATH);
// messageNameJsonPath = props.get(Constants.CONFIG_MESSAGE_NAME_JSONPATH);

LOG.info("Connecting to Zeebe broker at '" + zeebeBrokerAddress + "'");

Expand All @@ -58,30 +58,35 @@ public void start(final Map<String, String> props) {
subscription = zeebe.jobClient().newWorker() //
.jobType("sendMessage") //
.handler(new JobHandler() {
public void handle(JobClient jobClient, JobEvent jobEvent) {
collectedJobs.add(jobEvent);
public void handle(JobClient jobClient, ActivatedJob jobEvent) {
CollectedJob collectedJob = new CollectedJob();
collectedJob.job = jobEvent;
jobClient //
.newCompleteCommand(collectedJob.job.getKey()) //
.send().join();
}
}) //
.name("KafkaConnector") //
.timeout(Duration.ofSeconds(1)) //
.open();
LOG.info("Subscribed to Zeebe at '" + zeebeBrokerAddress + "' for sending records");
}

@Override
public List<SourceRecord> poll() {
final List<SourceRecord> records = new LinkedList<>();

JobEvent jobEvent = null;
while ((jobEvent = collectedJobs.poll()) != null) {
CollectedJob collectedJob = null;
while ((collectedJob = collectedJobs.poll()) != null) {

for (String topic : kafkaTopics) {
final SourceRecord record = new SourceRecord(null, null, topic, // ignore partitions for now random.nextInt(kafkaPartitions),
Schema.BYTES_SCHEMA, //
// TODO: THink about if always the full payload should be transfered
jobEvent.getPayload().getBytes(Charset.forName("UTF-8")));
collectedJob.job.getPayload().getBytes(Charset.forName("UTF-8")));
records.add(record);
LOG.warn("Collected record to be sent to Kafka " + record);
}

}

return records;
Expand Down
@@ -1,17 +1,14 @@
package io.berndruecker.demo;
package io.berndruecker.demo.kafka.connect.zeebe.test;

import io.zeebe.gateway.ZeebeClient;
import io.zeebe.gateway.api.clients.JobClient;
import io.zeebe.gateway.api.events.JobEvent;
import io.zeebe.gateway.api.subscription.JobHandler;
import io.zeebe.client.ZeebeClient;

public class DeployWorkflowModel {

public static void main(String[] args) {
ZeebeClient zeebe = ZeebeClient.newClient();

zeebe.workflowClient().newDeployCommand()
.addResourceFromClasspath("play.bpmn")
.addResourceFromClasspath("test-kafka-connect.bpmn")
.send().join();

System.out.println("deployed");
Expand Down
@@ -1,28 +1,28 @@
package io.berndruecker.demo;
package io.berndruecker.demo.kafka.connect.zeebe.test;

import io.zeebe.gateway.ZeebeClient;
import io.zeebe.gateway.api.clients.JobClient;
import io.zeebe.gateway.api.events.JobEvent;
import io.zeebe.gateway.api.subscription.JobHandler;
import io.zeebe.client.ZeebeClient;
import io.zeebe.client.api.clients.JobClient;
import io.zeebe.client.api.response.ActivatedJob;
import io.zeebe.client.api.subscription.JobHandler;

public class Play {

public static void main(String[] args) {
ZeebeClient zeebe = ZeebeClient.newClient();

zeebe.workflowClient().newDeployCommand()
.addResourceFromClasspath("play.bpmn")
.addResourceFromClasspath("test-kafka-connect.bpmn")
.send().join();

System.out.println("deployed");

zeebe.workflowClient().newCreateInstanceCommand()
.bpmnProcessId("play")
.bpmnProcessId("test-kafka-connect")
.latestVersion()
.payload("{\"orderId\": \"17\"}")
.send().join();

System.out.println("started");
System.out.println("started workflow instance");

// zeebe.topicClient().workflowClient().newPublishMessageCommand()
// .messageName("OrderPaid")
Expand All @@ -33,7 +33,7 @@ public static void main(String[] args) {
// System.out.println("sent message");

zeebe.workflowClient().newCreateInstanceCommand()
.bpmnProcessId("play")
.bpmnProcessId("test-kafka-connect")
.latestVersion()
.payload("{\"orderId\": \"17\"}")
.send().join();
Expand All @@ -45,9 +45,9 @@ public static void main(String[] args) {
.handler(new JobHandler() {

@Override
public void handle(JobClient client, JobEvent evt) {
System.out.println(evt);
client.newCompleteCommand(evt).send().join();
public void handle(JobClient client, ActivatedJob job) {
System.out.println(job);
client.newCompleteCommand(job.getKey()).send().join();
}
})
.open();
Expand Down
@@ -1,4 +1,4 @@
package io.berndruecker.demo;
package io.berndruecker.demo.kafka.connect.zeebe.test;

import java.util.Properties;

Expand All @@ -7,7 +7,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendKafkaMessage {
public class SendOrderPaidKafkaMessage {

public static void main(String[] argv) throws Exception {
// Configure the Producer
Expand Down
@@ -1,14 +1,14 @@
package io.berndruecker.demo;
package io.berndruecker.demo.kafka.connect.zeebe.test;

import io.zeebe.gateway.ZeebeClient;
import io.zeebe.client.ZeebeClient;

public class StartWorkflowInstance {

public static void main(String[] args) {
ZeebeClient zeebe = ZeebeClient.newClient();

zeebe.workflowClient().newCreateInstanceCommand()
.bpmnProcessId("play")
.bpmnProcessId("test-kafka-connect")
.latestVersion()
.payload("{\"orderId\": \"18\"}")
.send().join();
Expand Down
@@ -0,0 +1,28 @@
package io.berndruecker.demo.kafka.connect.zeebe.test;

import io.zeebe.client.ZeebeClient;
import io.zeebe.client.api.clients.JobClient;
import io.zeebe.client.api.response.ActivatedJob;
import io.zeebe.client.api.subscription.JobHandler;

public class SysoutWorker {

public static void main(String[] args) {
ZeebeClient zeebe = ZeebeClient.newClient();

zeebe.jobClient().newWorker()
.jobType("sysout")
.handler(new JobHandler() {

@Override
public void handle(JobClient client, ActivatedJob job) {
System.out.println(job);
client.newCompleteCommand(job.getKey()).send().join();
}
})
.open();

System.out.println("Waiting for work...");
}

}

0 comments on commit fc77fe3

Please sign in to comment.