From 1b946e2d793bb673b4b8792e45a4f3c73e2a3210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Fri, 6 Jan 2017 17:50:03 +0100 Subject: [PATCH 1/4] [BEAM-837] Add authentication support in JmsIO --- .../org/apache/beam/sdk/io/jms/JmsIO.java | 66 +++++++++++++++++-- 1 file changed, 60 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index b6de26a0c2ae..b8239a17dda1 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -129,6 +129,8 @@ public abstract static class Read extends PTransform, PDone @Nullable abstract ConnectionFactory getConnectionFactory(); @Nullable abstract String getQueue(); @Nullable abstract String getTopic(); + @Nullable abstract String getUsername(); + @Nullable abstract String getPassword(); abstract Builder builder(); @@ -503,6 +532,8 @@ abstract static class Builder { abstract Builder setConnectionFactory(ConnectionFactory connectionFactory); abstract Builder setQueue(String queue); abstract Builder setTopic(String topic); + abstract Builder setUsername(String username); + abstract Builder setPassword(String password); abstract Write build(); } @@ -572,6 +603,23 @@ public Write withTopic(String topic) { return builder().setTopic(topic).build(); } + /** + * Define the username to connect to the JMS broker (authenticated). + */ + public Write withUsername(String username) { + checkArgument(username != null, "JmsIO.write().withUsername(username) called with null " + + "username"); + return builder().setUsername(username).build(); + } + + /** + * Define the password to connect to the JMS broker (authenticated). + */ + public Write withPassword(String password) { + // password can be null + return builder().setPassword(password).build(); + } + @Override public PDone expand(PCollection input) { input.apply(ParDo.of(new WriterFn(this))); @@ -601,7 +649,13 @@ public WriterFn(Write spec) { @StartBundle public void startBundle(Context c) throws Exception { if (producer == null) { - this.connection = spec.getConnectionFactory().createConnection(); + if (spec.getUsername() != null) { + this.connection = + spec.getConnectionFactory() + .createConnection(spec.getUsername(), spec.getPassword()); + } else { + this.connection = spec.getConnectionFactory().createConnection(); + } this.connection.start(); // false means we don't use JMS transaction. this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); From f8231f0ada5e7b6f96e07a59d3158838930c6579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Sun, 29 Jan 2017 08:00:50 +0100 Subject: [PATCH 2/4] [BEAM-837] Change and add tests for JMS authentication --- sdks/java/io/jms/pom.xml | 6 +++ .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 48 +++++++++++++++++-- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml index 80d1f6c23b5e..eacde7698243 100644 --- a/sdks/java/io/jms/pom.xml +++ b/sdks/java/io/jms/pom.xml @@ -95,6 +95,12 @@ ${activemq.version} test + + org.apache.activemq + activemq-jaas + ${activemq.version} + test + org.apache.activemq activemq-kahadb-store diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 7259ce88b08d..1158fe997053 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.io.jms; import java.util.ArrayList; +import java.util.List; + import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Message; @@ -26,7 +28,10 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.SimpleAuthenticationPlugin; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -40,6 +45,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -49,7 +55,6 @@ @RunWith(JUnit4.class) public class JmsIOTest { - private static final String BROKER_URL = "vm://localhost"; private BrokerService broker; @@ -58,6 +63,9 @@ public class JmsIOTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Before public void startBroker() throws Exception { broker = new BrokerService(); @@ -65,6 +73,16 @@ public void startBroker() throws Exception { broker.setPersistenceAdapter(new MemoryPersistenceAdapter()); broker.addConnector(BROKER_URL); broker.setBrokerName("localhost"); + broker.setPopulateJMSXUserID(true); + broker.setUseAuthenticatedPrincipalForJMSXUserID(true); + + // enable authentication + List users = new ArrayList<>(); + users.add(new AuthenticationUser("test", "test", "users,admins")); + SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(users); + BrokerPlugin[] plugins = new BrokerPlugin[]{ plugin }; + broker.setPlugins(plugins); + broker.start(); // create JMS connection factory @@ -76,12 +94,26 @@ public void stopBroker() throws Exception { broker.stop(); } + @Test + @Category(NeedsRunner.class) + public void testAuthenticationRequired() { + expectedException.expect(Exception.class); + expectedException.expectMessage("JMSSecurityException"); + + pipeline.apply( + JmsIO.read() + .withConnectionFactory(connectionFactory) + .withQueue("test")); + + pipeline.run(); + } + @Test @Category(NeedsRunner.class) public void testReadMessages() throws Exception { // produce message - Connection connection = connectionFactory.createConnection(); + Connection connection = connectionFactory.createConnection("test", "test"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(session.createQueue("test")); TextMessage message = session.createTextMessage("This Is A Test"); @@ -100,6 +132,8 @@ public void testReadMessages() throws Exception { JmsIO.read() .withConnectionFactory(connectionFactory) .withQueue("test") + .withUsername("test") + .withPassword("test") .withMaxNumRecords(5)); PAssert @@ -107,7 +141,7 @@ public void testReadMessages() throws Exception { .isEqualTo(new Long(5)); pipeline.run(); - connection = connectionFactory.createConnection(); + connection = connectionFactory.createConnection("test", "test"); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(session.createQueue("test")); Message msg = consumer.receiveNoWait(); @@ -123,11 +157,15 @@ public void testWriteMessage() throws Exception { data.add("Message " + i); } pipeline.apply(Create.of(data)) - .apply(JmsIO.write().withConnectionFactory(connectionFactory).withQueue("test")); + .apply(JmsIO.write() + .withConnectionFactory(connectionFactory) + .withQueue("test") + .withUsername("test") + .withPassword("test")); pipeline.run(); - Connection connection = connectionFactory.createConnection(); + Connection connection = connectionFactory.createConnection("test", "test"); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(session.createQueue("test")); From 14a2c7bab2bd27d4bf25b4803dfe97a370173b2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 1 Feb 2017 17:24:47 +0100 Subject: [PATCH 3/4] [BEAM-837] Set password as non null. Improve tests. --- .../org/apache/beam/sdk/io/jms/JmsIO.java | 6 ++- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 52 +++++++++++++------ 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index b8239a17dda1..fd22c084d152 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -227,7 +227,8 @@ public Read withUsername(String username) { * Define the password to connect to the JMS broker (authenticated). */ public Read withPassword(String password) { - // password can be null + checkArgument(password != null, "JmsIO.read().withPassword(password) called with null " + + "password"); return builder().setPassword(password).build(); } @@ -616,7 +617,8 @@ public Write withUsername(String username) { * Define the password to connect to the JMS broker (authenticated). */ public Write withPassword(String password) { - // password can be null + checkArgument(password != null, "JmsIO.write().withPasswprd(password) called with null " + + "password"); return builder().setPassword(password).build(); } diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 1158fe997053..c756cd019210 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -57,6 +57,10 @@ public class JmsIOTest { private static final String BROKER_URL = "vm://localhost"; + private static final String USERNAME = "test_user"; + private static final String PASSWORD = "test_password"; + private static final String QUEUE = "test_queue"; + private BrokerService broker; private ConnectionFactory connectionFactory; @@ -78,7 +82,9 @@ public void startBroker() throws Exception { // enable authentication List users = new ArrayList<>(); - users.add(new AuthenticationUser("test", "test", "users,admins")); + // username and password to use to connect to the broker. + // This user has users privilege (able to browse, consume, produce, list destinations) + users.add(new AuthenticationUser(USERNAME, PASSWORD, "users")); SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(users); BrokerPlugin[] plugins = new BrokerPlugin[]{ plugin }; broker.setPlugins(plugins); @@ -98,12 +104,28 @@ public void stopBroker() throws Exception { @Category(NeedsRunner.class) public void testAuthenticationRequired() { expectedException.expect(Exception.class); - expectedException.expectMessage("JMSSecurityException"); + expectedException.expectMessage("User name [null] or password is invalid."); + + pipeline.apply( + JmsIO.read() + .withConnectionFactory(connectionFactory) + .withQueue(QUEUE)); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testAuthenticationWithBadPassword() { + expectedException.expect(Exception.class); + expectedException.expectMessage("User name [" + USERNAME + "] or password is invalid."); pipeline.apply( JmsIO.read() .withConnectionFactory(connectionFactory) - .withQueue("test")); + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword("BAD")); pipeline.run(); } @@ -113,9 +135,9 @@ public void testAuthenticationRequired() { public void testReadMessages() throws Exception { // produce message - Connection connection = connectionFactory.createConnection("test", "test"); + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue("test")); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE)); TextMessage message = session.createTextMessage("This Is A Test"); producer.send(message); producer.send(message); @@ -131,9 +153,9 @@ public void testReadMessages() throws Exception { PCollection output = pipeline.apply( JmsIO.read() .withConnectionFactory(connectionFactory) - .withQueue("test") - .withUsername("test") - .withPassword("test") + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD) .withMaxNumRecords(5)); PAssert @@ -141,9 +163,9 @@ public void testReadMessages() throws Exception { .isEqualTo(new Long(5)); pipeline.run(); - connection = connectionFactory.createConnection("test", "test"); + connection = connectionFactory.createConnection(USERNAME, PASSWORD); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createQueue("test")); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); Message msg = consumer.receiveNoWait(); Assert.assertNull(msg); } @@ -159,16 +181,16 @@ public void testWriteMessage() throws Exception { pipeline.apply(Create.of(data)) .apply(JmsIO.write() .withConnectionFactory(connectionFactory) - .withQueue("test") - .withUsername("test") - .withPassword("test")); + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD)); pipeline.run(); - Connection connection = connectionFactory.createConnection("test", "test"); + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createQueue("test")); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); int count = 0; while (consumer.receive(1000) != null) { count++; From ab5b3a6d6faf84b03b0056f2aa3dd397bfd6af0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 1 Feb 2017 17:30:07 +0100 Subject: [PATCH 4/4] [BEAM-837] Fix typo in checkArgument message --- .../io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index fd22c084d152..c1f1cb4b9039 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -617,7 +617,7 @@ public Write withUsername(String username) { * Define the password to connect to the JMS broker (authenticated). */ public Write withPassword(String password) { - checkArgument(password != null, "JmsIO.write().withPasswprd(password) called with null " + checkArgument(password != null, "JmsIO.write().withPassword(password) called with null " + "password"); return builder().setPassword(password).build(); }