diff --git a/contrib/jms/AUTHORS.md b/contrib/jms/AUTHORS.md new file mode 100644 index 0000000000..3d4f79ae71 --- /dev/null +++ b/contrib/jms/AUTHORS.md @@ -0,0 +1,7 @@ +# Authors of 'jms' module + +The following is the official list of authors for copyright purposes of this community-contributed module. + + Apache + Jean-Baptiste Onofré, jbonofre [at] apache [dot] org + Google Inc. \ No newline at end of file diff --git a/contrib/jms/README.md b/contrib/jms/README.md new file mode 100644 index 0000000000..99b879c88c --- /dev/null +++ b/contrib/jms/README.md @@ -0,0 +1,45 @@ +# JMS module + +This library provides Dataflow sources and sinkgs to make it possible to read +and write on JMS brokers from Dataflow pipelines. + +It supports both JMS queues and topics, with unbounded or bounded `PCollections`. + +To use JmsIO, you have to: + +1. Create a JMS `ConnectionFactory` specific to the JMS broker you want to use (for instance, ActiveMQ, IBM MQ, ...) +2. Specify the JMS destination (queue or topic) you want to use + +## Reading (consuming messages) with JmsIO + +The `JmsIO.Read` transform continuously consumes from the JMS broker and returns an unbounded `PCollection` of `Strings` that +represent the messages. By default, each element in the resulting `PCollection` is encoded as a UTF-8 string. +You can override the default encoding by using `withCoder` when you call JmsIO.Read. + +---- +PipelineOptions options = PipelineOptionsFactory.create(); +Pipeline p = Pipeline.create(options); + +// create JMS connection factory, for instance, with ActiveMQ +javax.jms.ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + +PCollection data = p.apply(JmsIO.Read.named("ConsumeFromJMS").connectionFactory(connectionFactory).queue("my-queue")); +---- + +### Reading a bounded set of messages from JMS + +The `DirectPipelineRunner` and the batch mode of the Dataflow service do not support unbounded `PCollections`. +To use JmsIO as source in these contexts, you need to supply a bound on the amount of messages to consume. + +You can specify the `.maxNumMessages` option to read a fixed maximum number of messages. + +## Writing (producing messages) with JmsIO + +The JmsIO.Write transform continuously writes an unbounded `PCollection` of `String` objects, produced to a +JMS broker. By default, the input `PCollection` to `JmsIO.Write` must contain strings encoded in UTF-8. +You can change the expected input type and encoding by using `withCoder`. + +---- +PCollection data = ...; +data.apply(JmsIO.Write.named("ProduceToJMS").connectionFactory(connectionFactory).queue("my-queue")); +---- \ No newline at end of file diff --git a/contrib/jms/pom.xml b/contrib/jms/pom.xml new file mode 100644 index 0000000000..7728458ad6 --- /dev/null +++ b/contrib/jms/pom.xml @@ -0,0 +1,170 @@ + + + + 4.0.0 + + com.google.cloud.dataflow + google-cloud-dataflow-java-contrib-jms + Google Cloud Dataflow JMS Library + Library to read and write data from a JMS broker. + 0.0.1-SNAPSHOT + jar + + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + UTF-8 + [1.2.0,2.0.0) + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.2 + + 1.7 + 1.7 + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.12 + + + com.puppycrawl.tools + checkstyle + 6.6 + + + + ../../checkstyle.xml + true + true + true + + + + + check + + + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.4 + + + attach-sources + compile + + jar + + + + attach-test-sources + test-compile + + test-jar + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + Google Cloud Dataflow JMS Contrib + Google Cloud Dataflow JMS Contrib + + com.google.cloud.dataflow.contrib.jms + false + ]]> + + + + https://cloud.google.com/dataflow/java-sdk/JavaDoc/ + ${basedir}/../../javadoc/dataflow-sdk-docs + + + http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/ + ${basedir}/../../javadoc/guava-docs + + + + + + + jar + + package + + + + + + + + + com.google.cloud.dataflow + google-cloud-dataflow-java-sdk-all + ${google-cloud-dataflow-version} + + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + 1.1.1 + + + + + org.apache.activemq + activemq-broker + 5.11.2 + test + + + org.apache.activemq + activemq-kahadb-store + 5.11.2 + test + + + junit + junit + 4.11 + test + + + diff --git a/contrib/jms/src/main/java/com/google/cloud/dataflow/contrib/jms/JmsIO.java b/contrib/jms/src/main/java/com/google/cloud/dataflow/contrib/jms/JmsIO.java new file mode 100644 index 0000000000..4b40548294 --- /dev/null +++ b/contrib/jms/src/main/java/com/google/cloud/dataflow/contrib/jms/JmsIO.java @@ -0,0 +1,532 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.contrib.jms; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.coders.VoidCoder; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.util.CoderUtils; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PDone; +import com.google.cloud.dataflow.sdk.values.PInput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.Nullable; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * Read and Write {@link PTransform}s for JMS broker. These transforms create and consume + * unbounded {@link PCollection PCollections}. + */ +public class JmsIO { + + private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class); + + /** The default {@link Coder} used to translate to/from JMS messages. */ + public static final Coder DEFAULT_JMS_CODER = StringUtf8Coder.of(); + + /** + * A {@link PTransform} that continuously reads from a JMS broker and + * return a {@link PCollection} of {@link String Strings} containing the items. + */ + public static class Read { + + /** + * Creates and returns a transform for reading from the JMS broker with the specified + * transform name. + * + * @param name the step name + * @return the Bound + */ + public static Bound named(String name) { + return new Bound<>(DEFAULT_JMS_CODER).named(name); + } + + /** + * Creates and returns a transform for reading from a JMS broker with the provided + * JMS connection factory. + * + * @param connectionFactory the JMS connection factory to use to connect to the broker + * @return the Bound + */ + public static Bound connectionFactory(ConnectionFactory connectionFactory) { + return new Bound<>(DEFAULT_JMS_CODER).connectionFactory(connectionFactory); + } + + /** + * Creates and returns a transform for reading from a JMS broker with the provided + * JMS queue. + * + * @param name the JMS queue name + * @return the Bound + */ + public static Bound queue(String name) { + return new Bound<>(DEFAULT_JMS_CODER).queue(name); + } + + /** + * Creates and returns a transform for reading from a JMS broker with the provided + * JMS topic. + * + * @param name the JMS topic name. + * @return the Bound + */ + public static Bound topic(String name) { + return new Bound<>(DEFAULT_JMS_CODER).topic(name); + } + + /** + * Creates and returns a transform for reading from JMS broker that uses the given + * {@link Coder} to decode JMS messages into a value of type {@code T}. + * + * @param coder the coder to use + * @return the Bound + */ + public static Bound withCoder(Coder coder) { + return new Bound<>(coder); + } + + /** + * Creates and returns a tranform for reading from JMS broker that limit the + * number of messages consumed. + * + * @param maxNumMessages the max number of messages consumed + * @return the Bound + */ + public static Bound maxNumMessages(int maxNumMessages) { + return new Bound<>(DEFAULT_JMS_CODER).maxNumMessages(maxNumMessages); + } + + /** + * A {@link PTransform} that reads from a JMS broker and returns + * a unbounded {@link PCollection} containing the items from the stream. + */ + public static class Bound extends PTransform> { + + /** The JMS connection factory. */ + private final ConnectionFactory connectionFactory; + + /** The JMS queue to read from. */ + private final String queue; + + /** The JMS topic to read from. */ + private final String topic; + + /** The coder used to decode each message. */ + @Nullable + private final Coder coder; + + /** Stop after reading this many messages. */ + private final int maxNumMessages; + + private Bound(Coder coder) { + this(null, null, null, null, coder, 0); + } + + private Bound(String name, ConnectionFactory connectionFactory, String queue, + String topic, Coder coder, int maxNumMessages) { + super(name); + this.connectionFactory = connectionFactory; + this.queue = queue; + this.topic = topic; + this.coder = coder; + this.maxNumMessages = maxNumMessages; + } + + /** + * Returns a transform that's like this one but with the given step name. + * + * @param name the step name + * @return the Bound + */ + public Bound named(String name) { + return new Bound(name, connectionFactory, queue, topic, coder, maxNumMessages); + } + + /** + * Returns a transform that's like this one but using the JMS connection factory. + * + * @param connectionFactory the JMS connection factory to use + * @return the Bound + */ + public Bound connectionFactory(ConnectionFactory connectionFactory) { + return new Bound(name, connectionFactory, queue, topic, coder, maxNumMessages); + } + + /** + * Returns a transform that's like this one but that reads + * from the specified JMS queue. + * + * @param queue the JMS queue + * @return the Bound + */ + public Bound queue(String queue) { + return new Bound(name, connectionFactory, queue, topic, coder, maxNumMessages); + } + + /** + * Returns a transform that's like this one but that reads + * from the specified JMS topic. + * + * @param topic the JMQ topic + * @return the Bound + */ + public Bound topic(String topic) { + return new Bound(name, connectionFactory, queue, topic, coder, maxNumMessages); + } + + /** + * Returns a transform that's like this one but that uses the + * given {@link Coder} to decode each record into a value of + * type {@code X}. + * + * @param coder the coder to use + * @return the Bound + */ + public Bound withCoder(Coder coder) { + return new Bound<>(name, connectionFactory, queue, topic, coder, maxNumMessages); + } + + /** + * Return a transform that's like this one but will only read up + * to the specified maximum number of messages from the JMS broker. + * The transform produces a bounded {@link PCollection}. + * + * @param maxNumMessages the max number messages limit to consume + * @return the Bound + */ + public Bound maxNumMessages(int maxNumMessages) { + return new Bound(name, connectionFactory, queue, topic, coder, maxNumMessages); + } + + @Override + public PCollection apply(PInput input) { + if (connectionFactory == null) { + throw new IllegalArgumentException("need to set connection factory " + + "for a JmsIO.Read transform"); + } + if (queue == null && topic == null) { + throw new java.lang.IllegalStateException("need to set destination " + + "(queue or topic) for a JmsIO.Read transform"); + } + + boolean boundedOutput = getMaxNumMessages() > 0; + + if (boundedOutput) { + return input.getPipeline().begin() + .apply(Create.of((Void) null)).setCoder(VoidCoder.of()) + .apply(ParDo.of(new JmsReader())).setCoder(coder); + } else { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), WindowingStrategy.globalDefault(), + PCollection.IsBounded.UNBOUNDED).setCoder(coder); + } + } + + @Override + protected Coder getDefaultOutputCoder() { + return coder; + } + + public ConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + public String getQueue() { + return queue; + } + + public String getTopic() { + return topic; + } + + public Coder getCoder() { + return coder; + } + + public int getMaxNumMessages() { + return maxNumMessages; + } + + private class JmsReader extends DoFn { + + @Override + public void processElement(ProcessContext c) throws Exception { + LOG.debug("Connecting to the JMS broker"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer; + if (queue != null) { + consumer = session.createConsumer(session.createQueue(queue)); + } else { + consumer = session.createConsumer(session.createTopic(topic)); + } + + List messages = new ArrayList<>(); + + while ((getMaxNumMessages() == 0 || + messages.size() <= getMaxNumMessages())) { + LOG.debug("Consuming JMS message"); + TextMessage message = (TextMessage) consumer.receive(); + LOG.debug("Adding JMS message to the list ({})", messages.size()); + messages.add(message); + } + + consumer.close(); + session.close(); + connection.stop(); + connection.close(); + + for (TextMessage message : messages) { + c.output(CoderUtils.decodeFromByteArray(coder, + message.getText().getBytes())); + } + } + + } + + + } + + /** Disallow construction of utility class. */ + private Read() {} + + } + + /** Disallow construction of utility class. */ + private JmsIO() {} + + /** + * A {@link PTransform} that continuously writes a {@link PCollection} of {@link String Strings} + * to a JMS broker on the specified destination. + */ + public static class Write { + + /** + * Creates a transform that produces to a JMS broker with the given step name. + * + * @param name the step name + * @return the Bound + */ + public static Bound named(String name) { + return new Bound<>(DEFAULT_JMS_CODER).named(name); + } + + /** + * Creates a transform that produces to a JMS broker with the given JMS connection factory. + * + * @param connectionFactory the JMS connection factory to use to connect to the JMS broker + * @return the Bound + */ + public static Bound connectionFactory(ConnectionFactory connectionFactory) { + return new Bound<>(DEFAULT_JMS_CODER).connectionFactory(connectionFactory); + } + + /** + * Creates a transform that produces to a JMS broker + * on the given JMS queue. + * + * @param queue the JMS queue where to produce messages + * @return the Bound + */ + public static Bound queue(String queue) { + return new Bound<>(DEFAULT_JMS_CODER).queue(queue); + } + + /** + * Creates a trasnform that produces to a JMS broker + * on the given JMS topic. + * + * @param topic the JMS topic where to produce messages + * @return the Bound + */ + public static Bound topic(String topic) { + return new Bound<>(DEFAULT_JMS_CODER).topic(topic); + } + + /** + * Creates a transform that uses the given {@link Coder} + * to encode each of the elements of the input collection + * into an output message. + * + * @param coder the coder to use + * @return the Bound + */ + public static Bound withCoder(Coder coder) { + return new Bound<>(coder); + } + + /** + * A {@link PTransform} that reads from a JMS broker and returns + * a unbounded {@link PCollection} containing the items from the stream. + */ + public static class Bound extends PTransform, PDone> { + + /** + * The JMS connection factory. + */ + private final ConnectionFactory connectionFactory; + + /** + * The JMS queue where to produce messages. + */ + private final String queue; + + /** + * The JMS topic where to produce messages. + */ + private final String topic; + + /** + * The coder used to decode each message. + */ + @Nullable + private final Coder coder; + + private Bound(Coder coder) { + this(null, null, null, null, coder); + } + + private Bound(String name, ConnectionFactory connectionFactory, + String queue, String topic, Coder coder) { + super(name); + this.connectionFactory = connectionFactory; + this.queue = queue; + this.topic = topic; + this.coder = coder; + } + + public Bound named(String name) { + return new Bound(name, + connectionFactory, queue, topic, coder); + } + + public Bound connectionFactory(ConnectionFactory connectionFactory) { + return new Bound(name, connectionFactory, queue, topic, coder); + } + + public Bound queue(String queue) { + return new Bound(name, connectionFactory, queue, topic, coder); + } + + public Bound topic(String topic) { + return new Bound(name, connectionFactory, queue, topic, coder); + } + + public Bound withCoder(Coder coder) { + return new Bound<>(name, connectionFactory, queue, topic, coder); + } + + @Override + public PDone apply(PCollection input) { + if (connectionFactory == null) { + throw new java.lang.IllegalStateException("need to set the JMS " + + "connection factory of a " + + "JmsIO.Write transform"); + } + if (queue == null && topic == null) { + throw new java.lang.IllegalStateException("need to set the JMS " + + "destination (queue or topic) of " + + "a JmsIO.Write transform"); + } + input.apply(ParDo.of(new JmsWriter())); + return PDone.in(input.getPipeline()); + } + + @Override + protected Coder getDefaultOutputCoder() { + return coder; + } + + public ConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + public String getQueue() { + return queue; + } + + public String getTopic() { + return topic; + } + + public Coder getCoder() { + return coder; + } + + private class JmsWriter extends DoFn { + private transient Connection connection; + private transient Session session; + + @Override + public void startBundle(Context c) throws Exception { + connection = connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + TextMessage message = session.createTextMessage( + (String) CoderUtils.decodeFromByteArray( + getCoder(), + ((String) c.element()).getBytes())); + MessageProducer producer; + if (queue != null) { + producer = session.createProducer(session.createQueue(queue)); + } else { + producer = session.createProducer(session.createTopic(topic)); + } + producer.send(message); + producer.close(); + } + + @Override + public void finishBundle(Context c) throws Exception { + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + } + + } + + } + + /** Disallow construction of utility class. */ + private Write() {} + + } + +} diff --git a/contrib/jms/src/test/java/com/google/cloud/dataflow/contrib/jms/JmsIOTest.java b/contrib/jms/src/test/java/com/google/cloud/dataflow/contrib/jms/JmsIOTest.java new file mode 100644 index 0000000000..ff0448b68e --- /dev/null +++ b/contrib/jms/src/test/java/com/google/cloud/dataflow/contrib/jms/JmsIOTest.java @@ -0,0 +1,140 @@ +/* + * Copyright (C) 2015 The Google Cloud Dataflow Hadoop Library Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + + +package com.google.cloud.dataflow.contrib.jms; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * Test of the JmsIO. + */ +public class JmsIOTest { + + private static final String BROKER_URL = "vm://localhost"; + + private BrokerService broker; + private ConnectionFactory connectionFactory; + + @Before + public void startBroker() throws Exception { + System.out.println("Starting ActiveMQ broker on " + BROKER_URL); + broker = new BrokerService(); + broker.setUseJmx(false); + broker.setPersistenceAdapter(new MemoryPersistenceAdapter()); + broker.addConnector(BROKER_URL); + broker.setBrokerName("localhost"); + broker.start(); + + // create JMS connection factory + System.out.println("Create JMS connection factory on " + BROKER_URL); + connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); + } + + @Test + public void testReadSingleMessage() throws Exception { + + // produce message + System.out.println("Producing a test message on the broker ..."); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue("test")); + TextMessage message = session.createTextMessage("This Is A Test"); + producer.send(message); + producer.send(message); + producer.send(message); + producer.send(message); + producer.send(message); + producer.send(message); + producer.close(); + session.close(); + connection.close(); + + System.out.println("Creating test pipeline"); + Pipeline pipeline = TestPipeline.create(); + + // read from the queue + System.out.println("Reading from the JMS queue and create PCollection"); + PCollection output = pipeline.apply( + JmsIO.Read + .connectionFactory(connectionFactory) + .queue("test") + .maxNumMessages(5)); + + System.out.println("Starting the pipeline"); + pipeline.run(); + + System.out.println("Check assertion"); + DataflowAssert.that(output).containsInAnyOrder(new String[]{ "test "}); + + System.out.println(output); + } + + @Test + public void testWriteMessage() throws Exception { + System.out.println("Create test pipeline"); + Pipeline pipeline = TestPipeline.create(); + + System.out.println("Create PCollection"); + ArrayList data = new ArrayList<>(); + data.add("Test"); + PCollection input = pipeline.apply(Create.of(data).withCoder(StringUtf8Coder.of())); + + System.out.println("Write PCollection to the JMS queue"); + JmsIO.Write.Bound write = JmsIO.Write + .connectionFactory(connectionFactory) + .queue("test"); + + input.apply(write); + + pipeline.run(); + + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(session.createQueue("test")); + TextMessage message = (TextMessage) messageConsumer.receive(10000); + Assert.assertEquals(message.getText(), "Test"); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + } + +}