Skip to content

Commit

Permalink
[BEAM-716] Use AutoValue in JmsIO
Browse files Browse the repository at this point in the history
  • Loading branch information
jbonofre committed Dec 11, 2016
1 parent 1fab152 commit 4989d83
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 100 deletions.
7 changes: 7 additions & 0 deletions sdks/java/io/jms/pom.xml
Expand Up @@ -94,6 +94,13 @@
<artifactId>jsr305</artifactId>
</dependency>

<!-- compile dependencies -->
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<scope>provided</scope>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.apache.activemq</groupId>
Expand Down
189 changes: 89 additions & 100 deletions sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -101,37 +102,66 @@ public class JmsIO {
private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);

public static Read read() {
return new Read(null, null, null, Long.MAX_VALUE, null);
return new AutoValue_JmsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build();
}

public static Write write() {
return new Write(null, null, null);
return new AutoValue_JmsIO_Write.Builder().build();
}

/**
* A {@link PTransform} to read from a JMS destination. See {@link JmsIO} for more
* information on usage and configuration.
*/
public static class Read extends PTransform<PBegin, PCollection<JmsRecord>> {
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<JmsRecord>> {

/**
* NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html
* "It is expected that JMS providers will provide the tools an administrator needs to create
* and configure administered objects in a JNDI namespace. JMS provider implementations of
* administered objects should be both javax.jndi.Referenceable and java.io.Serializable so
* that they can be stored in all JNDI naming contexts. In addition, it is recommended that
* these implementations follow the JavaBeansTM design patterns."
*
* <p>So, a {@link ConnectionFactory} implementation is serializable.
*/
@Nullable abstract ConnectionFactory getConnectionFactory();
@Nullable abstract String getQueue();
@Nullable abstract String getTopic();
abstract long getMaxNumRecords();
@Nullable abstract Duration getMaxReadTime();

abstract Builder builder();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setConnectionFactory(ConnectionFactory connectionFactory);
abstract Builder setQueue(String queue);
abstract Builder setTopic(String topic);
abstract Builder setMaxNumRecords(long maxNumRecords);
abstract Builder setMaxReadTime(Duration maxReadTime);
abstract Read build();
}

public Read withConnectionFactory(ConnectionFactory connectionFactory) {
return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
return builder().setConnectionFactory(connectionFactory).build();
}

public Read withQueue(String queue) {
return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
return builder().setQueue(queue).build();
}

public Read withTopic(String topic) {
return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
return builder().setTopic(topic).build();
}

public Read withMaxNumRecords(long maxNumRecords) {
return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
return builder().setMaxNumRecords(maxNumRecords).build();
}

public Read withMaxReadTime(Duration maxReadTime) {
return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime);
return builder().setMaxReadTime(maxReadTime).build();
}

@Override
Expand All @@ -141,76 +171,40 @@ public PCollection<JmsRecord> expand(PBegin input) {

PTransform<PBegin, PCollection<JmsRecord>> transform = unbounded;

if (maxNumRecords != Long.MAX_VALUE) {
transform = unbounded.withMaxNumRecords(maxNumRecords);
} else if (maxReadTime != null) {
transform = unbounded.withMaxReadTime(maxReadTime);
if (getMaxNumRecords() != Long.MAX_VALUE) {
transform = unbounded.withMaxNumRecords(getMaxNumRecords());
} else if (getMaxReadTime() != null) {
transform = unbounded.withMaxReadTime(getMaxReadTime());
}

return input.getPipeline().apply(transform);
}

@Override
public void validate(PBegin input) {
checkNotNull(connectionFactory, "ConnectionFactory not specified");
checkArgument((queue != null || topic != null), "Either queue or topic not specified");
checkNotNull(getConnectionFactory(), "ConnectionFactory not specified");
checkArgument((getQueue() != null || getTopic() != null), "Either queue or topic not "
+ "specified");
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder.addIfNotNull(DisplayData.item("queue", queue));
builder.addIfNotNull(DisplayData.item("topic", topic));
builder.addIfNotNull(DisplayData.item("queue", getQueue()));
builder.addIfNotNull(DisplayData.item("topic", getTopic()));

}

///////////////////////////////////////////////////////////////////////////////////////

/**
* NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html
* "It is expected that JMS providers will provide the tools an administrator needs to create
* and configure administered objects in a JNDI namespace. JMS provider implementations of
* administered objects should be both javax.jndi.Referenceable and java.io.Serializable so
* that they can be stored in all JNDI naming contexts. In addition, it is recommended that
* these implementations follow the JavaBeansTM design patterns."
*
* <p>So, a {@link ConnectionFactory} implementation is serializable.
*/
protected ConnectionFactory connectionFactory;
@Nullable
protected String queue;
@Nullable
protected String topic;
protected long maxNumRecords;
protected Duration maxReadTime;

private Read(
ConnectionFactory connectionFactory,
String queue,
String topic,
long maxNumRecords,
Duration maxReadTime) {
super("JmsIO.Read");

this.connectionFactory = connectionFactory;
this.queue = queue;
this.topic = topic;
this.maxNumRecords = maxNumRecords;
this.maxReadTime = maxReadTime;
}

/**
* Creates an {@link UnboundedSource UnboundedSource&lt;JmsRecord, ?&gt;} with the configuration
* in {@link Read}. Primary use case is unit tests, should not be used in an
* application.
*/
@VisibleForTesting
UnboundedSource<JmsRecord, JmsCheckpointMark> createSource() {
return new UnboundedJmsSource(
connectionFactory,
queue,
topic);
return new UnboundedJmsSource(this);
}

}
Expand All @@ -219,25 +213,18 @@ private JmsIO() {}

