From c4aa54e1396b2b2f914eb55e18b7bc697db2ef04 Mon Sep 17 00:00:00 2001 From: Viliam Durina Date: Mon, 16 Dec 2019 14:40:03 +0100 Subject: [PATCH] Reduce flexibility and improve JMS sink (#1807) Remove `sessionFn`, `flushFn` and `sendFn`. They would prevent implementing ex-once in the future without breaking changes. Use transactions for sending: this is recommended for ActiveMQ and WebLogic JMS as an optimization. `MessageProducer.send` is synchronous normally, but in transactions it's async and `commit()` waits for all in-flight sends. --- .../jet/core/processor/SinkProcessors.java | 19 ++--- .../jet/impl/connector/WriteJmsP.java | 61 ++++++--------- .../jet/pipeline/JmsSinkBuilder.java | 74 +------------------ .../com/hazelcast/jet/pipeline/Sinks.java | 21 +++--- .../impl/connector/JmsIntegrationTest.java | 4 - 5 files changed, 43 insertions(+), 136 deletions(-) diff --git a/hazelcast-jet-core/src/main/java/com/hazelcast/jet/core/processor/SinkProcessors.java b/hazelcast-jet-core/src/main/java/com/hazelcast/jet/core/processor/SinkProcessors.java index 4b5c0d6d1fda..f41c5f24f11b 100644 --- a/hazelcast-jet-core/src/main/java/com/hazelcast/jet/core/processor/SinkProcessors.java +++ b/hazelcast-jet-core/src/main/java/com/hazelcast/jet/core/processor/SinkProcessors.java @@ -37,7 +37,6 @@ import javax.annotation.Nonnull; import javax.jms.Connection; import javax.jms.Message; -import javax.jms.MessageProducer; import javax.jms.Session; import java.io.BufferedWriter; import java.io.OutputStreamWriter; @@ -296,14 +295,11 @@ public static SupplierEx writeBufferedP( */ @Nonnull public static ProcessorMetaSupplier writeJmsQueueP( + @Nonnull String queueName, @Nonnull SupplierEx newConnectionFn, - @Nonnull FunctionEx newSessionFn, - @Nonnull BiFunctionEx messageFn, - @Nonnull BiConsumerEx sendFn, - @Nonnull ConsumerEx flushFn, - @Nonnull String name + @Nonnull BiFunctionEx messageFn ) { - return WriteJmsP.supplier(newConnectionFn, newSessionFn, messageFn, sendFn, flushFn, name, false); + return WriteJmsP.supplier(queueName, newConnectionFn, messageFn, false); } /** @@ -311,14 +307,11 @@ public static ProcessorMetaSupplier writeJmsQueueP( */ @Nonnull public static ProcessorMetaSupplier writeJmsTopicP( + @Nonnull String topicName, @Nonnull SupplierEx newConnectionFn, - @Nonnull FunctionEx newSessionFn, - @Nonnull BiFunctionEx messageFn, - @Nonnull BiConsumerEx sendFn, - @Nonnull ConsumerEx flushFn, - @Nonnull String name + @Nonnull BiFunctionEx messageFn ) { - return WriteJmsP.supplier(newConnectionFn, newSessionFn, messageFn, sendFn, flushFn, name, true); + return WriteJmsP.supplier(topicName, newConnectionFn, messageFn, true); } /** diff --git a/hazelcast-jet-core/src/main/java/com/hazelcast/jet/impl/connector/WriteJmsP.java b/hazelcast-jet-core/src/main/java/com/hazelcast/jet/impl/connector/WriteJmsP.java index 03ebaf6fb2c4..4bc0da1d9aa3 100644 --- a/hazelcast-jet-core/src/main/java/com/hazelcast/jet/impl/connector/WriteJmsP.java +++ b/hazelcast-jet-core/src/main/java/com/hazelcast/jet/impl/connector/WriteJmsP.java @@ -18,7 +18,6 @@ import com.hazelcast.function.BiConsumerEx; import com.hazelcast.function.BiFunctionEx; -import com.hazelcast.function.ConsumerEx; import com.hazelcast.function.FunctionEx; import com.hazelcast.function.SupplierEx; import com.hazelcast.jet.core.Processor; @@ -29,6 +28,7 @@ import javax.annotation.Nonnull; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; @@ -55,24 +55,16 @@ private WriteJmsP() { * SinkProcessors#writeJmsTopicP} instead */ public static ProcessorMetaSupplier supplier( + String destinationName, SupplierEx newConnectionFn, - FunctionEx newSessionFn, BiFunctionEx messageFn, - BiConsumerEx sendFn, - ConsumerEx flushFn, - String name, boolean isTopic ) { checkSerializable(newConnectionFn, "newConnectionFn"); - checkSerializable(newSessionFn, "newSessionFn"); checkSerializable(messageFn, "messageFn"); - checkSerializable(sendFn, "sendFn"); - checkSerializable(flushFn, "flushFn"); - return ProcessorMetaSupplier.of( - PREFERRED_LOCAL_PARALLELISM, - new Supplier<>(newConnectionFn, newSessionFn, messageFn, sendFn, flushFn, name, isTopic) - ); + return ProcessorMetaSupplier.of(PREFERRED_LOCAL_PARALLELISM, + new Supplier<>(destinationName, newConnectionFn, messageFn, isTopic)); } private static final class Supplier implements ProcessorSupplier { @@ -80,29 +72,21 @@ private static final class Supplier implements ProcessorSupplier { static final long serialVersionUID = 1L; private final SupplierEx newConnectionFn; - private final FunctionEx newSessionFn; private final String name; private final boolean isTopic; private final BiFunctionEx messageFn; - private final BiConsumerEx sendFn; - private final ConsumerEx flushFn; private transient Connection connection; - private Supplier(SupplierEx newConnectionFn, - FunctionEx newSessionFn, - BiFunctionEx messageFn, - BiConsumerEx sendFn, - ConsumerEx flushFn, - String name, - boolean isTopic + private Supplier( + String destinationName, + SupplierEx newConnectionFn, + BiFunctionEx messageFn, + boolean isTopic ) { this.newConnectionFn = newConnectionFn; - this.newSessionFn = newSessionFn; this.messageFn = messageFn; - this.sendFn = sendFn; - this.flushFn = flushFn; - this.name = name; + this.name = destinationName; this.isTopic = isTopic; } @@ -112,25 +96,19 @@ public void init(@Nonnull Context ignored) throws Exception { connection.start(); } - @Nonnull - @Override + @Nonnull @Override public Collection get(int count) { FunctionEx createFn = jet -> { - Session session = newSessionFn.apply(connection); + Session session = connection.createSession(true, 0); Destination destination = isTopic ? session.createTopic(name) : session.createQueue(name); MessageProducer producer = session.createProducer(destination); return new JmsContext(session, producer); }; BiConsumerEx onReceiveFn = (jmsContext, item) -> { Message message = messageFn.apply(jmsContext.session, item); - sendFn.accept(jmsContext.producer, message); - }; - ConsumerEx flushF = jmsContext -> flushFn.accept(jmsContext.session); - ConsumerEx destroyFn = jmsContext -> { - jmsContext.producer.close(); - jmsContext.session.close(); + jmsContext.producer.send(message); }; - SupplierEx supplier = writeBufferedP(createFn, onReceiveFn, flushF, destroyFn); + SupplierEx supplier = writeBufferedP(createFn, onReceiveFn, JmsContext::commit, JmsContext::close); return Stream.generate(supplier).limit(count).collect(toList()); } @@ -142,7 +120,7 @@ public void close(Throwable error) throws Exception { } } - class JmsContext { + static class JmsContext { private final Session session; private final MessageProducer producer; @@ -150,6 +128,15 @@ class JmsContext { this.session = session; this.producer = producer; } + + public void commit() throws JMSException { + session.commit(); + } + + public void close() throws JMSException { + producer.close(); + session.close(); + } } } } diff --git a/hazelcast-jet-core/src/main/java/com/hazelcast/jet/pipeline/JmsSinkBuilder.java b/hazelcast-jet-core/src/main/java/com/hazelcast/jet/pipeline/JmsSinkBuilder.java index 10aef0133674..421777e645d6 100644 --- a/hazelcast-jet-core/src/main/java/com/hazelcast/jet/pipeline/JmsSinkBuilder.java +++ b/hazelcast-jet-core/src/main/java/com/hazelcast/jet/pipeline/JmsSinkBuilder.java @@ -16,9 +16,7 @@ package com.hazelcast.jet.pipeline; -import com.hazelcast.function.BiConsumerEx; import com.hazelcast.function.BiFunctionEx; -import com.hazelcast.function.ConsumerEx; import com.hazelcast.function.FunctionEx; import com.hazelcast.function.SupplierEx; import com.hazelcast.jet.impl.connector.WriteJmsP; @@ -27,7 +25,6 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Message; -import javax.jms.MessageProducer; import javax.jms.Session; import static com.hazelcast.internal.util.Preconditions.checkNotNull; @@ -46,15 +43,10 @@ public final class JmsSinkBuilder { private final boolean isTopic; private FunctionEx connectionFn; - private FunctionEx sessionFn; private BiFunctionEx messageFn; - private BiConsumerEx sendFn; - private ConsumerEx flushFn; private String username; private String password; - private boolean transacted; - private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; private String destinationName; /** @@ -92,34 +84,6 @@ public JmsSinkBuilder connectionFn(@Nonnull FunctionEx sessionParams(boolean transacted, int acknowledgeMode) { - this.transacted = transacted; - this.acknowledgeMode = acknowledgeMode; - return this; - } - - /** - * Sets the function which creates a session from a connection. - *

- * If not provided, the builder creates a function which uses {@code - * Connection#createSession(boolean transacted, int acknowledgeMode)} to - * create the session. See {@link #sessionParams(boolean, int)}. - */ - public JmsSinkBuilder sessionFn(@Nonnull FunctionEx sessionFn) { - checkSerializable(sessionFn, "sessionFn"); - this.sessionFn = sessionFn; - return this; - } - /** * Sets the name of the destination. */ @@ -141,62 +105,28 @@ public JmsSinkBuilder messageFn(BiFunctionEx messageFn) return this; } - /** - * Sets the function which sends the message via message producer. - *

- * If not provided, the builder creates a function which sends the message via - * {@code MessageProducer#send(Message message)}. - */ - public JmsSinkBuilder sendFn(BiConsumerEx sendFn) { - checkSerializable(sendFn, "sendFn"); - this.sendFn = sendFn; - return this; - } - - /** - * Sets the function which flushes the session after a batch of messages is - * sent. - *

- * If not provided, the builder creates a no-op consumer. - */ - public JmsSinkBuilder flushFn(ConsumerEx flushFn) { - checkSerializable(flushFn, "flushFn"); - this.flushFn = flushFn; - return this; - } - /** * Creates and returns the JMS {@link Sink} with the supplied components. */ public Sink build() { String usernameLocal = username; String passwordLocal = password; - boolean transactedLocal = transacted; - int acknowledgeModeLocal = acknowledgeMode; checkNotNull(destinationName); if (connectionFn == null) { connectionFn = factory -> factory.createConnection(usernameLocal, passwordLocal); } - if (sessionFn == null) { - sessionFn = connection -> connection.createSession(transactedLocal, acknowledgeModeLocal); - } if (messageFn == null) { messageFn = (session, item) -> item instanceof Message ? (Message) item : session.createTextMessage(item.toString()); } - if (sendFn == null) { - sendFn = MessageProducer::send; - } - if (flushFn == null) { - flushFn = ConsumerEx.noop(); - } FunctionEx connectionFnLocal = connectionFn; + @SuppressWarnings("UnnecessaryLocalVariable") // it's necessary to not capture this in the lambda SupplierEx factorySupplierLocal = factorySupplier; SupplierEx newConnectionFn = () -> connectionFnLocal.apply(factorySupplierLocal.get()); return Sinks.fromProcessor(sinkName(), - WriteJmsP.supplier(newConnectionFn, sessionFn, messageFn, sendFn, flushFn, destinationName, isTopic)); + WriteJmsP.supplier(destinationName, newConnectionFn, messageFn, isTopic)); } private String sinkName() { diff --git a/hazelcast-jet-core/src/main/java/com/hazelcast/jet/pipeline/Sinks.java b/hazelcast-jet-core/src/main/java/com/hazelcast/jet/pipeline/Sinks.java index b5850a8be075..1d8acb376e09 100644 --- a/hazelcast-jet-core/src/main/java/com/hazelcast/jet/pipeline/Sinks.java +++ b/hazelcast-jet-core/src/main/java/com/hazelcast/jet/pipeline/Sinks.java @@ -787,9 +787,8 @@ public static Sink noop() { /** * Convenience for {@link #jmsQueueBuilder(SupplierEx)}. Creates a - * connection without any authentication parameters and uses non-transacted - * sessions with {@code Session.AUTO_ACKNOWLEDGE} mode. If a received item - * is not an instance of {@code javax.jms.Message}, the sink wraps {@code + * connection without any authentication parameters. If a received item is + * not an instance of {@code javax.jms.Message}, the sink wraps {@code * item.toString()} into a {@link javax.jms.TextMessage}. * * @param factorySupplier supplier to obtain JMS connection factory @@ -807,12 +806,13 @@ public static Sink jmsQueue( /** * Returns a builder object that offers a step-by-step fluent API to build - * a custom JMS queue sink for the Pipeline API. See javadoc on {@link + * a custom JMS queue sink for the Pipeline API. See javadoc for {@link * JmsSinkBuilder} methods for more details. *

- * Behavior on job restart: the processor is stateless. If the job is - * restarted, duplicate events can occur. If you need exactly-once behavior, - * you must ensure idempotence on the application level. + * Behavior on job restart: the processor is stateless. Items are written + * in auto-acknowledge mode. If the job is restarted, duplicate events can + * occur, giving you at-least-once guarantee. If you need exactly-once + * behavior, you must ensure idempotence on the consumer end. *

* IO failures should be handled by the JMS provider. If any JMS operation * throws an exception, the job will fail. Most of the providers offer a @@ -854,9 +854,10 @@ public static Sink jmsTopic( * a custom JMS topic sink for the Pipeline API. See javadoc on {@link * JmsSinkBuilder} methods for more details. *

- * Behavior on job restart: the processor is stateless. If the job is - * restarted, duplicate events can occur. If you need exactly-once behavior, - * you must ensure idempotence on the application level. + * Behavior on job restart: the processor is stateless. Items are written + * in auto-acknowledge mode. If the job is restarted, duplicate events can + * occur, giving you at-least-once guarantee. If you need exactly-once + * behavior, you must ensure idempotence on the consumer end. *

* IO failures should be handled by the JMS provider. If any JMS operation * throws an exception, the job will fail. Most of the providers offer a diff --git a/hazelcast-jet-core/src/test/java/com/hazelcast/jet/impl/connector/JmsIntegrationTest.java b/hazelcast-jet-core/src/test/java/com/hazelcast/jet/impl/connector/JmsIntegrationTest.java index aad04cdf3b54..c804fe590699 100644 --- a/hazelcast-jet-core/src/test/java/com/hazelcast/jet/impl/connector/JmsIntegrationTest.java +++ b/hazelcast-jet-core/src/test/java/com/hazelcast/jet/impl/connector/JmsIntegrationTest.java @@ -270,10 +270,7 @@ public void sinkQueue_whenBuilder_withFunctions() throws JMSException { Sink sink = Sinks.jmsQueueBuilder(() -> broker.createConnectionFactory()) .connectionFn(ConnectionFactory::createConnection) - .sessionFn(connection -> connection.createSession(false, AUTO_ACKNOWLEDGE)) .messageFn(Session::createTextMessage) - .sendFn(MessageProducer::send) - .flushFn(ConsumerEx.noop()) .destinationName(destinationName) .build(); @@ -314,7 +311,6 @@ public void sinkTopic_whenBuilder_withParameters() throws JMSException { Sink sink = Sinks.jmsTopicBuilder(() -> broker.createConnectionFactory()) .connectionParams(null, null) - .sessionParams(false, AUTO_ACKNOWLEDGE) .destinationName(destinationName) .build();