Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-1237] Create AmqpIO #1725

Closed
wants to merge 10 commits into from
Closed

Conversation

jbonofre
Copy link
Member

@jbonofre jbonofre commented Jan 2, 2017

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@asfbot
Copy link

asfbot commented Jan 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6360/
--none--

@davorbonaci
Copy link
Member

Is this ready for review? Perhaps assign a reviewer?

@jbonofre
Copy link
Member Author

I'm updating/polishing this one. It should be ready for review just after.

@jbonofre
Copy link
Member Author

jbonofre commented Jun 6, 2017

Resuming/finalizing my work on this one.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.4%) to 69.401% when pulling f446eed on jbonofre:BEAM-1237-AMQPIO into 810db7f on apache:master.

@jbonofre
Copy link
Member Author

jbonofre commented Jun 6, 2017

R: @jkff
CC: @dhalperi

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.02%) to 70.62% when pulling eb3afd6 on jbonofre:BEAM-1237-AMQPIO into aebd3a4 on apache:master.

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, here's a first round.

* AmqpIO supports AMQP 1.0 protocol using the Apache QPid Proton-J library.
*
* <p>If you want to use AMQP 0.9 protocol, you might also be interested in the Apache Beam
* RabbitMqIO.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which does not exist, right? So better not mention it here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in another pull request. Ok to remove this note.

