Skip to content

Commit

Permalink
Reduce flexibility and improve JMS sink (#1807)
Browse files Browse the repository at this point in the history
Remove `sessionFn`, `flushFn` and `sendFn`. They would prevent implementing ex-once in the future without breaking changes.

Use transactions for sending: this is recommended for ActiveMQ and WebLogic JMS as an optimization. `MessageProducer.send` is synchronous normally, but in transactions it's async
and `commit()` waits for all in-flight sends.
  • Loading branch information
viliam-durina committed Dec 16, 2019
1 parent 52f00bb commit c4aa54e
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 136 deletions.
Expand Up @@ -37,7 +37,6 @@
import javax.annotation.Nonnull;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
Expand Down Expand Up @@ -296,29 +295,23 @@ public static <W, T> SupplierEx<Processor> writeBufferedP(
*/
@Nonnull
public static <T> ProcessorMetaSupplier writeJmsQueueP(
@Nonnull String queueName,
@Nonnull SupplierEx<? extends Connection> newConnectionFn,
@Nonnull FunctionEx<? super Connection, ? extends Session> newSessionFn,
@Nonnull BiFunctionEx<? super Session, ? super T, ? extends Message> messageFn,
@Nonnull BiConsumerEx<? super MessageProducer, ? super Message> sendFn,
@Nonnull ConsumerEx<? super Session> flushFn,
@Nonnull String name
@Nonnull BiFunctionEx<? super Session, ? super T, ? extends Message> messageFn
) {
return WriteJmsP.supplier(newConnectionFn, newSessionFn, messageFn, sendFn, flushFn, name, false);
return WriteJmsP.supplier(queueName, newConnectionFn, messageFn, false);
}

/**
* Returns a supplier of processors for {@link Sinks#jmsTopicBuilder}.
*/
@Nonnull
public static <T> ProcessorMetaSupplier writeJmsTopicP(
@Nonnull String topicName,
@Nonnull SupplierEx<? extends Connection> newConnectionFn,
@Nonnull FunctionEx<? super Connection, ? extends Session> newSessionFn,
@Nonnull BiFunctionEx<? super Session, ? super T, ? extends Message> messageFn,
@Nonnull BiConsumerEx<? super MessageProducer, ? super Message> sendFn,
@Nonnull ConsumerEx<? super Session> flushFn,
@Nonnull String name
@Nonnull BiFunctionEx<? super Session, ? super T, ? extends Message> messageFn
) {
return WriteJmsP.supplier(newConnectionFn, newSessionFn, messageFn, sendFn, flushFn, name, true);
return WriteJmsP.supplier(topicName, newConnectionFn, messageFn, true);
}

/**
Expand Down
Expand Up @@ -18,7 +18,6 @@

import com.hazelcast.function.BiConsumerEx;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.Processor;
Expand All @@ -29,6 +28,7 @@
import javax.annotation.Nonnull;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
Expand All @@ -55,54 +55,38 @@ private WriteJmsP() {
* SinkProcessors#writeJmsTopicP} instead
*/
public static <T> ProcessorMetaSupplier supplier(
String destinationName,
SupplierEx<? extends Connection> newConnectionFn,
FunctionEx<? super Connection, ? extends Session> newSessionFn,
BiFunctionEx<? super Session, T, ? extends Message> messageFn,
BiConsumerEx<? super MessageProducer, ? super Message> sendFn,
ConsumerEx<? super Session> flushFn,
String name,
boolean isTopic
) {
checkSerializable(newConnectionFn, "newConnectionFn");
checkSerializable(newSessionFn, "newSessionFn");
checkSerializable(messageFn, "messageFn");
checkSerializable(sendFn, "sendFn");
checkSerializable(flushFn, "flushFn");

return ProcessorMetaSupplier.of(
PREFERRED_LOCAL_PARALLELISM,
new Supplier<>(newConnectionFn, newSessionFn, messageFn, sendFn, flushFn, name, isTopic)
);
return ProcessorMetaSupplier.of(PREFERRED_LOCAL_PARALLELISM,
new Supplier<>(destinationName, newConnectionFn, messageFn, isTopic));
}

private static final class Supplier<T> implements ProcessorSupplier {

static final long serialVersionUID = 1L;

private final SupplierEx<? extends Connection> newConnectionFn;
private final FunctionEx<? super Connection, ? extends Session> newSessionFn;
private final String name;
private final boolean isTopic;
private final BiFunctionEx<? super Session, ? super T, ? extends Message> messageFn;
private final BiConsumerEx<? super MessageProducer, ? super Message> sendFn;
private final ConsumerEx<? super Session> flushFn;

private transient Connection connection;

private Supplier(SupplierEx<? extends Connection> newConnectionFn,
FunctionEx<? super Connection, ? extends Session> newSessionFn,
BiFunctionEx<? super Session, ? super T, ? extends Message> messageFn,
BiConsumerEx<? super MessageProducer, ? super Message> sendFn,
ConsumerEx<? super Session> flushFn,
String name,
boolean isTopic
private Supplier(
String destinationName,
SupplierEx<? extends Connection> newConnectionFn,
BiFunctionEx<? super Session, ? super T, ? extends Message> messageFn,
boolean isTopic
) {
this.newConnectionFn = newConnectionFn;
this.newSessionFn = newSessionFn;
this.messageFn = messageFn;
this.sendFn = sendFn;
this.flushFn = flushFn;
this.name = name;
this.name = destinationName;
this.isTopic = isTopic;
}

Expand All @@ -112,25 +96,19 @@ public void init(@Nonnull Context ignored) throws Exception {
connection.start();
}

@Nonnull
@Override
@Nonnull @Override
public Collection<? extends Processor> get(int count) {
FunctionEx<Processor.Context, JmsContext> createFn = jet -> {
Session session = newSessionFn.apply(connection);
Session session = connection.createSession(true, 0);
Destination destination = isTopic ? session.createTopic(name) : session.createQueue(name);
MessageProducer producer = session.createProducer(destination);
return new JmsContext(session, producer);
};
BiConsumerEx<JmsContext, T> onReceiveFn = (jmsContext, item) -> {
Message message = messageFn.apply(jmsContext.session, item);
sendFn.accept(jmsContext.producer, message);
};
ConsumerEx<JmsContext> flushF = jmsContext -> flushFn.accept(jmsContext.session);
ConsumerEx<JmsContext> destroyFn = jmsContext -> {
jmsContext.producer.close();
jmsContext.session.close();
jmsContext.producer.send(message);
};
SupplierEx<Processor> supplier = writeBufferedP(createFn, onReceiveFn, flushF, destroyFn);
SupplierEx<Processor> supplier = writeBufferedP(createFn, onReceiveFn, JmsContext::commit, JmsContext::close);

return Stream.generate(supplier).limit(count).collect(toList());
}
Expand All @@ -142,14 +120,23 @@ public void close(Throwable error) throws Exception {
}
}

class JmsContext {
static class JmsContext {
private final Session session;
private final MessageProducer producer;

JmsContext(Session session, MessageProducer producer) {
this.session = session;
this.producer = producer;
}

public void commit() throws JMSException {
session.commit();
}

public void close() throws JMSException {
producer.close();
session.close();
}
}
}
}
Expand Up @@ -16,9 +16,7 @@

package com.hazelcast.jet.pipeline;

import com.hazelcast.function.BiConsumerEx;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.impl.connector.WriteJmsP;
Expand All @@ -27,7 +25,6 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import static com.hazelcast.internal.util.Preconditions.checkNotNull;
Expand All @@ -46,15 +43,10 @@ public final class JmsSinkBuilder<T> {
private final boolean isTopic;

private FunctionEx<ConnectionFactory, Connection> connectionFn;
private FunctionEx<Connection, Session> sessionFn;
private BiFunctionEx<Session, T, Message> messageFn;
private BiConsumerEx<MessageProducer, Message> sendFn;
private ConsumerEx<Session> flushFn;

private String username;
private String password;
private boolean transacted;
private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
private String destinationName;

/**
Expand Down Expand Up @@ -92,34 +84,6 @@ public JmsSinkBuilder<T> connectionFn(@Nonnull FunctionEx<ConnectionFactory, Con
return this;
}

/**
* Sets the session parameters. If {@code sessionFn} is provided, these
* parameters are ignored.
*
* @param transacted if true, marks the session as transacted.
* Default value is false.
* @param acknowledgeMode sets the acknowledge mode of the session,
* Default value is {@code Session.AUTO_ACKNOWLEDGE}
*/
public JmsSinkBuilder<T> sessionParams(boolean transacted, int acknowledgeMode) {
this.transacted = transacted;
this.acknowledgeMode = acknowledgeMode;
return this;
}

/**
* Sets the function which creates a session from a connection.
* <p>
* If not provided, the builder creates a function which uses {@code
* Connection#createSession(boolean transacted, int acknowledgeMode)} to
* create the session. See {@link #sessionParams(boolean, int)}.
*/
public JmsSinkBuilder<T> sessionFn(@Nonnull FunctionEx<Connection, Session> sessionFn) {
checkSerializable(sessionFn, "sessionFn");
this.sessionFn = sessionFn;
return this;
}

/**
* Sets the name of the destination.
*/
Expand All @@ -141,62 +105,28 @@ public JmsSinkBuilder<T> messageFn(BiFunctionEx<Session, T, Message> messageFn)
return this;
}

/**
* Sets the function which sends the message via message producer.
* <p>
* If not provided, the builder creates a function which sends the message via
* {@code MessageProducer#send(Message message)}.
*/
public JmsSinkBuilder<T> sendFn(BiConsumerEx<MessageProducer, Message> sendFn) {
checkSerializable(sendFn, "sendFn");
this.sendFn = sendFn;
return this;
}

/**
* Sets the function which flushes the session after a batch of messages is
* sent.
* <p>
* If not provided, the builder creates a no-op consumer.
*/
public JmsSinkBuilder<T> flushFn(ConsumerEx<Session> flushFn) {
checkSerializable(flushFn, "flushFn");
this.flushFn = flushFn;
return this;
}

/**
* Creates and returns the JMS {@link Sink} with the supplied components.
*/
public Sink<T> build() {
String usernameLocal = username;
String passwordLocal = password;
boolean transactedLocal = transacted;
int acknowledgeModeLocal = acknowledgeMode;

checkNotNull(destinationName);
if (connectionFn == null) {
connectionFn = factory -> factory.createConnection(usernameLocal, passwordLocal);
}
if (sessionFn == null) {
sessionFn = connection -> connection.createSession(transactedLocal, acknowledgeModeLocal);
}
if (messageFn == null) {
messageFn = (session, item) ->
item instanceof Message ? (Message) item : session.createTextMessage(item.toString());
}
if (sendFn == null) {
sendFn = MessageProducer::send;
}
if (flushFn == null) {
flushFn = ConsumerEx.noop();
}

FunctionEx<ConnectionFactory, Connection> connectionFnLocal = connectionFn;
@SuppressWarnings("UnnecessaryLocalVariable") // it's necessary to not capture this in the lambda
SupplierEx<ConnectionFactory> factorySupplierLocal = factorySupplier;
SupplierEx<Connection> newConnectionFn = () -> connectionFnLocal.apply(factorySupplierLocal.get());
return Sinks.fromProcessor(sinkName(),
WriteJmsP.supplier(newConnectionFn, sessionFn, messageFn, sendFn, flushFn, destinationName, isTopic));
WriteJmsP.supplier(destinationName, newConnectionFn, messageFn, isTopic));
}

private String sinkName() {
Expand Down
Expand Up @@ -787,9 +787,8 @@ public static <T> Sink<T> noop() {

/**
* Convenience for {@link #jmsQueueBuilder(SupplierEx)}. Creates a
* connection without any authentication parameters and uses non-transacted
* sessions with {@code Session.AUTO_ACKNOWLEDGE} mode. If a received item
* is not an instance of {@code javax.jms.Message}, the sink wraps {@code
* connection without any authentication parameters. If a received item is
* not an instance of {@code javax.jms.Message}, the sink wraps {@code
* item.toString()} into a {@link javax.jms.TextMessage}.
*
* @param factorySupplier supplier to obtain JMS connection factory
Expand All @@ -807,12 +806,13 @@ public static <T> Sink<T> jmsQueue(

/**
* Returns a builder object that offers a step-by-step fluent API to build
* a custom JMS queue sink for the Pipeline API. See javadoc on {@link
* a custom JMS queue sink for the Pipeline API. See javadoc for {@link
* JmsSinkBuilder} methods for more details.
* <p>
* Behavior on job restart: the processor is stateless. If the job is
* restarted, duplicate events can occur. If you need exactly-once behavior,
* you must ensure idempotence on the application level.
* Behavior on job restart: the processor is stateless. Items are written
* in auto-acknowledge mode. If the job is restarted, duplicate events can
* occur, giving you at-least-once guarantee. If you need exactly-once
* behavior, you must ensure idempotence on the consumer end.
* <p>
* IO failures should be handled by the JMS provider. If any JMS operation
* throws an exception, the job will fail. Most of the providers offer a
Expand Down Expand Up @@ -854,9 +854,10 @@ public static <T> Sink<T> jmsTopic(
* a custom JMS topic sink for the Pipeline API. See javadoc on {@link
* JmsSinkBuilder} methods for more details.
* <p>
* Behavior on job restart: the processor is stateless. If the job is
* restarted, duplicate events can occur. If you need exactly-once behavior,
* you must ensure idempotence on the application level.
* Behavior on job restart: the processor is stateless. Items are written
* in auto-acknowledge mode. If the job is restarted, duplicate events can
* occur, giving you at-least-once guarantee. If you need exactly-once
* behavior, you must ensure idempotence on the consumer end.
* <p>
* IO failures should be handled by the JMS provider. If any JMS operation
* throws an exception, the job will fail. Most of the providers offer a
Expand Down
Expand Up @@ -270,10 +270,7 @@ public void sinkQueue_whenBuilder_withFunctions() throws JMSException {

Sink<String> sink = Sinks.<String>jmsQueueBuilder(() -> broker.createConnectionFactory())
.connectionFn(ConnectionFactory::createConnection)
.sessionFn(connection -> connection.createSession(false, AUTO_ACKNOWLEDGE))
.messageFn(Session::createTextMessage)
.sendFn(MessageProducer::send)
.flushFn(ConsumerEx.noop())
.destinationName(destinationName)
.build();

Expand Down Expand Up @@ -314,7 +311,6 @@ public void sinkTopic_whenBuilder_withParameters() throws JMSException {

Sink<String> sink = Sinks.<String>jmsTopicBuilder(() -> broker.createConnectionFactory())
.connectionParams(null, null)
.sessionParams(false, AUTO_ACKNOWLEDGE)
.destinationName(destinationName)
.build();

Expand Down

0 comments on commit c4aa54e

Please sign in to comment.