Skip to content

[BEAM-606] Create MqttIO#971

Closed
jbonofre wants to merge 27 commits intoapache:masterfrom
jbonofre:BEAM-606-MQTTIO
Closed

[BEAM-606] Create MqttIO#971
jbonofre wants to merge 27 commits intoapache:masterfrom
jbonofre:BEAM-606-MQTTIO

Conversation

@jbonofre
Copy link
Copy Markdown
Member

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@jbonofre
Copy link
Copy Markdown
Member Author

R: @jkff
R: @dhalperi

org.apache.beam.sdk.io.Read.Unbounded<byte[]> unbounded =
org.apache.beam.sdk.io.Read.from(getSource());

PTransform<PBegin, PCollection<byte[]>> transform = unbounded;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This snippet is repeating in all unbounded sources we have. What do you think about deduplicating it? E.g. making .withMaxNumRecords() special-case Long.MAX_VALUE, and making withMaxReadTime special-case Duration.millis(Long.MAX_VALUE) to do exactly this, so that you can say:

return input.getPipeline().apply(Read.from(getSource()).withMaxNumRecords(maxNumRecords).withMaxReadTime(maxReadTime));

And one more thing: maxNumRecords and maxReadTime are not part of the source - they're not used in the code of the source; they're actually configuration of the transform on top of the source.

public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

source.populateDisplayData(builder);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like UnboundedMqttSource doesn't implement populateDisplayData.


}

private static class UnboundedMqttReader extends UnboundedSource.UnboundedReader
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use raw types.


private UnboundedMqttSource source;

private MqttCheckpointMark checkpointMark;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is effectively unused; the checkpoint mark is passed around but it doesn't do anything, in particular, it does not affect what data is being read; I suppose because the MQTT protocol actually doesn't support checkpointing and resuming. To better express that, why not just use Void as the checkpoint type, and get rid of MqttCheckpointMark.