for (String address : addresses()) {
stringBuilder.append(address).append(" ");
}
builder.add(DisplayData.item("addresses", stringBuilder.toString()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Joiner.on(" ").join(addresses())

Tracker tracker = messenger.incomingTracker();
checkpointMark.trackers.add(tracker);
currentTimestamp = new Instant(message.getCreationTime());
if (currentTimestamp.isBefore(checkpointMark.watermark)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, why? This will set the watermark to the lowest message timestamp ever observed, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the watermark is the lowest message timestamp in the checkpoint (waiting finalize). Any other suggestion ?

* A {@link PTransform} to read/receive messages using AMQP 1.0 protocol.
*/
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this return a PCollection<Message>, just like Pubsub and Kafka do? Likewise for write.

@Override
public boolean advance() {
messenger.recv();
if (messenger.incoming() > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Invert this to reduce nesting

for (String address : addresses()) {
stringBuilder.append(address).append(" ");
}
builder.add(DisplayData.item("addresses", stringBuilder.toString()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

Message message = Message.Factory.create();
message.setAddress(address);
if (spec.subject() != null) {
message.setSubject(spec.subject());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a bit weird that this is the only field we allow to set except body. Would be less weird if the IO took Message's rather than String :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

public void testRead() throws Exception {
PCollection<String> output = pipeline.apply(AmqpIO.read()
.withMaxNumRecords(10)
.withAddresses(Collections.singletonList("amqp://~localhost:" + port)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work under ipv6 too? I think I've seen trouble before with explicitly using localhost...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should work but I will double check anyway.

};
sender.start();
pipeline.run();
sender.join();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be in try/finally w.r.t. pipeline.run()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still relevant.

receiver.start();

List<String> data = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use fewer messages and make the test faster and less log-spammy?

@jbonofre jbonofre force-pushed the BEAM-1237-AMQPIO branch 2 times, most recently from cb1cf7e to 694ff7e Compare June 12, 2017 12:09
@jbonofre
Copy link
Member Author

@jkff I implemented what you suggested: dealing directly with PCollection<Message> and providing the corresponding Coder.
NB: sorry, I squashed my changes by mistake.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.02%) to 70.675% when pulling 694ff7e on jbonofre:BEAM-1237-AMQPIO into 1597f3c on apache:master.

@jbonofre
Copy link
Member Author

I'm working on:

  • the coder will encode/decode the entire message (not only the body)
  • registration of the coder.
    I will update the pull request accordingly.

@coveralls
Copy link

Coverage Status

Coverage remained the same at 70.681% when pulling e58dda8 on jbonofre:BEAM-1237-AMQPIO into dd9abc3 on apache:master.

@jbonofre
Copy link
Member Author

@jkff I updated the PR with a AmqpMessage which is a wrapper of Message in order to provide equals() method (required by the MutationDetector to check if there's no mutation in the elements of a PCollection).

@jbonofre
Copy link
Member Author

@jkff I think the PR is ready for a new review round.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling dd52740 on jbonofre:BEAM-1237-AMQPIO into ** on apache:master**.

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. For the coder problem - is it possible that Message.encode/decode is non-deterministic? Are you familiar with the encoding format?

Tracker tracker = messenger.incomingTracker();
checkpointMark.trackers.add(tracker);
currentTimestamp = new Instant(message.getCreationTime());
if (currentTimestamp.isBefore(checkpointMark.watermark)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem like a very useful definition of watermark - "min(all seen messages)" will only go backwards; watermarks should not go backwards.

}

/**
* Define the AMQP messages subject.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this? The user can already set the subject on the Message in the PCollection.

public void processElement(ProcessContext processContext) throws Exception {
AmqpMessage message = processContext.element();
for (String address : spec.addresses()) {
message.getMessage().setAddress(address);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mutating the current element is forbidden, you need to make a copy

import org.apache.qpid.proton.message.Message;

/**
* Extend AMQ ProtonJ Message to override the equals() method.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unnecessary per discussion on beam-dev - please remove this class and let's figure out why coder-based equality is not helping.

return new AmqpMessageCoder();
}

// private static final int[] MESSAGE_SIZES = [1 << 14 /* 16 KiB */,1 << 20 /* 1 MiB */, 1 << 26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems unfinished.

@Override
public List<CoderProvider> getCoderProviders() {
return ImmutableList.of(
CoderProviders.forCoder(TypeDescriptor.of(Message.class),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This provides a coder for Message, but the API is now in terms of AmqpMessage. I suppose this comment can be resolved once you remove AmqpMessage.

@jbonofre
Copy link
Member Author

@jkff I updated according to your comments. Especially, I changed:

  • directly use ProtonJ Message in PCollection
  • simplify WriteFn to use address and subject defined in the message elements of the PCollection
  • change the watermark to the message creation date
    This PR is ready for a new review round. Thanks !

@coveralls
Copy link

Coverage Status

Coverage increased (+0.1%) to 70.839% when pulling 6cc24e5 on jbonofre:BEAM-1237-AMQPIO into 50acc6c on apache:master.

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@Override
public void encode(Message value, OutputStream outStream) throws CoderException, IOException {
byte[] data = new byte[4096];
int bytesWritten = value.encode(data, 0, data.length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will this do if the message is bigger than 4096 bytes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will fail. I will add a test and a set of messages size that I will test.


private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class);

private static int port;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be static, I think

/**
* Tests on {@link AmqpIO}.
*/
public class AmqpIOTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RunWith(JUnit4.class)

public void testRead() throws Exception {
PCollection<Message> output = pipeline.apply(AmqpIO.read()
.withMaxNumRecords(100)
.withAddresses(Collections.singletonList("amqp://~localhost:" + port)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does this test start an AMQP broker?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AMQP broker can be optional: as the Reader create a Messenger, the Messenger is able to directly receive messages (point to point).

@coveralls
Copy link

Coverage Status

Coverage increased (+0.1%) to 70.828% when pulling 3dba8ee on jbonofre:BEAM-1237-AMQPIO into 50acc6c on apache:master.

@jbonofre
Copy link
Member Author

@jkff I updated the Coder to support different message sizes. Do you want me to add some AmqpIO tests using ActiveMQ as AMQP broker ?

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! OK to self-merge after addressing remaining minor comments.

// they bind the listener
List<UnboundedAmqpSource> sources = new ArrayList<>();
if (desiredNumSplits > 0) {
for (int i = 0; i < desiredNumSplits; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about for (int i = 0; i < Math.max(1, desiredNumSplits); ++i)


private static final int[] MESSAGE_SIZES = new int[]{
8 * 1024, // 8 KiB
64 * 1024, // 62 KiB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not "64KB" ? :) I think actually these comments are unnecessary anyway.

encode(value, outStream, maxMessageSize);
return;
} catch (Exception e) {
if (maxMessageSize == MESSAGE_SIZES[MESSAGE_SIZES.length - 1]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can slightly simplify:
for (maxMessageSize) {try {encode; return;} catch {continue;}}
throw new CoderException("...");

(gets rid of the if)

};
sender.start();
pipeline.run();
sender.join();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still relevant.

LOG.info("Starting pipeline");
pipeline.run();
LOG.info("Join receiver thread");
receiver.join();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also should be in try/finally

message.setSubject("test");
data.add(message);
}
pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())).apply(AmqpIO.write());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is withCoder still necessary here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's still required on the Create().

message.setSubject("test");
AmqpMessageCoder coder = AmqpMessageCoder.of();

byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Message clone = CoderUtils.clone(coder, message)


@Test
public void encodeDecodeLargeMessage() throws Exception {
thrown.expect(CoderException.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that "cannot encode more than 64MB" is part of what the user wants from this coder. I'd prefer if you had a test that it can encode a moderately sized message, say a few MB.

@coveralls
Copy link

Coverage Status

Coverage remained the same at 70.875% when pulling f8a14b3 on jbonofre:BEAM-1237-AMQPIO into 8036001 on apache:master.

@asfgit asfgit closed this in 9df865a Jun 27, 2017
@jbonofre jbonofre deleted the BEAM-1237-AMQPIO branch June 27, 2017 13:34
@coveralls
Copy link

Coverage Status

Coverage increased (+0.02%) to 70.893% when pulling ac944e1 on jbonofre:BEAM-1237-AMQPIO into 8036001 on apache:master.

@jkff
Copy link
Contributor

jkff commented Jun 28, 2017

Seems like AmqpIOTest sometimes hangs on my machine, hanging mvn clean verify:

...
Jun 27, 2017 7:16:08 PM org.apache.beam.sdk.io.amqp.AmqpIOTest$2 run
INFO: Received: AmqpValue{Test 93}
Jun 27, 2017 7:16:08 PM org.apache.beam.sdk.io.amqp.AmqpIOTest$2 run
INFO: Received: AmqpValue{Test 94}
Jun 27, 2017 7:16:08 PM org.apache.beam.sdk.io.amqp.AmqpIOTest$2 run
INFO: Received: AmqpValue{Test 95}
Jun 27, 2017 7:16:08 PM org.apache.beam.sdk.io.amqp.AmqpIOTest findFreeNetworkPort
INFO: Finding free network port
Jun 27, 2017 7:16:10 PM org.apache.qpid.proton.messenger.impl.MessengerImpl processAllConnectors
SEVERE: Error processing connection
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.qpid.proton.driver.impl.ConnectorImpl.process(ConnectorImpl.java:89)
at org.apache.qpid.proton.messenger.impl.MessengerImpl.processAllConnectors(MessengerImpl.java:687)
at org.apache.qpid.proton.messenger.impl.MessengerImpl.waitUntil(MessengerImpl.java:863)
at org.apache.qpid.proton.messenger.impl.MessengerImpl.waitUntil(MessengerImpl.java:844)
at org.apache.qpid.proton.messenger.impl.MessengerImpl.send(MessengerImpl.java:417)
at org.apache.qpid.proton.messenger.impl.MessengerImpl.send(MessengerImpl.java:394)
at org.apache.beam.sdk.io.amqp.AmqpIOTest$1.run(AmqpIOTest.java:82)

Would you mind rolling this back / disabling the test, investigating the failure, and rolling forward?

@jbonofre
Copy link
Member Author

Let me disable the test for now. Does it occur only on your machine (I didn't see that on Jenkins) ?

@jkff
Copy link
Contributor

jkff commented Jun 28, 2017

It occurs pretty reliably on my machine with mvn clean verify -Prelease -T 1C -fae -pl '!sdks/python' (the command I usually use to test changes before merging)

@jbonofre
Copy link
Member Author

OK, thanks, I'm disabling the tests and investigate.

@jbonofre
Copy link
Member Author

Hmmm, I don't reproduce on my machine. Let me investigate deeper.

@jbonofre
Copy link
Member Author

Can you try mvn clean install from root first, and then mvn clean verify in the amqp module ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants