From b3bd69187f4a920dc69fc8171a9b7ebf5563604e Mon Sep 17 00:00:00 2001 From: Borisa Zivkovic Date: Fri, 10 Mar 2017 12:20:13 +0000 Subject: [PATCH] fixed problem when client id is not set --- .../org/apache/beam/sdk/io/mqtt/MqttIO.java | 4 ++ .../apache/beam/sdk/io/mqtt/MqttIOTest.java | 61 +++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 26234cff2878..46f2dcc308ad 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -209,6 +209,10 @@ private MQTT createClient() throws Exception { String clientId = getClientId() + "-" + UUID.randomUUID().toString(); LOG.debug("MQTT client id set to {}", clientId); client.setClientId(clientId); + } else { + String clientId = UUID.randomUUID().toString(); + LOG.debug("MQTT client id set to random value {}", clientId); + client.setClientId(clientId); } return client; } diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 8a82f4080b40..28ca5f7e1ccf 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -27,6 +27,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.Connection; +import org.apache.beam.sdk.io.mqtt.MqttIO.Read; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -77,6 +78,66 @@ public void startBroker() throws Exception { brokerService.waitUntilStarted(); } + @Test(timeout = 60 * 1000) + @Category(RunnableOnService.class) + public void testReadNoClientId() throws Exception { + final String topicName = "READ_TOPIC_NO_CLIENT_ID"; + Read mqttReader = MqttIO.read() + .withConnectionConfiguration( + MqttIO.ConnectionConfiguration.create( + "tcp://localhost:" + port, + topicName)) + .withMaxNumRecords(10); + PCollection output = pipeline.apply(mqttReader); + PAssert.that(output).containsInAnyOrder( + "This is test 0".getBytes(), + "This is test 1".getBytes(), + "This is test 2".getBytes(), + "This is test 3".getBytes(), + "This is test 4".getBytes(), + "This is test 5".getBytes(), + "This is test 6".getBytes(), + "This is test 7".getBytes(), + "This is test 8".getBytes(), + "This is test 9".getBytes() + ); + + // produce messages on the brokerService in another thread + // This thread prevents to block the pipeline waiting for new messages + MQTT client = new MQTT(); + client.setHost("tcp://localhost:" + port); + final BlockingConnection publishConnection = client.blockingConnection(); + publishConnection.connect(); + Thread publisherThread = new Thread() { + public void run() { + try { + LOG.info("Waiting pipeline connected to the MQTT broker before sending " + + "messages ..."); + boolean pipelineConnected = false; + while (!pipelineConnected) { + Thread.sleep(1000); + for (Connection connection : brokerService.getBroker().getClients()) { + if (!connection.getConnectionId().isEmpty()) { + pipelineConnected = true; + } + } + } + for (int i = 0; i < 10; i++) { + publishConnection.publish(topicName, ("This is test " + i).getBytes(), + QoS.AT_LEAST_ONCE, false); + } + } catch (Exception e) { + // nothing to do + } + } + }; + publisherThread.start(); + pipeline.run(); + + publishConnection.disconnect(); + publisherThread.join(); + } + @Test(timeout = 60 * 1000) @Category(RunnableOnService.class) public void testRead() throws Exception {