From 5de6cf117ea6b10c0306263041518153ded1a01d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= MqttIO source returns an unbounded collection of {@code byte[]} as
+ * {@code PCollection To configure a MQTT source, you have to provide a {@code ClientId}, a {@code ServerURI},
+ * and eventually a {@code Topic} pattern. The following example illustrates various options
+ * for configuring the source: MqttIO sink supports writing {@code byte[]} to a topic on a MQTT broker. To configure a MQTT sink, you must specify the {@code ClientId}, {@code ServerURI},
+ * {@code Topic}. Eventually, you can also specify the {@code Retained} and {@code QoS} of the
+ * message. For instance: MqttIO source returns an unbounded collection of {@code byte[]} as
- * {@code PCollectionReading from a MQTT broker
+ *
+ * {@code
+ *
+ * pipeline.apply(MqttIO.read().withClientId("my_client").withServerUri("tcp://host:11883")
+ * .withTopic("topic")
+ *
+ * }
+ *
+ * Writing to a MQTT broker
+ *
+ * {@code
+ *
+ * pipeline
+ * .apply(...) // provide PCollection
+ */
+public class MqttIO {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MqttIO.class);
+
+ public static Read read() {
+ return new Read(new UnboundedMqttSource(null, null, null,
+ Long.MAX_VALUE, null));
+ }
+
+ public static Write write() {
+ return new Write(new Write.MqttWriter(null, null, null, 1, false));
+ }
+
+ private MqttIO() {
+ }
+
+ /**
+ * A {@link PTransformReading from a MQTT broker
*
*
To configure a MQTT source, you have to provide a {@code ClientId}, a {@code ServerURI}, - * and eventually a {@code Topic} pattern. The following example illustrates various options - * for configuring the source:
+ *To configure a MQTT source, you have to provide a MQTT connection configuration including + * {@code ClientId}, a {@code ServerURI}, and eventually a {@code Topic} pattern. The following + * example illustrates various options for configuring the source:
* *{@code
*
- * pipeline.apply(MqttIO.read().withClientId("my_client").withServerUri("tcp://host:11883")
- * .withTopic("topic")
+ * pipeline.apply(
+ * MqttIO.read()
+ * .withMqttConnectionConfiguration(MqttIO.MqttConnectionConfiguration.create(
+ * "tcp://host:11883",
+ * "my_client_id",
+ * "my_topic"))
*
* }
*
@@ -73,8 +83,10 @@
*
* MqttIO sink supports writing {@code byte[]} to a topic on a MQTT broker.
* - *To configure a MQTT sink, you must specify the {@code ClientId}, {@code ServerURI}, - * {@code Topic}. Eventually, you can also specify the {@code Retained} and {@code QoS} of the + *
To configure a MQTT sink, as for the read, you have to specify a MQTT connection + * configuration with {@code ClientId}, {@code ServerURI}, {@code Topic}.
+ * + *Eventually, you can also specify the {@code Retained} and {@code QoS} of the MQTT * message.
* *For instance:
@@ -84,9 +96,10 @@ * pipeline * .apply(...) // provide PCollectionMqttIO source returns an unbounded collection of {@code byte[]} as
- * {@code PCollection
MqttIO source returns an unbounded {@link PCollection} containing MQTT message + * payloads (as {@code byte[]}).
* *To configure a MQTT source, you have to provide a MQTT connection configuration including - * {@code ClientId}, a {@code ServerURI}, and eventually a {@code Topic} pattern. The following + * {@code ClientId}, a {@code ServerURI}, and a {@code Topic} pattern. The following * example illustrates various options for configuring the source:
* *{@code
*
* pipeline.apply(
* MqttIO.read()
- * .withMqttConnectionConfiguration(MqttIO.MqttConnectionConfiguration.create(
+ * .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
* "tcp://host:11883",
* "my_client_id",
* "my_topic"))
@@ -86,7 +86,7 @@
* To configure a MQTT sink, as for the read, you have to specify a MQTT connection
* configuration with {@code ClientId}, {@code ServerURI}, {@code Topic}.
*
- * Eventually, you can also specify the {@code Retained} and {@code QoS} of the MQTT
+ *
Optionally, you can also specify the {@code Retained} and {@code QoS} of the MQTT
* message.
*
* For instance:
@@ -96,7 +96,7 @@
* pipeline
* .apply(...) // provide PCollection
* .MqttIO.write()
- * .withMqttConnectionConfiguration(MqttIO.MqttConnectionConfiguration.create(
+ * .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
* "tcp://host:11883",
* "my_client_id",
* "my_topic"))
@@ -128,18 +128,18 @@ private MqttIO() {
* A POJO describing a MQTT connection.
*/
@AutoValue
- public abstract static class MqttConnectionConfiguration implements Serializable {
+ public abstract static class ConnectionConfiguration implements Serializable {
@Nullable abstract String serverUri();
@Nullable abstract String clientId();
@Nullable abstract String topic();
- public static MqttConnectionConfiguration create(String serverUri, String clientId,
- String topic) {
+ public static ConnectionConfiguration create(String serverUri, String clientId,
+ String topic) {
checkNotNull(serverUri, "serverUri");
checkNotNull(clientId, "clientId");
checkNotNull(topic, "topic");
- return new AutoValue_MqttIO_MqttConnectionConfiguration(serverUri, clientId, topic);
+ return new AutoValue_MqttIO_ConnectionConfiguration(serverUri, clientId, topic);
}
private void populateDisplayData(DisplayData.Builder builder) {
@@ -148,7 +148,7 @@ private void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("topic", topic()));
}
- private MqttClient getClient() throws Exception {
+ private MqttClient getClient() throws MqttException {
MqttClient client = new MqttClient(serverUri(), clientId());
client.connect();
return client;
@@ -162,15 +162,15 @@ private MqttClient getClient() throws Exception {
@AutoValue
public abstract static class Read extends PTransform> {
- @Nullable abstract MqttConnectionConfiguration mqttConnectionConfiguration();
- @Nullable abstract long maxNumRecords();
+ @Nullable abstract ConnectionConfiguration connectionConfiguration();
+ abstract long maxNumRecords();
@Nullable abstract Duration maxReadTime();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
- abstract Builder setMqttConnectionConfiguration(MqttConnectionConfiguration config);
+ abstract Builder setConnectionConfiguration(ConnectionConfiguration config);
abstract Builder setMaxNumRecords(long maxNumRecords);
abstract Builder setMaxReadTime(Duration maxReadTime);
abstract Read build();
@@ -178,24 +178,16 @@ abstract static class Builder {
/**
* Define the MQTT connection configuration used to connect to the MQTT broker.
- *
- * @param configuration A {@link MqttConnectionConfiguration} instance.
- * @return The {@link Read} {@link PTransform} with the corresponding MQTT connection
- * configuration.
*/
- public Read withMqttConnectionConfiguration(MqttConnectionConfiguration configuration) {
- checkNotNull(configuration, "MqttConnectionConfiguration");
- return toBuilder().setMqttConnectionConfiguration(configuration).build();
+ public Read withConnectionConfiguration(ConnectionConfiguration configuration) {
+ checkNotNull(configuration, "ConnectionConfiguration");
+ return toBuilder().setConnectionConfiguration(configuration).build();
}
/**
* Define the max number of records received by the {@link Read}.
- * When this max number of records is lower then {@code Long.MAX_VALUE}, the {@link Read}
+ * When this max number of records is lower than {@code Long.MAX_VALUE}, the {@link Read}
* will provide a bounded {@link PCollection}.
- *
- * @param maxNumRecords The max number of records received.
- * @return The {@link Read} {@link PTransform} with the corresponding max num records
- * configuration.
*/
public Read withMaxNumRecords(long maxNumRecords) {
checkArgument(maxReadTime() == null,
@@ -207,10 +199,6 @@ public Read withMaxNumRecords(long maxNumRecords) {
* Define the max read time (duration) while the {@link Read} will receive messages.
* When this max read time is not null, the {@link Read} will provide a bounded
* {@link PCollection}.
- *
- * @param maxReadTime The max read time duration.
- * @return The {@link Read} {@link PTransform} with the corresponding max read time
- * configuration.
*/
public Read withMaxReadTime(Duration maxReadTime) {
checkArgument(maxNumRecords() == Long.MAX_VALUE,
@@ -237,13 +225,13 @@ public PCollection apply(PBegin input) {
@Override
public void validate(PBegin input) {
- // validation is performed in the MqttConnectionConfiguration create()
+ // validation is performed in the ConnectionConfiguration create()
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- mqttConnectionConfiguration().populateDisplayData(builder);
+ connectionConfiguration().populateDisplayData(builder);
builder.add(DisplayData.item("maxNumRecords", maxNumRecords()));
builder.addIfNotNull(DisplayData.item("maxReadTime", maxReadTime()));
}
@@ -260,7 +248,7 @@ public UnboundedMqttSource(Read spec) {
}
@Override
- public UnboundedReader createReader(PipelineOptions options,
+ public UnboundedReader createReader(PipelineOptions options,
CheckpointMark checkpointMark) {
return new UnboundedMqttReader(this);
}
@@ -301,8 +289,8 @@ public Coder getDefaultOutputCoder() {
*/
private static class MqttMessageWithTimestamp {
- private MqttMessage message;
- private Instant timestamp;
+ private final MqttMessage message;
+ private final Instant timestamp;
public MqttMessageWithTimestamp(MqttMessage message, Instant timestamp) {
this.message = message;
@@ -330,15 +318,15 @@ public boolean start() throws IOException {
LOGGER.debug("Starting MQTT reader");
Read spec = source.spec;
try {
- client = spec.mqttConnectionConfiguration().getClient();
- client.subscribe(spec.mqttConnectionConfiguration().topic());
+ client = spec.connectionConfiguration().getClient();
+ client.subscribe(spec.connectionConfiguration().topic());
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
LOGGER.warn("MQTT connection lost", cause);
try {
close();
- } catch (Exception e) {
+ } catch (IOException e) {
// nothing to do
}
}
@@ -355,7 +343,7 @@ public void deliveryComplete(IMqttDeliveryToken token) {
}
});
return advance();
- } catch (Exception e) {
+ } catch (MqttException e) {
throw new IOException(e);
}
}
@@ -363,17 +351,20 @@ public void deliveryComplete(IMqttDeliveryToken token) {
@Override
public boolean advance() throws IOException {
LOGGER.debug("Taking from the pending queue ({})", queue.size());
+ MqttMessageWithTimestamp message = null;
try {
- MqttMessageWithTimestamp message = queue.poll(5, TimeUnit.SECONDS);
- if (message == null) {
- return false;
- }
- current = message.message.getPayload();
- currentTimestamp = message.timestamp;
- return true;
+ message = queue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new IOException(e);
}
+ if (message == null) {
+ current = null;
+ currentTimestamp = null;
+ return false;
+ }
+ current = message.message.getPayload();
+ currentTimestamp = message.timestamp;
+ return true;
}
@Override
@@ -443,7 +434,7 @@ public UnboundedMqttSource getCurrentSource() {
@AutoValue
public abstract static class Write extends PTransform, PDone> {
- @Nullable abstract MqttConnectionConfiguration mqttConnectionConfiguration();
+ @Nullable abstract ConnectionConfiguration connectionConfiguration();
@Nullable abstract int qos();
@Nullable abstract boolean retained();
@@ -451,7 +442,7 @@ public abstract static class Write extends PTransform, PDone
@AutoValue.Builder
abstract static class Builder {
- abstract Builder setMqttConnectionConfiguration(MqttConnectionConfiguration configuration);
+ abstract Builder setConnectionConfiguration(ConnectionConfiguration configuration);
abstract Builder setQos(int qos);
abstract Builder setRetained(boolean retained);
abstract Write build();
@@ -459,12 +450,9 @@ abstract static class Builder {
/**
* Define MQTT connection configuration used to connect to the MQTT broker.
- *
- * @param configuration The {@link MqttConnectionConfiguration} instance.
- * @return The {@link Write} {@link PTransform} with the corresponding connection configuration.
*/
- public Write withMqttConnectionConfiguration(MqttConnectionConfiguration configuration) {
- return toBuilder().setMqttConnectionConfiguration(configuration).build();
+ public Write withConnectionConfiguration(ConnectionConfiguration configuration) {
+ return toBuilder().setConnectionConfiguration(configuration).build();
}
/**
@@ -477,33 +465,31 @@ public Write withMqttConnectionConfiguration(MqttConnectionConfiguration configu
* but should only be used for messages which are not valuable - note that
* if the server cannot process the message (for example, there
* is an authorization problem), then an
- * {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)}.
+ * {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)} won't be called.
* Also known as "fire and forget".
*
* Quality of Service 1 - indicates that a message should
* be delivered at least once (one or more times). The message can only be delivered safely if
- * it can be persisted, so the application must supply a means of
- * persistence using {@code MqttConnectOptions}.
- * If a persistence mechanism is not specified, the message will not be
+ * it can be persisted on the broker.
+ * If the broker can't persist the message, the message will not be
* delivered in the event of a client failure.
* The message will be acknowledged across the network.
* This is the default QoS.
*
* Quality of Service 2 - indicates that a message should
- * be delivered once. The message will be persisted to disk, and will
+ * be delivered once. The message will be persisted to disk on the broker, and will
* be subject to a two-phase acknowledgement across the network.
* The message can only be delivered safely if
- * it can be persisted, so the application must supply a means of
- * persistence using {@code MqttConnectOptions}.
- * If a persistence mechanism is not specified, the message will not be
+ * it can be persisted on the broker.
+ * If a persistence mechanism is not specified on the broker, the message will not be
* delivered in the event of a client failure.
*
*
- * If persistence is not configured, QoS 1 and 2 messages will still be delivered
+ * If persistence is not configured, QoS 1 and 2 messages will still be delivered
* in the event of a network or server problem as the client will hold state in memory.
* If the MQTT client is shutdown or fails and persistence is not configured then
* delivery of QoS 1 and 2 messages can not be maintained as client-side state will
- * be lost.
+ * be lost.
*
* @param qos The quality of service value.
* @return The {@link Write} {@link PTransform} with the corresponding QoS configuration.
@@ -532,12 +518,12 @@ public PDone apply(PCollection input) {
@Override
public void validate(PCollection input) {
- // validation is performed in the MqttConnectionConfiguration create()
+ // validation is performed in the ConnectionConfiguration create()
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- mqttConnectionConfiguration().populateDisplayData(builder);
+ connectionConfiguration().populateDisplayData(builder);
builder.add(DisplayData.item("qos", qos()));
builder.add(DisplayData.item("retained", retained()));
}
@@ -554,7 +540,7 @@ public WriteFn(Write spec) {
@Setup
public void createMqttClient() throws Exception {
- client = spec.mqttConnectionConfiguration().getClient();
+ client = spec.connectionConfiguration().getClient();
}
@ProcessElement
@@ -564,7 +550,7 @@ public void processElement(ProcessContext context) throws Exception {
message.setQos(spec.qos());
message.setRetained(spec.retained());
message.setPayload(payload);
- client.publish(spec.mqttConnectionConfiguration().topic(), message);
+ client.publish(spec.connectionConfiguration().topic(), message);
}
@Teardown
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 308ba4df6139..e5e36d981b23 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -69,8 +69,8 @@ public void testRead() throws Exception {
PCollection output = pipeline.apply(
MqttIO.read()
- .withMqttConnectionConfiguration(
- MqttIO.MqttConnectionConfiguration.create(
+ .withConnectionConfiguration(
+ MqttIO.ConnectionConfiguration.create(
"tcp://localhost:11883",
"BEAM_PIPELINE",
"READ_TOPIC"))
@@ -141,8 +141,8 @@ public void testWrite() throws Exception {
// be persisted to disk, and will be subject to a two-phase ack.
pipeline.apply(Create.of(data))
.apply(MqttIO.write()
- .withMqttConnectionConfiguration(
- MqttIO.MqttConnectionConfiguration.create(
+ .withConnectionConfiguration(
+ MqttIO.ConnectionConfiguration.create(
"tcp://localhost:11883",
"BEAM_PIPELINE",
"WRITE_TOPIC"))
From 6483580bbd88346eb32364e1fbbe82971aa7393e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?=
Date: Thu, 13 Oct 2016 10:41:52 +0200
Subject: [PATCH 08/27] [BEAM-606] Refactore read test to be more reliable
---
.../test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index e5e36d981b23..5c6aef3528cc 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -81,11 +81,12 @@ public void testRead() throws Exception {
PAssert.that(output).satisfies(new SerializableFunction, Void>() {
@Override
public Void apply(Iterable input) {
- int count = 0;
for (byte[] element : input) {
String inputString = new String(element);
- Assert.assertEquals("This is test " + count, inputString);
- count++;
+ Assert.assertTrue(inputString.startsWith("This is test "));
+ int count = Integer.parseInt(inputString.substring("This is test ".length()));
+ Assert.assertTrue(count < 10);
+ Assert.assertTrue(count >= 0);
}
return null;
}
From 3b443127b810bc7cafc7e246f0dbc2d839eaaeeb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?=
Date: Fri, 21 Oct 2016 15:50:44 +0200
Subject: [PATCH 09/27] [BEAM-606] Update documentation, change the QoS in the
test
---
.../main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java | 11 +++++------
.../java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 2 +-
2 files changed, 6 insertions(+), 7 deletions(-)
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 614012ad8199..7bf356f2d7a3 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -279,7 +279,7 @@ public Coder getCheckpointMarkCoder() {
}
@Override
- public Coder getDefaultOutputCoder() {
+ public Coder getDefaultOutputCoder() {
return ByteArrayCoder.of();
}
}
@@ -324,11 +324,6 @@ public boolean start() throws IOException {
@Override
public void connectionLost(Throwable cause) {
LOGGER.warn("MQTT connection lost", cause);
- try {
- close();
- } catch (IOException e) {
- // nothing to do
- }
}
@Override
@@ -491,6 +486,10 @@ public Write withConnectionConfiguration(ConnectionConfiguration configuration)
* delivery of QoS 1 and 2 messages can not be maintained as client-side state will
* be lost.
*
+ * For now, MqttIO fully supports QoS 0 and 1 (delivery at least once). QoS 2 is for now
+ * limited and use with care, as it can result to duplication of message (delivery exactly
+ * once).
+ *
* @param qos The quality of service value.
* @return The {@link Write} {@link PTransform} with the corresponding QoS configuration.
*/
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 5c6aef3528cc..69f9e559a8eb 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -107,7 +107,7 @@ public void run() {
client.connect();
for (int i = 0; i < 10; i++) {
MqttMessage message = new MqttMessage();
- message.setQos(2);
+ message.setQos(1);
message.setRetained(true);
message.setPayload(("This is test " + i).getBytes());
client.publish("READ_TOPIC", message);
From eb31df8dfc11e6581135792220f8fe07752f1d0d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?=
Date: Wed, 2 Nov 2016 07:01:39 +0100
Subject: [PATCH 10/27] [BEAM-606] Start ActiveMQ broker on free network port
in the tests
---
.../apache/beam/sdk/io/mqtt/MqttIOTest.java | 23 +++++++++++++++----
1 file changed, 18 insertions(+), 5 deletions(-)
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 69f9e559a8eb..891667140475 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.mqtt;
import java.io.Serializable;
+import java.net.ServerSocket;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -44,6 +45,8 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests of {@link MqttIO}.
@@ -51,14 +54,24 @@
@RunWith(JUnit4.class)
public class MqttIOTest implements Serializable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MqttIOTest.class);
+
private static transient BrokerService broker;
+ private static int port;
+
@BeforeClass
public static void startBroker() throws Exception {
+ LOGGER.info("Finding free network port");
+ ServerSocket socket = new ServerSocket(0);
+ port = socket.getLocalPort();
+ socket.close();
+
+ LOGGER.info("Starting ActiveMQ broker on {}", port);
broker = new BrokerService();
broker.setUseJmx(false);
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
- broker.addConnector(new URI("mqtt://localhost:11883"));
+ broker.addConnector(new URI("mqtt://localhost:" + port));
broker.start();
}
@@ -71,7 +84,7 @@ public void testRead() throws Exception {
MqttIO.read()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create(
- "tcp://localhost:11883",
+ "tcp://localhost:" + port,
"BEAM_PIPELINE",
"READ_TOPIC"))
.withMaxNumRecords(10));
@@ -103,7 +116,7 @@ public void run() {
// nothing to do
}
try {
- MqttClient client = new MqttClient("tcp://localhost:11883", "publisher");
+ MqttClient client = new MqttClient("tcp://localhost:" + port, "publisher");
client.connect();
for (int i = 0; i < 10; i++) {
MqttMessage message = new MqttMessage();
@@ -144,7 +157,7 @@ public void testWrite() throws Exception {
.apply(MqttIO.write()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create(
- "tcp://localhost:11883",
+ "tcp://localhost:" + port,
"BEAM_PIPELINE",
"WRITE_TOPIC"))
.withQoS(2));
@@ -157,7 +170,7 @@ public void testWrite() throws Exception {
}
private MqttClient receive(final List messages) throws MqttException {
- MqttClient client = new MqttClient("tcp://localhost:11883", "receiver");
+ MqttClient client = new MqttClient("tcp://localhost:" + port, "receiver");
MqttCallback callback = new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
From ec93b77c24ca8e57fb3bc242fb07f7719c16054e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?=
Date: Wed, 2 Nov 2016 10:38:55 +0100
Subject: [PATCH 11/27] [BEAM-606] Fix parent pom version
---
sdks/java/io/mqtt/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sdks/java/io/mqtt/pom.xml b/sdks/java/io/mqtt/pom.xml
index 1e383362ade1..a515a00860ae 100644
--- a/sdks/java/io/mqtt/pom.xml
+++ b/sdks/java/io/mqtt/pom.xml
@@ -22,7 +22,7 @@
org.apache.beam
beam-sdks-java-io-parent
- 0.3.0-incubating-SNAPSHOT
+ 0.4.0-incubating-SNAPSHOT
../pom.xml
From 911e98b317f63f54f3f95927299ef6aae0ec8fe2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?=
Date: Thu, 3 Nov 2016 09:50:00 +0100
Subject: [PATCH 12/27] [BEAM-606] Exclude support of QoS 2, cleanup in javadoc
---
.../org/apache/beam/sdk/io/mqtt/MqttIO.java | 62 +++++++++++++------
.../apache/beam/sdk/io/mqtt/MqttIOTest.java | 7 +--
2 files changed, 44 insertions(+), 25 deletions(-)
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 7bf356f2d7a3..befde12e3afc 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -23,6 +23,8 @@
import com.google.auto.value.AutoValue;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -33,8 +35,10 @@
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -62,11 +66,11 @@
* Reading from a MQTT broker
*
* MqttIO source returns an unbounded {@link PCollection} containing MQTT message
- * payloads (as {@code byte[]}).
+ * payloads (as {@code byte[]}).
*
* To configure a MQTT source, you have to provide a MQTT connection configuration including
* {@code ClientId}, a {@code ServerURI}, and a {@code Topic} pattern. The following
- * example illustrates various options for configuring the source:
+ * example illustrates various options for configuring the source:
*
* {@code
*
@@ -81,15 +85,15 @@
*
* Writing to a MQTT broker
*
- * MqttIO sink supports writing {@code byte[]} to a topic on a MQTT broker.
+ * MqttIO sink supports writing {@code byte[]} to a topic on a MQTT broker.
*
*
To configure a MQTT sink, as for the read, you have to specify a MQTT connection
- * configuration with {@code ClientId}, {@code ServerURI}, {@code Topic}.
+ * configuration with {@code ClientId}, {@code ServerURI}, {@code Topic}.
*
* Optionally, you can also specify the {@code Retained} and {@code QoS} of the MQTT
- * message.
+ * message.
*
- * For instance:
+ * For instance:
*
*
{@code
*
@@ -100,8 +104,6 @@
* "tcp://host:11883",
* "my_client_id",
* "my_topic"))
- * .withRetained(true)
- * .withQoS(2)
*
* }
*/
@@ -232,7 +234,9 @@ public void validate(PBegin input) {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
connectionConfiguration().populateDisplayData(builder);
- builder.add(DisplayData.item("maxNumRecords", maxNumRecords()));
+ if (maxNumRecords() != Long.MAX_VALUE) {
+ builder.add(DisplayData.item("maxNumRecords", maxNumRecords()));
+ }
builder.addIfNotNull(DisplayData.item("maxReadTime", maxReadTime()));
}
@@ -274,8 +278,8 @@ public void populateDisplayData(DisplayData.Builder builder) {
}
@Override
- public Coder getCheckpointMarkCoder() {
- return VoidCoder.of();
+ public Coder getCheckpointMarkCoder() {
+ return new CheckpointCoder();
}
@Override
@@ -284,6 +288,25 @@ public Coder getDefaultOutputCoder() {
}
}
+ /**
+ * Checkpoint coder acting as a {@link VoidCoder}.
+ */
+ private static class CheckpointCoder extends AtomicCoder {
+
+ @Override
+ public void encode(UnboundedSource.CheckpointMark value, OutputStream outStream, Context
+ context) throws CoderException, IOException {
+ // nothing to write
+ }
+
+ @Override
+ public UnboundedSource.CheckpointMark decode(InputStream inStream, Context context) throws
+ CoderException, IOException {
+ // nothing to read
+ return null;
+ }
+ }
+
/**
* POJO used to store MQTT message and its timestamp.
*/
@@ -459,16 +482,16 @@ public Write withConnectionConfiguration(ConnectionConfiguration configuration)
* and will not be acknowledged across the network. This QoS is the fastest,
* but should only be used for messages which are not valuable - note that
* if the server cannot process the message (for example, there
- * is an authorization problem), then an
- * {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)} won't be called.
+ * is an authorization problem), then the message will be silently dropped.
* Also known as "fire and forget".
*
* Quality of Service 1 - indicates that a message should
* be delivered at least once (one or more times). The message can only be delivered safely if
* it can be persisted on the broker.
* If the broker can't persist the message, the message will not be
- * delivered in the event of a client failure.
- * The message will be acknowledged across the network.
+ * delivered in the event of a subscriber failure.
+ * The {@link MqttIO.Write} with this QoS guarantee that all messages written to it will be
+ * delivered to subscribers at least once.
* This is the default QoS.
*
* Quality of Service 2 - indicates that a message should
@@ -477,18 +500,19 @@ public Write withConnectionConfiguration(ConnectionConfiguration configuration)
* The message can only be delivered safely if
* it can be persisted on the broker.
* If a persistence mechanism is not specified on the broker, the message will not be
- * delivered in the event of a client failure.
+ * delivered in the event of a client failure.
+ * This QoS is not supported by {@link MqttIO}.
*
*
* If persistence is not configured, QoS 1 and 2 messages will still be delivered
* in the event of a network or server problem as the client will hold state in memory.
* If the MQTT client is shutdown or fails and persistence is not configured then
* delivery of QoS 1 and 2 messages can not be maintained as client-side state will
- * be lost.
+ * be lost.
*
* For now, MqttIO fully supports QoS 0 and 1 (delivery at least once). QoS 2 is for now
* limited and use with care, as it can result to duplication of message (delivery exactly
- * once).
+ * once).
*
* @param qos The quality of service value.
* @return The {@link Write} {@link PTransform} with the corresponding QoS configuration.
@@ -517,7 +541,7 @@ public PDone apply(PCollection input) {
@Override
public void validate(PCollection input) {
- // validation is performed in the ConnectionConfiguration create()
+ checkArgument(qos() != 0 || qos() != 1, "Supported QoS are 0 and 1");
}
@Override
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 891667140475..72b6a9e7fa5d 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -120,8 +120,7 @@ public void run() {
client.connect();
for (int i = 0; i < 10; i++) {
MqttMessage message = new MqttMessage();
- message.setQos(1);
- message.setRetained(true);
+ message.setQos(0);
message.setPayload(("This is test " + i).getBytes());
client.publish("READ_TOPIC", message);
}
@@ -149,10 +148,6 @@ public void testWrite() throws Exception {
for (int i = 0; i < 100; i++) {
data.add("Test".getBytes());
}
- // we use QoS 2 here to be sure the subscriber completely receive all messages before
- // shutting down the MQTT broker.
- // Quality of Service 2 indicates that a message should be delivered once. The message will
- // be persisted to disk, and will be subject to a two-phase ack.
pipeline.apply(Create.of(data))
.apply(MqttIO.write()
.withConnectionConfiguration(
From 655b53e6024a5a7cc3efa4c416c484753a936685 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?=
Date: Mon, 14 Nov 2016 13:07:01 +0100
Subject: [PATCH 13/27] [BEAM-606] Add core MQTT test
---
sdks/java/io/mqtt/pom.xml | 2 +-
.../apache/beam/sdk/io/mqtt/MqttIOTest.java | 48 +++++++++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/sdks/java/io/mqtt/pom.xml b/sdks/java/io/mqtt/pom.xml
index a515a00860ae..24ff8e6c6b8e 100644
--- a/sdks/java/io/mqtt/pom.xml
+++ b/sdks/java/io/mqtt/pom.xml
@@ -60,7 +60,7 @@
- 1.0.2
+ 1.1.0
5.13.1
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 72b6a9e7fa5d..a67c9afb75aa 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -136,6 +136,54 @@ public void run() {
pipeline.run();
}
+ public class MyMqttCallback implements MqttCallback {
+ private String client;
+
+ public MyMqttCallback(String client) {
+ this.client = client;
+ }
+
+ @Override
+ public void connectionLost(Throwable cause) {
+ LOGGER.warn("Connection lost client {}", client, cause);
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ LOGGER.info("Message arrived client {} on topic {}: {}", client, topic,
+ new String(message.getPayload()));
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ LOGGER.info("Delivery complete client {}", client);
+ }
+ }
+
+ @Test
+ public void severalSubscriber() throws Exception {
+ MqttClient sub1 = new MqttClient("tcp://localhost:" + port, MqttClient.generateClientId());
+ sub1.setCallback(new MyMqttCallback("sub1"));
+ sub1.connect();
+ sub1.subscribe("test");
+
+ MqttClient sub2 = new MqttClient("tcp://localhost:" + port, MqttClient.generateClientId());
+ sub2.setCallback(new MyMqttCallback("sub2"));
+ sub2.connect();
+ sub2.subscribe("test");
+
+ MqttClient pub = new MqttClient("tcp://localhost:" + port, MqttClient.generateClientId());
+ pub.connect();
+ MqttMessage message = new MqttMessage("foo".getBytes());
+ message.setQos(0);
+ pub.publish("test", message);
+
+ pub.disconnect();
+
+ sub1.disconnect();
+ sub2.disconnect();
+ }
+
@Test
@Category(NeedsRunner.class)
public void testWrite() throws Exception {
From 134cca0868672ac3b5a1594bd0ff1dbc8ce7e0a4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?=
Date: Tue, 22 Nov 2016 17:58:14 +0100
Subject: [PATCH 14/27] [BEAM-606] Improved checkpoint with manual ack, update
comments about QoS (relationship between publisher and subsribers), auto
generate clientId (reliability)
---
sdks/java/io/mqtt/pom.xml | 10 +-
.../org/apache/beam/sdk/io/mqtt/MqttIO.java | 214 ++++++++++--------
.../apache/beam/sdk/io/mqtt/MqttIOTest.java | 57 +----
3 files changed, 125 insertions(+), 156 deletions(-)
diff --git a/sdks/java/io/mqtt/pom.xml b/sdks/java/io/mqtt/pom.xml
index 24ff8e6c6b8e..5f496a77f24c 100644
--- a/sdks/java/io/mqtt/pom.xml
+++ b/sdks/java/io/mqtt/pom.xml
@@ -85,17 +85,17 @@
guava
-
- com.google.code.findbugs
- annotations
-
-
org.eclipse.paho
org.eclipse.paho.client.mqttv3
${paho.version}
+
+ com.google.code.findbugs
+ jsr305
+
+
com.google.auto.value
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index befde12e3afc..d3c398ffefee 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -18,15 +18,13 @@
package org.apache.beam.sdk.io.mqtt;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
@@ -35,11 +33,10 @@
import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
@@ -78,7 +75,6 @@
* MqttIO.read()
* .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
* "tcp://host:11883",
- * "my_client_id",
* "my_topic"))
*
* }
@@ -102,7 +98,6 @@
* .MqttIO.write()
* .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
* "tcp://host:11883",
- * "my_client_id",
* "my_topic"))
*
* }
@@ -133,25 +128,25 @@ private MqttIO() {
public abstract static class ConnectionConfiguration implements Serializable {
@Nullable abstract String serverUri();
- @Nullable abstract String clientId();
@Nullable abstract String topic();
- public static ConnectionConfiguration create(String serverUri, String clientId,
- String topic) {
- checkNotNull(serverUri, "serverUri");
- checkNotNull(clientId, "clientId");
- checkNotNull(topic, "topic");
- return new AutoValue_MqttIO_ConnectionConfiguration(serverUri, clientId, topic);
+ public static ConnectionConfiguration create(String serverUri, String topic) {
+ checkArgument(serverUri != null,
+ "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
+ + "serverUri");
+ checkArgument(topic != null,
+ "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
+ + "topic");
+ return new AutoValue_MqttIO_ConnectionConfiguration(serverUri, topic);
}
private void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("serverUri", serverUri()));
- builder.add(DisplayData.item("clientId", clientId()));
builder.add(DisplayData.item("topic", topic()));
}
private MqttClient getClient() throws MqttException {
- MqttClient client = new MqttClient(serverUri(), clientId());
+ MqttClient client = new MqttClient(serverUri(), MqttClient.generateClientId());
client.connect();
return client;
}
@@ -168,7 +163,7 @@ public abstract static class Read extends PTransformIf persistence is not configured, QoS 1 and 2 messages will still be delivered @@ -510,27 +530,25 @@ public Write withConnectionConfiguration(ConnectionConfiguration configuration) * delivery of QoS 1 and 2 messages can not be maintained as client-side state will * be lost. * - *
For now, MqttIO fully supports QoS 0 and 1 (delivery at least once). QoS 2 is for now
- * limited and use with care, as it can result to duplication of message (delivery exactly
- * once).
- *
* @param qos The quality of service value.
* @return The {@link Write} {@link PTransform} with the corresponding QoS configuration.
*/
public Write withQoS(int qos) {
- return toBuilder().setQos(qos).build();
+ return builder().setQos(qos).build();
}
/**
* Whether or not the publish message should be retained by the messaging engine.
* Sending a message with the retained set to {@code false} will clear the
* retained message from the server. The default value is {@code false}.
+ * When a subscriber connects, he gets the latest retained message (else it doesn't get any
+ * existing message, he will have to wait a new incoming message).
*
* @param retained Whether or not the messaging engine should retain the message.
* @return The {@link Write} {@link PTransform} with the corresponding retained configuration.
*/
public Write withRetained(boolean retained) {
- return toBuilder().setRetained(retained).build();
+ return builder().setRetained(retained).build();
}
@Override
@@ -541,7 +559,7 @@ public PDone apply(PCollection To configure a MQTT sink, as for the read, you have to specify a MQTT connection
* configuration with {@code ClientId}, {@code ServerURI}, {@code Topic}.
*
- * Optionally, you can also specify the {@code Retained} and {@code QoS} of the MQTT
- * message.
+ * The MqttIO only fully supports QoS 1 (at least once). It's the only QoS level guaranteed
+ * due to potential retries on bundles.
*
* For instance:
*
@@ -112,7 +112,6 @@ public static Read read() {
public static Write write() {
return new AutoValue_MqttIO_Write.Builder()
.setRetained(false)
- .setQos(0)
.build();
}
@@ -204,7 +203,7 @@ public Read withMaxReadTime(Duration maxReadTime) {
}
@Override
- public PCollection If persistence is not configured, QoS 1 and 2 messages will still be delivered
- * in the event of a network or server problem as the client will hold state in memory.
- * If the MQTT client is shutdown or fails and persistence is not configured then
- * delivery of QoS 1 and 2 messages can not be maintained as client-side state will
- * be lost.
- *
- * @param qos The quality of service value.
- * @return The {@link Write} {@link PTransform} with the corresponding QoS configuration.
- */
- public Write withQoS(int qos) {
- return builder().setQos(qos).build();
- }
-
/**
* Whether or not the publish message should be retained by the messaging engine.
* Sending a message with the retained set to {@code false} will clear the
@@ -572,7 +539,7 @@ public Write withRetained(boolean retained) {
}
@Override
- public PDone apply(PCollection To configure a MQTT source, you have to provide a MQTT connection configuration including
- * {@code ClientId}, a {@code ServerURI}, and a {@code Topic} pattern. The following
+ * {@code ClientId}, a {@code ServerURI}, a {@code Topic} pattern, and optionally {@code
+ * username} and {@code password} to connect to the MQTT broker. The following
* example illustrates various options for configuring the source:
*
* MqttIO sink supports writing {@code byte[]} to a topic on a MQTT broker.
+ * MqttIO sink supports writing {@code byte[]} to a getTopic on a MQTT broker.
*
* To configure a MQTT sink, as for the read, you have to specify a MQTT connection
- * configuration with {@code ClientId}, {@code ServerURI}, {@code Topic}.
+ * configuration with {@code ServerURI}, {@code Topic}, ...
*
* The MqttIO only fully supports QoS 1 (at least once). It's the only QoS level guaranteed
* due to potential retries on bundles.
@@ -124,51 +122,90 @@ private MqttIO() {
@AutoValue
public abstract static class ConnectionConfiguration implements Serializable {
- @Nullable abstract String serverUri();
- @Nullable abstract String topic();
- @Nullable abstract String clientId();
+ @Nullable abstract String getServerUri();
+ @Nullable abstract String getTopic();
+ @Nullable abstract String getClientId();
+ @Nullable abstract String getUsername();
+ @Nullable abstract String getPassword();
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setServerUri(String serverUri);
+ abstract Builder setTopic(String topic);
+ abstract Builder setClientId(String clientId);
+ abstract Builder setUsername(String username);
+ abstract Builder setPassword(String password);
+ abstract ConnectionConfiguration build();
+ }
+
+ /**
+ * Describe a connection configuration to the MQTT broker. This method creates an unique random
+ * MQTT client ID.
+ *
+ * @param serverUri The MQTT broker URI.
+ * @param topic The MQTT getTopic pattern.
+ * @return A connection configuration to the MQTT broker.
+ */
public static ConnectionConfiguration create(String serverUri, String topic) {
checkArgument(serverUri != null,
- "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
- + "serverUri");
+ "MqttIO.ConnectionConfiguration.create(getServerUri, getTopic) called with null "
+ + "getServerUri");
checkArgument(topic != null,
- "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
- + "topic");
- return new AutoValue_MqttIO_ConnectionConfiguration(serverUri, topic,
- MqttClient.generateClientId());
+ "MqttIO.ConnectionConfiguration.create(getServerUri, getTopic) called with null "
+ + "getTopic");
+ return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri)
+ .setTopic(topic).build();
}
+ /**
+ * Describe a connection configuration to the MQTT broker.
+ *
+ * @param serverUri The MQTT broker URI.
+ * @param topic The MQTT getTopic pattern.
+ * @param clientId A client ID prefix, used to construct an unique client ID.
+ * @return A connection configuration to the MQTT broker.
+ */
public static ConnectionConfiguration create(String serverUri, String topic, String clientId) {
checkArgument(serverUri != null,
- "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
- + "serverUri");
+ "MqttIO.ConnectionConfiguration.create(getServerUri, getTopic) called with null "
+ + "getServerUri");
checkArgument(topic != null,
- "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
- + "topic");
- checkArgument(clientId != null, "MqttIO.ConnectionConfiguration.create(serverUri, topic, "
- + "clientId) called with null clientId");
- return new AutoValue_MqttIO_ConnectionConfiguration(serverUri, topic, clientId);
+ "MqttIO.ConnectionConfiguration.create(getServerUri, getTopic) called with null "
+ + "getTopic");
+ checkArgument(clientId != null, "MqttIO.ConnectionConfiguration.create(getServerUri,"
+ + "getTopic, getClientId) called with null getClientId");
+ return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri)
+ .setTopic(topic).setClientId(clientId).build();
}
- private void populateDisplayData(DisplayData.Builder builder) {
- builder.add(DisplayData.item("serverUri", serverUri()));
- builder.add(DisplayData.item("topic", topic()));
- builder.add(DisplayData.item("clientId", clientId()));
+ public ConnectionConfiguration withUsername(String username) {
+ return builder().setUsername(username).build();
}
- private MqttClient getClient(boolean random) throws MqttException {
- String id = clientId();
- if (random) {
- id = clientId() + "-" + MqttClient.generateClientId();
- }
- MqttClient client = new MqttClient(serverUri(), id);
- client.connect();
- return client;
+ public ConnectionConfiguration withPassword(String password) {
+ return builder().setPassword(password).build();
}
- private MqttClient getClient() throws MqttException {
- return getClient(false);
+ private void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(DisplayData.item("serverUri", getServerUri()));
+ builder.add(DisplayData.item("topic", getTopic()));
+ builder.addIfNotNull(DisplayData.item("clientId", getClientId()));
+ builder.addIfNotNull(DisplayData.item("username", getUsername()));
+ }
+
+ private MQTT getClient() throws Exception {
+ MQTT client = new MQTT();
+ client.setHost(getServerUri());
+ if (getUsername() != null) {
+ client.setUserName(getUsername());
+ client.setPassword(getPassword());
+ }
+ if (getClientId() != null) {
+ client.setClientId(getClientId() + "-" + System.currentTimeMillis());
+ }
+ return client;
}
}
@@ -265,45 +302,27 @@ public void populateDisplayData(DisplayData.Builder builder) {
*/
private static class MqttCheckpointMark implements UnboundedSource.CheckpointMark {
- private transient MqttClient client;
private Instant oldestMessageTimestamp = Instant.now();
- private static class MessageIdWithQos {
-
- private int id;
- private int qos;
-
- MessageIdWithQos(int id, int qos) {
- this.id = id;
- this.qos = qos;
- }
-
- }
-
- private final List MqttIO sink supports writing {@code byte[]} to a getTopic on a MQTT broker.
+ * MqttIO sink supports writing {@code byte[]} to a topic on a MQTT broker.
*
* To configure a MQTT sink, as for the read, you have to specify a MQTT connection
* configuration with {@code ServerURI}, {@code Topic}, ...
@@ -151,11 +151,11 @@ abstract static class Builder {
*/
public static ConnectionConfiguration create(String serverUri, String topic) {
checkArgument(serverUri != null,
- "MqttIO.ConnectionConfiguration.create(getServerUri, getTopic) called with null "
- + "getServerUri");
+ "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
+ + "serverUri");
checkArgument(topic != null,
- "MqttIO.ConnectionConfiguration.create(getServerUri, getTopic) called with null "
- + "getTopic");
+ "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
+ + "topic");
return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri)
.setTopic(topic).build();
}
@@ -170,13 +170,13 @@ public static ConnectionConfiguration create(String serverUri, String topic) {
*/
public static ConnectionConfiguration create(String serverUri, String topic, String clientId) {
checkArgument(serverUri != null,
- "MqttIO.ConnectionConfiguration.create(getServerUri, getTopic) called with null "
- + "getServerUri");
+ "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
+ + "serverUri");
checkArgument(topic != null,
- "MqttIO.ConnectionConfiguration.create(getServerUri, getTopic) called with null "
- + "getTopic");
- checkArgument(clientId != null, "MqttIO.ConnectionConfiguration.create(getServerUri,"
- + "getTopic, getClientId) called with null getClientId");
+ "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
+ + "topic");
+ checkArgument(clientId != null, "MqttIO.ConnectionConfiguration.create(serverUri,"
+ + "topic, clientId) called with null clientId");
return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri)
.setTopic(topic).setClientId(clientId).build();
}
@@ -362,7 +362,7 @@ public UnboundedReader
- *
- *
- * {@code
@@ -79,10 +77,10 @@
*
* Writing to a MQTT broker
*
- * Writing to a MQTT broker
*
- *