This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Closed
7 changes: 7 additions & 0 deletions contrib/jms/AUTHORS.md
@@ -0,0 +1,7 @@
# Authors of 'jms' module

The following is the official list of authors for copyright purposes of this community-contributed module.

Apache
Jean-Baptiste Onofré, jbonofre [at] apache [dot] org
Google Inc.
45 changes: 45 additions & 0 deletions contrib/jms/README.md
@@ -0,0 +1,45 @@
# JMS module
Contributor:

probably you want to say what JMS is and provide a link somewhere in the first line or two of the README


This library provides Dataflow sources and sinks to make it possible to read
Contributor:

sinks

and write to JMS brokers from Dataflow pipelines.

It supports both JMS queues and topics, with unbounded or bounded `PCollections`.
Contributor:

merge into previous paragraph?


To use JmsIO, you have to:

1. Create a JMS `ConnectionFactory` specific to the JMS broker you want to use (for instance, ActiveMQ, IBM MQ, ...)
2. Specify the JMS destination (queue or topic) you want to use
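For example, the two steps above might look like this with ActiveMQ (a sketch; the `topic` setter is an assumption, mirroring the `queue` setter shown in the examples below):

----
// step 1: create a broker-specific ConnectionFactory (ActiveMQ here)
javax.jms.ConnectionFactory connectionFactory =
    new ActiveMQConnectionFactory("tcp://localhost:61616");

// step 2: point JmsIO at a queue...
JmsIO.Read fromQueue = JmsIO.Read.connectionFactory(connectionFactory).queue("my-queue");

// ...or at a topic (assuming a topic setter symmetrical to queue)
JmsIO.Read fromTopic = JmsIO.Read.connectionFactory(connectionFactory).topic("my-topic");
----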

## Reading (consuming messages) with JmsIO

The `JmsIO.Read` transform continuously consumes from the JMS broker and returns an unbounded `PCollection` of `Strings` that
represent the messages. By default, each element in the resulting `PCollection` is encoded as a UTF-8 string.
You can override the default encoding by using `withCoder` when you call `JmsIO.Read`.

----
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

// create JMS connection factory, for instance, with ActiveMQ
javax.jms.ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

PCollection<String> data = p.apply(JmsIO.Read.named("ConsumeFromJMS").connectionFactory(connectionFactory).queue("my-queue"));
Contributor:

Drop the `.named` code in favor of just applying the name during `p.apply`. I think it's just unnecessary code in this case.

----

### Reading a bounded set of messages from JMS

The `DirectPipelineRunner` and the batch mode of the Dataflow service do not support unbounded `PCollections`.
To use JmsIO as a source in these contexts, you need to supply a bound on the number of messages to consume.
Contributor:

This text makes me think that bounded use of JmsIO is a weird corner case. I think there's plenty of reasons a user may want to operate in bounded mode, e.g., for testing or during early pipeline development, or because they're loading historical data. So the text below de-marginalizes it.


You can specify the `.maxNumMessages` option to read a fixed maximum number of messages.
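Building on the unbounded example above, a bounded read might look like the following (a sketch that assumes `maxNumMessages` composes with the other setters shown earlier):

----
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

javax.jms.ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

// consume at most 1000 messages, producing a bounded PCollection
PCollection<String> data = p.apply(
    JmsIO.Read.connectionFactory(connectionFactory).queue("my-queue").maxNumMessages(1000));
----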
Contributor:

I find it interesting that you separated out the above two sections. Is there a good reason for this?

I would have kept it simpler, and more unified, something like this:

Reading from JMS

The JmsIO.Read transform produces an unbounded PCollection containing messages received from a JMS queue. By default, each element in the output collection is a UTF-8-encoded string. You can override the default encoding using JmsIO.Read.withCoder.

Pipeline p = ...;

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

// Producing Strings by default
JmsIO.Read read = JmsIO.Read.connectionFactory(connectionFactory).queue("my-queue");
PCollection<String> data = p.apply("Read from ActiveMQ", read);

// Producing a byte[] using a different Coder
JmsIO.Read byteRead = JmsIO.Read.connectionFactory(connectionFactory).queue("my-queue").withCoder(ByteArrayCoder.class);

JmsIO can also produce a bounded PCollection by limiting either the number of records returned (Read.withMaxNumRecords) or the time for which records are consumed (Read.withMaxReadTime). Note that if only a record limit is configured, your pipeline will not finish executing until the desired number of records is received.

JmsIO.Read read = JmsIO.Read.connectionFactory(connectionFactory).queue("my-queue").withMaxNumRecords(5);

There should probably be a type variable on JmsIO.Read, like JmsIO.Read<String>, JmsIO.Read<byte[]>, but I haven't gotten to that code yet.

Contributor:

I would keep the unbounded -> bounded method names unchanged from BoundedReadFromUnboundedSource: withMaxNumRecords and withMaxReadTime.

More unified API.

I'd also suggest copying both methods, not just limiting records, because the blocking semantics of limiting records only have tripped users up in the past.


## Writing (producing messages) with JmsIO
Contributor:

Similar changes here.

  • .connectionFactory -> .withConnectionFactory.
  • The section could be more concisely "Writing to JMS"
  • drop "continuously".
  • hopefully, the writer takes a bounded or unbounded PCollection, not just an unbounded PCollection.


The `JmsIO.Write` transform writes an unbounded `PCollection` of `String` objects to a
JMS broker. By default, the input `PCollection` to `JmsIO.Write` must contain strings encoded in UTF-8.
You can change the expected input type and encoding by using `withCoder`.

----
PCollection<String> data = ...;
data.apply(JmsIO.Write.named("ProduceToJMS").connectionFactory(connectionFactory).queue("my-queue"));
----
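As on the read side, the expected input type can be changed with `withCoder`; a hypothetical sketch (pairing `withCoder` with `ByteArrayCoder` here is an assumption, mirroring the read examples):

----
// write raw bytes instead of UTF-8 strings (assumes Write supports withCoder as described above)
PCollection<byte[]> bytes = ...;
bytes.apply(JmsIO.Write.connectionFactory(connectionFactory).queue("my-queue").withCoder(ByteArrayCoder.class));
----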
170 changes: 170 additions & 0 deletions contrib/jms/pom.xml
@@ -0,0 +1,170 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~ Copyright (C) 2015 The Google Cloud Dataflow JMS Library Authors
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-contrib-jms</artifactId>
<name>Google Cloud Dataflow JMS Library</name>
<description>Library to read and write data from a JMS broker.</description>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<google-cloud-dataflow-version>[1.2.0,2.0.0)</google-cloud-dataflow-version>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.12</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>6.6</version>
</dependency>
</dependencies>
<configuration>
<configLocation>../../checkstyle.xml</configLocation>
<consoleOutput>true</consoleOutput>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Source plugin for generating source and test-source JARs. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
<execution>
<id>attach-test-sources</id>
<phase>test-compile</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<windowtitle>Google Cloud Dataflow JMS Contrib</windowtitle>
<doctitle>Google Cloud Dataflow JMS Contrib</doctitle>

<subpackages>com.google.cloud.dataflow.contrib.jms</subpackages>
<use>false</use>
<bottom><![CDATA[<br>]]></bottom>

<offlineLinks>
<offlineLink>
<url>https://cloud.google.com/dataflow/java-sdk/JavaDoc/</url>
<location>${basedir}/../../javadoc/dataflow-sdk-docs</location>
</offlineLink>
<offlineLink>
<url>http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/</url>
<location>${basedir}/../../javadoc/guava-docs</location>
</offlineLink>
</offlineLinks>
</configuration>
<executions>
<execution>
<goals>
<goal>jar</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>${google-cloud-dataflow-version}</version>
</dependency>

<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.1.1</version>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>5.11.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
<version>5.11.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>