private static class UnboundedJmsSource extends UnboundedSource<JmsRecord, JmsCheckpointMark> {

private final ConnectionFactory connectionFactory;
private final String queue;
private final String topic;
private final Read spec;

public UnboundedJmsSource(
ConnectionFactory connectionFactory,
String queue,
String topic) {
this.connectionFactory = connectionFactory;
this.queue = queue;
this.topic = topic;
public UnboundedJmsSource(Read spec) {
this.spec = spec;
}

@Override
public List<UnboundedJmsSource> generateInitialSplits(
int desiredNumSplits, PipelineOptions options) throws Exception {
List<UnboundedJmsSource> sources = new ArrayList<>();
for (int i = 0; i < desiredNumSplits; i++) {
sources.add(new UnboundedJmsSource(connectionFactory, queue, topic));
sources.add(new UnboundedJmsSource(spec));
}
return sources;
}
Expand All @@ -250,8 +237,7 @@ public UnboundedJmsReader createReader(PipelineOptions options,

@Override
public void validate() {
checkNotNull(connectionFactory, "ConnectionFactory is not defined");
checkArgument((queue != null || topic != null), "Either queue or topic is not defined");
spec.validate(null);
}

@Override
Expand Down Expand Up @@ -291,15 +277,17 @@ public UnboundedJmsReader(

@Override
public boolean start() throws IOException {
ConnectionFactory connectionFactory = source.connectionFactory;
ConnectionFactory connectionFactory = source.spec.getConnectionFactory();
try {
this.connection = connectionFactory.createConnection();
this.connection.start();
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (source.topic != null) {
this.consumer = this.session.createConsumer(this.session.createTopic(source.topic));
if (source.spec.getTopic() != null) {
this.consumer =
this.session.createConsumer(this.session.createTopic(source.spec.getTopic()));
} else {
this.consumer = this.session.createConsumer(this.session.createQueue(source.queue));
this.consumer =
this.session.createConsumer(this.session.createQueue(source.spec.getQueue()));
}

return advance();
Expand Down Expand Up @@ -409,70 +397,72 @@ public void close() throws IOException {
* A {@link PTransform} to write to a JMS queue. See {@link JmsIO} for
* more information on usage and configuration.
*/
public static class Write extends PTransform<PCollection<String>, PDone> {
@AutoValue
public abstract static class Write extends PTransform<PCollection<String>, PDone> {

@Nullable abstract ConnectionFactory getConnectionFactory();
@Nullable abstract String getQueue();
@Nullable abstract String getTopic();

abstract Builder builder();

protected ConnectionFactory connectionFactory;
protected String queue;
protected String topic;
@AutoValue.Builder
abstract static class Builder {
abstract Builder setConnectionFactory(ConnectionFactory connectionFactory);
abstract Builder setQueue(String queue);
abstract Builder setTopic(String topic);
abstract Write build();
}

public Write withConnectionFactory(ConnectionFactory connectionFactory) {
return new Write(connectionFactory, queue, topic);
return builder().setConnectionFactory(connectionFactory).build();
}

public Write withQueue(String queue) {
return new Write(connectionFactory, queue, topic);
return builder().setQueue(queue).build();
}

public Write withTopic(String topic) {
return new Write(connectionFactory, queue, topic);
}

private Write(ConnectionFactory connectionFactory, String queue, String topic) {
this.connectionFactory = connectionFactory;
this.queue = queue;
this.topic = topic;
return builder().setTopic(topic).build();
}

@Override
public PDone expand(PCollection<String> input) {
input.apply(ParDo.of(new JmsWriter(connectionFactory, queue, topic)));
input.apply(ParDo.of(new WriterFn(this)));
return PDone.in(input.getPipeline());
}

@Override
public void validate(PCollection<String> input) {
checkNotNull(connectionFactory, "ConnectionFactory is not defined");
checkArgument((queue != null || topic != null), "Either queue or topic is required");
checkNotNull(getConnectionFactory(), "ConnectionFactory is not defined");
checkArgument((getQueue() != null || getTopic() != null), "Either queue or topic is "
+ "required");
}

private static class JmsWriter extends DoFn<String, Void> {
private static class WriterFn extends DoFn<String, Void> {

private ConnectionFactory connectionFactory;
private String queue;
private String topic;
private Write spec;

private Connection connection;
private Session session;
private MessageProducer producer;

public JmsWriter(ConnectionFactory connectionFactory, String queue, String topic) {
this.connectionFactory = connectionFactory;
this.queue = queue;
this.topic = topic;
public WriterFn(Write spec) {
this.spec = spec;
}

@StartBundle
public void startBundle(Context c) throws Exception {
if (producer == null) {
this.connection = connectionFactory.createConnection();
this.connection = spec.getConnectionFactory().createConnection();
this.connection.start();
// false means we don't use JMS transaction.
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination;
if (queue != null) {
destination = session.createQueue(queue);
if (spec.getQueue() != null) {
destination = session.createQueue(spec.getQueue());
} else {
destination = session.createTopic(topic);
destination = session.createTopic(spec.getTopic());
}
this.producer = this.session.createProducer(destination);
}
Expand All @@ -481,7 +471,6 @@ public void startBundle(Context c) throws Exception {
@ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
String value = ctx.element();

try {
TextMessage message = session.createTextMessage(value);
producer.send(message);
Expand Down

0 comments on commit 4989d83

Please sign in to comment.