From 522ca570e9556e41f84327adfe8f3fb1c96f47e6 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Tue, 7 Feb 2017 09:35:35 -0800 Subject: [PATCH 1/2] [BEAM-1384] JmsIO: better errors during start, better testing For BEAM-1384, the test has been failing because the error may be surfaced in either start() or close(), depending on execution path. The underlying bug is that start was implemented so that close might fail (this.connection would be set, but this.connection.close would fail in a bad way). Rewrite start() to fix the invariant needed by close() and also to provide better error messages. Fixup the tests. Unfortunately, expectedException doesn't really support testing causes and nested caused, so the rewrite dropped its use. --- .../org/apache/beam/sdk/io/jms/JmsIO.java | 24 +++++++++--- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 37 ++++++++++++------- 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 270fe3103ac5..a935b56af419 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -405,14 +405,26 @@ public boolean start() throws IOException { Read spec = source.spec; ConnectionFactory connectionFactory = spec.getConnectionFactory(); try { + Connection connection; if (spec.getUsername() != null) { - this.connection = + connection = connectionFactory.createConnection(spec.getUsername(), spec.getPassword()); } else { - this.connection = connectionFactory.createConnection(); + connection = connectionFactory.createConnection(); } - this.connection.start(); + connection.start(); + this.connection = connection; + } catch (Exception e) { + throw new IOException("Error connecting to JMS", e); + } + + try { this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (Exception e) { + throw new IOException("Error creating JMS session", e); + } + + try { if (spec.getTopic() != null) { this.consumer = this.session.createConsumer(this.session.createTopic(spec.getTopic())); @@ -420,11 +432,11 @@ public boolean start() throws IOException { this.consumer = this.session.createConsumer(this.session.createQueue(spec.getQueue())); } - - return advance(); } catch (Exception e) { - throw new IOException(e); + throw new IOException("Error creating JMS consumer", e); } + + return advance(); } @Override diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index a06bba33bec4..d329d78f773a 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -17,14 +17,20 @@ */ package org.apache.beam.sdk.io.jms; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import java.io.IOException; import java.util.ArrayList; import java.util.List; - import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -49,7 +55,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -72,9 +77,6 @@ public class JmsIOTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Before public void startBroker() throws Exception { broker = new BrokerService(); @@ -105,26 +107,34 @@ public void stopBroker() throws Exception { broker.stop(); } + private void runPipelineExpectingJmsConnectException(String innerMessage) { + try { + pipeline.run(); + fail(); + } catch (Exception e) { + Throwable cause = e.getCause(); + assertThat(cause, instanceOf(IOException.class)); + assertThat(cause.getMessage(), equalTo("Error connecting to JMS")); + Throwable innerCause = cause.getCause(); + assertThat(innerCause, instanceOf(JMSException.class)); + assertThat(innerCause.getMessage(), containsString(innerMessage)); + } + } + @Test @Category(NeedsRunner.class) public void testAuthenticationRequired() { - expectedException.expect(Exception.class); - expectedException.expectMessage("User name [null] or password is invalid."); - pipeline.apply( JmsIO.read() .withConnectionFactory(connectionFactory) .withQueue(QUEUE)); - pipeline.run(); + runPipelineExpectingJmsConnectException("User name [null] or password is invalid."); } @Test @Category(NeedsRunner.class) public void testAuthenticationWithBadPassword() { - expectedException.expect(Exception.class); - expectedException.expectMessage("User name [" + USERNAME + "] or password is invalid."); - pipeline.apply( JmsIO.read() .withConnectionFactory(connectionFactory) @@ -132,7 +142,8 @@ public void testAuthenticationWithBadPassword() { .withUsername(USERNAME) .withPassword("BAD")); - pipeline.run(); + runPipelineExpectingJmsConnectException( + "User name [\" + USERNAME + \"] or password is invalid."); } @Test From 222de270cc4df27b13f5247611f066d095d5f4b8 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Tue, 7 Feb 2017 10:27:19 -0800 Subject: [PATCH 2/2] fixup intellij is too smart for me. --- .../jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index d329d78f773a..f07247d578fa 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -143,7 +143,7 @@ public void testAuthenticationWithBadPassword() { .withPassword("BAD")); runPipelineExpectingJmsConnectException( - "User name [\" + USERNAME + \"] or password is invalid."); + "User name [" + USERNAME + "] or password is invalid."); } @Test