From de00c58808902d5d4593fba41f637884df67c90b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 10 May 2017 07:39:56 +0200 Subject: [PATCH 1/3] [BEAM-2246] Use CLIENT_ACK instead of AUTO_ACK in JmsIO --- .../io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 4493e56d3fee..8c785494671f 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -421,7 +421,7 @@ public boolean start() throws IOException { } try { - this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); } catch (Exception e) { throw new IOException("Error creating JMS session", e); } From b45639ae56a28799e609e1b0d5f45ad082d68cad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Tue, 30 May 2017 21:47:04 +0200 Subject: [PATCH 2/3] [BEAM-2246] Add a test on checkpoint mark finalize that acknowledges messages --- .../org/apache/beam/sdk/io/jms/JmsIO.java | 3 +- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 77 +++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 8c785494671f..32edaa545f5c 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -379,7 +379,8 @@ public Coder getDefaultOutputCoder() { } - private static class UnboundedJmsReader extends UnboundedReader { + @VisibleForTesting + static class UnboundedJmsReader extends UnboundedReader { private UnboundedJmsSource source; private JmsCheckpointMark checkpointMark; diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 7edda1acd296..fc8fe58391e6 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -21,12 +21,16 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.Enumeration; import java.util.List; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -34,6 +38,7 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; @@ -42,12 +47,14 @@ import org.apache.activemq.security.AuthenticationUser; import org.apache.activemq.security.SimpleAuthenticationPlugin; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; import org.junit.After; import org.junit.Before; @@ -71,6 +78,7 @@ public class JmsIOTest { private BrokerService broker; private ConnectionFactory connectionFactory; + private ConnectionFactory connectionFactoryWithoutPrefetch; @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -98,6 +106,8 @@ public void startBroker() throws Exception { // create JMS connection factory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); + connectionFactoryWithoutPrefetch = + new ActiveMQConnectionFactory(BROKER_URL + "?jms.prefetchPolicy.all=0"); } @After @@ -236,4 +246,71 @@ public void testSplitForTopic() throws Exception { assertEquals(1, splits.size()); } + @Test + public void testCheckpointMark() throws Exception { + Connection connection = connectionFactoryWithoutPrefetch.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE)); + for (int i = 0; i < 1000; i++) { + producer.send(session.createTextMessage("test " + i)); + } + producer.close(); + session.close(); + connection.close(); + + JmsIO.Read spec = JmsIO.read() + .withConnectionFactory(connectionFactoryWithoutPrefetch) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withQueue(QUEUE); + JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec); + JmsIO.UnboundedJmsReader reader = source.createReader(null, null); + + // start the reader and move to the first record + assertTrue(reader.start()); + + // consume 249 messages (as start already consumed the first message) + for (int i = 0; i < 249; i++) { + assertTrue(reader.advance()); + } + + // the messages are still pending in the queue (no ACK yet) + assertEquals(1000, count(QUEUE)); + + // we finalize the checkpoint + reader.getCheckpointMark().finalizeCheckpoint(); + + // the checkpoint finalize ack the messages, and so they are not pending in the queue anymore + assertEquals(750, count(QUEUE)); + + // we read the 750 pending messages + for (int i = 0; i < 750; i++) { + assertTrue(reader.advance()); + } + + // still 750 pending messages as we didn't finalize the checkpoint + assertEquals(750, count(QUEUE)); + + // we finalize the checkpoint: no more message in the queue + reader.getCheckpointMark().finalizeCheckpoint(); + + assertEquals(0, count(QUEUE)); + } + + + private int count(String queue) throws Exception { + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueBrowser browser = session.createBrowser(session.createQueue(queue)); + Enumeration messages = browser.getEnumeration(); + int count = 0; + while (messages.hasMoreElements()) { + messages.nextElement(); + count++; + } + return count; + } + } From 92b69fdd570d21efb83395f9e432f547aa8db61a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 31 May 2017 18:25:34 +0200 Subject: [PATCH 3/3] [BEAM-2246] Add explanation about no prefetch, reduce to 10 messages in the checkpoint mark test --- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index fc8fe58391e6..43c050e3dac4 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -21,8 +21,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -47,14 +45,12 @@ import org.apache.activemq.security.AuthenticationUser; import org.apache.activemq.security.SimpleAuthenticationPlugin; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; -import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; import org.junit.After; import org.junit.Before; @@ -248,11 +244,17 @@ public void testSplitForTopic() throws Exception { @Test public void testCheckpointMark() throws Exception { + // we are using no prefetch here + // prefetch is an ActiveMQ feature: to make efficient use of network resources the broker + // utilizes a 'push' model to dispatch messages to consumers. However, in the case of our + // test, it means that we can have some latency between the receiveNoWait() method used by + // the consumer and the prefetch buffer populated by the broker. Using a prefetch to 0 means + // that the consumer will poll for message, which is exactly what we want for the test. Connection connection = connectionFactoryWithoutPrefetch.createConnection(USERNAME, PASSWORD); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(session.createQueue(QUEUE)); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 10; i++) { producer.send(session.createTextMessage("test " + i)); } producer.close(); @@ -270,27 +272,27 @@ public void testCheckpointMark() throws Exception { // start the reader and move to the first record assertTrue(reader.start()); - // consume 249 messages (as start already consumed the first message) - for (int i = 0; i < 249; i++) { + // consume 3 messages (NB: start already consumed the first message) + for (int i = 0; i < 3; i++) { assertTrue(reader.advance()); } // the messages are still pending in the queue (no ACK yet) - assertEquals(1000, count(QUEUE)); + assertEquals(10, count(QUEUE)); // we finalize the checkpoint reader.getCheckpointMark().finalizeCheckpoint(); // the checkpoint finalize ack the messages, and so they are not pending in the queue anymore - assertEquals(750, count(QUEUE)); + assertEquals(6, count(QUEUE)); - // we read the 750 pending messages - for (int i = 0; i < 750; i++) { + // we read the 6 pending messages + for (int i = 0; i < 6; i++) { assertTrue(reader.advance()); } - // still 750 pending messages as we didn't finalize the checkpoint - assertEquals(750, count(QUEUE)); + // still 6 pending messages as we didn't finalize the checkpoint + assertEquals(6, count(QUEUE)); // we finalize the checkpoint: no more message in the queue reader.getCheckpointMark().finalizeCheckpoint(); @@ -298,7 +300,6 @@ public void testCheckpointMark() throws Exception { assertEquals(0, count(QUEUE)); } - private int count(String queue) throws Exception { Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); connection.start();