public void close() throws IOException {
try {
if (client != null) {
client.disconnect();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can disconnect() throw an exception? Is it an issue that in that case you don't close the client?

}

@Override
public byte[] getCurrent() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throw new NSEE if it's unavailable.

}

@Override
public void messageArrived(String topic, MqttMessage message) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the thread safety of this? Can it be called from multiple threads?

private final int qos;
private final boolean retained;

private MqttClient client;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be transient.

Copy link
Copy Markdown
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@jbonofre
Copy link
Copy Markdown
Member Author

I did a mistake on the commit I wanted to push. I'm fixing the issues. Sorry about that.

@jbonofre
Copy link
Copy Markdown
Member Author

jbonofre commented Oct 3, 2016

Updated.

@jbonofre
Copy link
Copy Markdown
Member Author

jbonofre commented Oct 3, 2016

The Jenkins failure is not related to MQTT (it failed on the WordCount example).

@jbonofre
Copy link
Copy Markdown
Member Author

jbonofre commented Oct 3, 2016

Rebased to see if it helps to fix the WordCountIT failure (investigating in the mean time).

public Coder getDefaultOutputCoder() {
return SerializableCoder.of(byte[].class);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean up blank lines like these too. Generally they are a good idea between members, but not before the first member or after last.


private final UnboundedMqttSource source;

private transient MqttClient client;
Copy link
Copy Markdown
Contributor

@jkff jkff Oct 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unnecessary to make these transient, readers are not serialized.

}
});
return advance();
} catch (Exception e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What other exception is there to be thrown here, other than IOException? seems like you can remove the try/catch.

Copy link
Copy Markdown
Contributor

@jkff jkff Oct 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address this comment. I suppose you mean MqttException - change this to catch(MqttException).

KV<MqttMessage, Instant> message = queue.take();
current = message.getKey().getPayload();
watermark = message.getValue();
if (current == null) {
Copy link
Copy Markdown
Contributor

@jkff jkff Oct 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When could this be null?


@Override
public boolean advance() throws IOException {
LOGGER.info("Taking from the pending queue ({})", queue.size());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging at INFO once per message is too verbose. Imagine what this will do with tens of thousands of messages per second.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

MqttClient client = new MqttClient(serverUri(), clientId());
client.connect();
if (topic() != null) {
client.subscribe(topic());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this part make sense for the writer too?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's only for the Read. I remove this part from here.

client.connect();
if (topic() != null) {
client.subscribe(topic());
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If topic is null, then we don't subscribe at all? What does it mean to have a null topic?

client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
LOGGER.warn("MQTT connection lost", cause);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should the reader do in this case?


@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// nothing to do
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this possibly be called when reading?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, only when publishing (when async).

public boolean advance() throws IOException {
LOGGER.info("Taking from the pending queue ({})", queue.size());
try {
KV<MqttMessage, Instant> message = queue.take();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this block forever if no messages are arriving?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, according to the javadoc, take() will wait if the blocking queue is empty.

* }</pre>
*/
public class MqttIO {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not my IDE, it's me actually, I like to have "aerated" code (my feeling ;)).

Copy link
Copy Markdown
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, much better!

return new AutoValue_MqttIO_MqttConnectionConfiguration(serverUri, clientId, topic);
}

private void validate() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this redundant with the validation happening in create()? - create() should not allow to create an invalid object.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I remove the validate() method and do the validation only in the create() method.

return toBuilder().setMqttConnectionConfiguration(configuration).build();
}

public Read withMaxNumRecords(long maxNumRecords) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either disallow setting both, or specify which takes priority between maxNumRecords and maxReadTime.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, maxNumRecords and maxReadTime should be exclusive.

private static class UnboundedMqttSource
extends UnboundedSource<byte[], UnboundedSource.CheckpointMark> {

private Read spec;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

PipelineOptions options) {
List<UnboundedMqttSource> sources = new ArrayList<>();
for (int i = 0; i < desiredNumSplits; i++) {
// NB: it's important that user understand the impact of MQTT message QoS
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems out of place here - at least, I'm not sure how to interpret it. Is it intended to be read by the user? If yes, then it should be somewhere in a prominent place in the javadoc where a user is likely to notice it when getting started with this class.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, it was more in the tests implementation. It makes sense to remove this comment (useless).


@Override
public Coder getDefaultOutputCoder() {
return SerializableCoder.of(byte[].class);
Copy link
Copy Markdown
Contributor

@jkff jkff Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ByteArrayCoder instead. SerializableCoder is very inefficient.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Done.

.isEqualTo(10L);
PAssert.that(output).satisfies(new SerializableFunction<Iterable<byte[]>, Void>() {

int count = 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be a local variable?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. Changing that.

});

// produce messages on the broker in another thread
// This thread prevents to block the pipeline waiting for new messages
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you make advance() use a take() with a timeout, will this be unnecessary?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's a different think: it's to have publish and consume in the same time.

Thread thread = new Thread() {
public void run() {
try {
// gives time to the pipeline to start
Copy link
Copy Markdown
Contributor

@jkff jkff Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain why do we need to wait for the pipeline to start? Will MQTT drop messages if nobody's subscribed?

In general, I very strongly suggest to avoid timing-dependent tests. It always seems like you can choose a "large enough" delay, but I've again and again found that this is not the case, and environmental flakiness sooner or later causes the test to fail (maybe 1 in 1000 times - but at Beam's scale in terms of number of tests and number of test runs (e.g. pre-commits), pretty soon you have enough tests with this level of flakiness and pretty soon the overall flakiness grows unacceptable) and you have to rewrite the test. Is it possible to avoid this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, MQTT broker will delete the messages if there are no subscribers on the topic. That's why I created another thread.
I introduced a startup timeout to give time to the test pipeline to start.
I don't see an easy way to improve that. Let me try changing retained and QoS if I can avoid the client thread.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you changed retained and QoS - can you avoid the client thread now?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the purpose of QoS and retained change was to avoid the client thread.

I tried to remove the client thread. Unfortunately, I don't know why yet, but the test seems to be stuck. Debugging didn't help for now. I will try to ping you to investigate together if you don't mind.
By the way, I have a similar code in another project (Apache Karaf Decanter) where it works fine. So, I suspect kind of race condition or thread deadlock with the pipeline and the direct runner.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's investigate together. I'm not comfortable submitting an IO connector where we don't understand why its tests work or don't work.

.withQoS(2));
pipeline.run();

Assert.assertEquals(100, messages.size());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this guaranteed to be 100 at this point? Is this thread-safe / race-free? Is it possible that the pipeline already finished, but your receiver has not yet received all the messages?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's guarantee thanks to QoS 2. With this QoS, the pipeline write will wait the ack from the receiver.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment about this. This depends on 2 things:

  1. QoS 2 blocks the writer until a receiver sends an ack,
  2. MqttClient sends an ack only after messageArrived() has successfully completed.
    This was not obvious to me until I read javadocs of MqttClient.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment before the pipeline.apply. Does it look good to you ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's not sufficient. I'll wait for your reply to my comments on the documentation of different QoS in the main java file.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QoS 2 makes no guarantees on when the message will be delivered to the subscriber - it synchronizes the publisher against broker, and synchronized broker against subscriber, but broker is still allowed to buffer messages. By the time you do this assertEquals(), the message may still be buffered on the broker and not yet delivered to the subscriber, so you may get something less than 100 here (including getting 0).


@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
synchronized (messages) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nobody else is synchronizing on "messages", so this method is synchronizing against itself, which is pointless because it is invoked sequentially anyway. Which thread safety guarantee is this synchronized block trying to accomplish?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, the synchronized is not required and actually, it's not a problem as messages list is accessed only by the receiver. Again, thanks to the QoS 2, we have a single access to the messages list.

Copy link
Copy Markdown
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I think the semantics is mostly correct now. Most of my remaining comments are cosmetic.

*
* <h3>Reading from a MQTT broker</h3>
*
* <p>MqttIO source returns an unbounded collection of {@code byte[]} as
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MqttIO returns an unbounded {@code PCollection<byte[]>} containing message payloads.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

* {@code PCollection<byte[]>}, where {@code byte[]} is the MQTT message payload.</p>
*
* <p>To configure a MQTT source, you have to provide a MQTT connection configuration including
* {@code ClientId}, a {@code ServerURI}, and eventually a {@code Topic} pattern. The following
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what "eventually" is supposed to mean here? Did you mean "optionally", "finally", or something else? I think the sentence will make as much sense if you drop the word.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed eventually (I had in mind "optionaly" but the topic is not optional actually).

*
* pipeline.apply(
* MqttIO.read()
* .withMqttConnectionConfiguration(MqttIO.MqttConnectionConfiguration.create(
Copy link
Copy Markdown
Contributor

@jkff jkff Oct 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename this to withConnectionConfiguration to have fewer repetitions of the word "Mqtt" in this line. Same for write.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

* <p>To configure a MQTT sink, as for the read, you have to specify a MQTT connection
* configuration with {@code ClientId}, {@code ServerURI}, {@code Topic}.</p>
*
* <p>Eventually, you can also specify the {@code Retained} and {@code QoS} of the MQTT
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, "eventually" seems out of place here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, it's actually "optionally"

public abstract static class Read extends PTransform<PBegin, PCollection<byte[]>> {

@Nullable abstract MqttConnectionConfiguration mqttConnectionConfiguration();
@Nullable abstract long maxNumRecords();
Copy link
Copy Markdown
Contributor

@jkff jkff Oct 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxNumRecords is not nullable, it's a long. (ideally this would be an error caught by checkstyle)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

* persistence using {@code MqttConnectOptions}.
* If a persistence mechanism is not specified, the message will not be
* delivered in the event of a client failure.
* The message will be acknowledged across the network.
Copy link
Copy Markdown
Contributor

@jkff jkff Oct 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly does this mean: does it mean that writing a message will wait for a subscriber to acknowledge it? How long will it wait? What if there's currently no subscriber, or if the subscriber forgets to acknowledge? Do the answers to this depend on whether or not the message is persisted? (I suppose if it's persisted by the broker, then the writer doesn't need to wait for a subscriber ack because it's now the broker's responsibility to ensure it's delivered? But I'm not familiar enough with Mqtt)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, it's on the broker side. The broker will wait a subscriber ack (depending of the QoS) before removing the message from the persistence store.

*
* <li>Quality of Service 2 - indicates that a message should
* be delivered once. The message will be persisted to disk, and will
* be subject to a two-phase acknowledgement across the network.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the javadoc.

Thread thread = new Thread() {
public void run() {
try {
// gives time to the pipeline to start
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you changed retained and QoS - can you avoid the client thread now?

}
}
};
thread.start();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to also join the thread, so that it doesn't interfere with other test methods.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by "join" ? Using a ExecutorService ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean thread.join().

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to change from thread.start() to thread.join(), and then, I have the "blocked thread" issue. It seems that something in the pipeline or the direct runner impact my thread (maybe a ExecutorService).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to thread.start() to start the thread, and then thread.join() to join it (i.e. wait for it to complete). Sounds like you called join() without calling start(), which deadlocked because the thread wasn't started.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's what I did (start and then join), but the thread is blocked.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot what was the conclusion here?

Here's what I had in mind:

thread.start();
pipeline.run();
thread.join();

So that there's no lingering activity going on after the test method has completed.

.withQoS(2));
pipeline.run();

Assert.assertEquals(100, messages.size());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment about this. This depends on 2 things:

  1. QoS 2 blocks the writer until a receiver sends an ack,
  2. MqttClient sends an ack only after messageArrived() has successfully completed.
    This was not obvious to me until I read javadocs of MqttClient.

Assert.assertEquals("This is test " + count, inputString);
count++;
Assert.assertTrue(inputString.startsWith("This is test "));
int count = Integer.parseInt(inputString.substring("This is test ".length()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only verifies that we don't get garbage (which seems extremely unlikely - I can not imagine what bug in the MqttIO implementation could lead to receiving garbage in a test like this), but it does not verify that messages aren't dropped/duplicated, which is really the important part. So this test will still pass, e.g., if the code drops all messages.

You need to verify by putting the strings into a HashSet and comparing it against an expected HashSet. Or perhaps even better, apply Flatten.iterables() and verify it using PAssert.contains() against an expected list - it will handle the unordered-ness for you.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it makes sense. Let me improve that.

*
* <h3>Writing to a MQTT broker</h3>
*
* <p>MqttIO sink supports writing {@code byte[]} to a topic on a MQTT broker.</p>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

</p> is unnecessary everywhere, this HTML tag is self-closing.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc throws warning if an opening <p> is not closed with </p>.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's very surprising to me, since the official Oracle documentation about Javadoc does not use </p> - http://www.oracle.com/technetwork/java/javase/documentation/index-137868.html

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, the warning is not from javadoc but from checkstyle.

If I use this form (as in the Oracle documentation):

line
<p>
line

it results with:

[ERROR] (javadoc) JavadocParagraph: <p> tag should be placed immediately before the first word, with no space after.
[ERROR] (javadoc) JavadocParagraph: <p> tag should be preceded with an empty line.

So, I have to use at least:


<p>line

<p>line

with an empty line before <p> (to avoid <p> tag should be preceded with an empty line.).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway, I remove the closing </p> and keep only <p>.

}

@Override
public Coder getCheckpointMarkCoder() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a raw type. It needs to be Coder<CheckpointMarkT> where CheckpointMarkT is your UnboundedSource's second type parameter.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no actual checkpoint in the source. I'm using a VoidCoder there. That's why I didn't explicitly defined the type.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a checkpoint (even though you don't use it) - you are returning a non-null value from getCheckpointMark, and this value is being passed to VoidCoder - the only reason this doesn't throw a ClassCastException is that Java's type erasure didn't insert a class cast somewhere on the way, and that VoidCoder.encode ignores the supplied value (e.g. doesn't test that it's actually instanceof Void). At the very least, you'd need to make getCheckpointMark return null.

Then, it would happen to work in practice because you'd always return null for the checkpoint and VoidCoder can encode it too, and it compiles because you effectively disable type-checking by using a raw type - but formally speaking the code is not type-safe: the type of the checkpoint is UnboundedSource.CheckpointMark, but the coder is a Coder<Void>.

So: as a rule of thumb, please never use raw types, period - unless the code really does something that is provably safe but too difficult for the Java type system to express.

In this case, you can make the code type-safe by changing signature of this method to Coder<UnboundedSource.CheckpointMark> and creating a Coder<UnboundedSource.CheckpointMark> that effectively works like VoidCoder - i.e. uses 0 bytes, and can only encode null (and bails on anything else), and always decodes as null. I'm using this in KafkaIO https://github.com/apache/incubator-beam/pull/1048/files#diff-2fa38a7f8d24217f1f7bde0f5c7dbb40R1254

It might make sense to add a class like that to the SDK - e.g. "class NullOnlyCoder", and make VoidCoder derive from NullOnlyCoder<Void>.

}

@Override
public Coder getDefaultOutputCoder() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a raw type too. Please consider configuring the IDE to report rawtypes as errors and check if there's anything else I missed.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, done.

public void connectionLost(Throwable cause) {
LOGGER.warn("MQTT connection lost", cause);
try {
close();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(still needs to be addressed)

* delivered in the event of a client failure.</li>
* </ul>
*
* <p>If persistence is not configured, QoS 1 and 2 messages will still be delivered
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If persistence is not configured on the broker, you mean?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if the broker doesn't support persistence (it's very rare, but can happen).

*
* <p>If persistence is not configured, QoS 1 and 2 messages will still be delivered
* in the event of a network or server problem as the client will hold state in memory.
* If the MQTT client is shutdown or fails and persistence is not configured then
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest to rephrase this in terms of consequences for someone who uses this transform in a pipeline. The pipeline writer has no control over the MQTT client - it runs inside a bundle that runs inside a VM or container or somewhere else, controlled by the runner and subject to arbitrary failures that the runner transparently handles and retries.

So basically, for a pipeline writer, this means that messages can be duplicated (in case the writer bundle is retried multiple times), but can not be lost: at-least-once delivery.

By the way, this means specifying QoS 2 is meaningless, since we can not provide exactly-once guarantees anyway. In order to get those guarantees, MQTT protocol would need to provide deduplication-by-message-id - does it have a feature like that? If yes, how can the pipeline writer control message ids? (automatically generated ones are not sufficient - see my latest email to beam-dev@ about the Checkpoint transform)

This might be too complicated to address in the current PR, so I'd suggest to just document that this class provides best-effort delivery at QoS 0 and at-least-once delivery at QoS 1, and address exactly-once delivery later.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand your point. Unfortunately, MQTT protocol by itself doesn't provide deduplication.
Let me update the documentation to indicate QoS 0 & 1 are fully supported, not yet QoS 2.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to explicitly prohibit QoS 2 - it's not just "limited", it doesn't work. I.e. throw an exception if a user tries to set QoS 2, because we can't do what they are asking for - and document that it's explicitly not supported, and explain why.


/**
* Whether or not the publish message should be retained by the messaging engine.
* Sending a message with the retained set to {@code false} will clear the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this: do you mean that if you send a message with retained=true it will be saved, but then if you send the same message with retained=false then it will be removed from the server? How does the server identify "the same" message - do messages have some kind of id?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When publishing MQTT messages, a publishing client has no guarantee that a message is actually received by a subscribing client. It can only make sure its message gets delivered safely to the broker. The same is true for a subscribing client. If a client is connecting and subscribing to topics it is interested in, there is no guarantee when the subscriber will get the first message, because this totally depends on a publisher on that topic. It can take a few seconds, minutes or hours until the publisher sends a new message on that topic. Until then the subscribing client is totally in the dark about the current status. This is were retained messages come into play.
So retained messages can help newly subscribed clients to get a status update immediately after subscribing to a topic and don’t have to wait until a publishing clients send the next update.
In other words a retained message on a topic is the last known good value, because it doesn’t have to be the last value, but it certainly is the last message with the retained flag set to true.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I read up a little more about retained messages, and much to my surprise, MQTT in fact stores a single retained message per topic - I thought it has a queue of them. However, http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages seems to imply that to clear the topic's retained message, you should publish a retained message with zero-byte payload, rather than a message with retained=false - is it the case?

Also, since PCollections are unordered, it's worth mentioning to the user that there are no guarantees about which particular message in the PCollection will be retained - it can be any one of them.

Also, this is a per-transform flag, rather than per-message flag, so the comment should be adjusted to account for that - if retained=true, then all messages are published as retained, and the topic's retained message will be one of them that's published the latest (and there are no guarantees whatsoever about which of them it will be). If retained = false, then none of the messages are retained.

Actually you may want to change the API to allow specifying QoS and retained flags on individual messages. But I'm not sure.

}
}
};
thread.start();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean thread.join().

.withQoS(2));
pipeline.run();

Assert.assertEquals(100, messages.size());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's not sufficient. I'll wait for your reply to my comments on the documentation of different QoS in the main java file.

@dhalperi
Copy link
Copy Markdown
Contributor

R: -@dhalperi

@jbonofre
Copy link
Copy Markdown
Member Author

Resuming my work on this. Update will follow soon.

@jbonofre
Copy link
Copy Markdown
Member Author

jbonofre commented Nov 2, 2016

Rebased, and use free network port to start ActiveMQ instance in the tests.

@jkff Can we sync together to see what's pending in the PR ? Thanks !

Copy link
Copy Markdown
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, let's go over these comments tomorrow over chat.

*
* <h3>Writing to a MQTT broker</h3>
*
* <p>MqttIO sink supports writing {@code byte[]} to a topic on a MQTT broker.</p>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's very surprising to me, since the official Oracle documentation about Javadoc does not use </p> - http://www.oracle.com/technetwork/java/javase/documentation/index-137868.html

* "my_client_id",
* "my_topic"))
* .withRetained(true)
* .withQoS(2)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per discussion in the rest of the PR, setting QoS to 2 doesn't make much sense (or at least doesn't do what the user thinks it does), so the example should not use it.

public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
connectionConfiguration().populateDisplayData(builder);
builder.add(DisplayData.item("maxNumRecords", maxNumRecords()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add this only if it's not MAX_VALUE.

}

@Override
public Coder getCheckpointMarkCoder() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a checkpoint (even though you don't use it) - you are returning a non-null value from getCheckpointMark, and this value is being passed to VoidCoder - the only reason this doesn't throw a ClassCastException is that Java's type erasure didn't insert a class cast somewhere on the way, and that VoidCoder.encode ignores the supplied value (e.g. doesn't test that it's actually instanceof Void). At the very least, you'd need to make getCheckpointMark return null.

Then, it would happen to work in practice because you'd always return null for the checkpoint and VoidCoder can encode it too, and it compiles because you effectively disable type-checking by using a raw type - but formally speaking the code is not type-safe: the type of the checkpoint is UnboundedSource.CheckpointMark, but the coder is a Coder<Void>.

So: as a rule of thumb, please never use raw types, period - unless the code really does something that is provably safe but too difficult for the Java type system to express.

In this case, you can make the code type-safe by changing signature of this method to Coder<UnboundedSource.CheckpointMark> and creating a Coder<UnboundedSource.CheckpointMark> that effectively works like VoidCoder - i.e. uses 0 bytes, and can only encode null (and bails on anything else), and always decodes as null. I'm using this in KafkaIO https://github.com/apache/incubator-beam/pull/1048/files#diff-2fa38a7f8d24217f1f7bde0f5c7dbb40R1254

It might make sense to add a class like that to the SDK - e.g. "class NullOnlyCoder", and make VoidCoder derive from NullOnlyCoder<Void>.

* but should only be used for messages which are not valuable - note that
* if the server cannot process the message (for example, there
* is an authorization problem), then an
* {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)} won't be called.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the documentation that users of MqttIO.Write will see, but this class does not expose MqttCallback - so mentioning it is confusing to clients. It's better to say simply "then the message will be silently dropped".


/**
* Whether or not the publish message should be retained by the messaging engine.
* Sending a message with the retained set to {@code false} will clear the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I read up a little more about retained messages, and much to my surprise, MQTT in fact stores a single retained message per topic - I thought it has a queue of them. However, http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages seems to imply that to clear the topic's retained message, you should publish a retained message with zero-byte payload, rather than a message with retained=false - is it the case?

Also, since PCollections are unordered, it's worth mentioning to the user that there are no guarantees about which particular message in the PCollection will be retained - it can be any one of them.

Also, this is a per-transform flag, rather than per-message flag, so the comment should be adjusted to account for that - if retained=true, then all messages are published as retained, and the topic's retained message will be one of them that's published the latest (and there are no guarantees whatsoever about which of them it will be). If retained = false, then none of the messages are retained.

Actually you may want to change the API to allow specifying QoS and retained flags on individual messages. But I'm not sure.

Thread thread = new Thread() {
public void run() {
try {
// gives time to the pipeline to start
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's investigate together. I'm not comfortable submitting an IO connector where we don't understand why its tests work or don't work.

}
}
};
thread.start();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to thread.start() to start the thread, and then thread.join() to join it (i.e. wait for it to complete). Sounds like you called join() without calling start(), which deadlocked because the thread wasn't started.

for (int i = 0; i < 10; i++) {
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setRetained(true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I understand it, this will overwrite "the" retained message on the topic 10 times (i.e. the retained message will be message number 9), rather than queue up 10 retained messages. I'm very confused as to how this will interact with the reader thread.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the publisher should not use retained in the test.

.withQoS(2));
pipeline.run();

Assert.assertEquals(100, messages.size());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QoS 2 makes no guarantees on when the message will be delivered to the subscriber - it synchronizes the publisher against broker, and synchronized broker against subscriber, but broker is still allowed to buffer messages. By the time you do this assertEquals(), the message may still be buffered on the broker and not yet delivered to the subscriber, so you may get something less than 100 here (including getting 0).

…s and maxReadTime are now exclusive, use ByteArrayCoder instead of SerializableCoder, use poll instead of take on the blocking queue, add javadoc on the with* methods, code cleanup
… QoS (relationship between publisher and subsribers), auto generate clientId (reliability)
@asfbot
Copy link
Copy Markdown

asfbot commented Dec 23, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6242/
--none--

@asfgit asfgit closed this in b7d8c2c Dec 23, 2016
@jbonofre jbonofre deleted the BEAM-606-MQTTIO branch December 23, 2016 07:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants