[BEAM-1240] Create RabbitMqIO #1729

Merged
merged 1 commit into from
Oct 25, 2018

jbonofre
Member

@jbonofre jbonofre commented Jan 3, 2017

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@asfbot

asfbot commented Jan 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6376/
--none--

@davorbonaci
Member

Is this ready for review? Perhaps assign a reviewer?

@jbonofre
Member Author

Yes. Just pinged Dan about that ;)

@reuvenlax
Contributor

@jbonofre is this PR still useful? If so, could you suggest a reviewer so this makes progress?

@coveralls

Coverage Status

Coverage decreased (-0.4%) to 69.394% when pulling 63f5a3e on jbonofre:BEAM-1240-RABBITMQIO into 810db7f on apache:master.

@jbonofre
Member Author

@reuvenlax yes, it complements AmqpIO. I will link it with the PR I have on my own GitHub (as requested by Eugene).

@aaltay
Member

aaltay commented Oct 20, 2017

Hey @jbonofre, would @jkff be a reviewer for this?

Contributor

@jkff jkff left a comment

Thanks, a couple of questions.

  • What benefit does this provide over using AmqpIO? RabbitMQ already implements the AMQP protocol, so AmqpIO can be used to talk to RabbitMQ, right?
  • Both unit tests are currently marked "ignored", i.e. this code effectively doesn't have tests, and it will be easy for somebody to accidentally break it. What's the plan for dealing with this? I think this needs to be fixed before the PR can go in.

@jbonofre
Member Author

I will rebase and update this PR.

About the questions:

  • RabbitMQ implements AMQP 1.0 whereas AmqpIO (based on Qpid) implements AMQP 0.9
  • I will enable and update the tests.

Thanks !

@johanhaleby

@jbonofre The AmqpIO javadoc says that it supports AMQP 1.0. Don't you mean the other way around (i.e. that RabbitMQ uses 0.9.1 by default)?

Anyway, I'm really looking forward to this PR being merged and released.

@johanhaleby

johanhaleby commented Nov 5, 2017

Are you interested in getting some help to get the tests fixed? If so I might be able to help out. Writing embedded RabbitMQ tests can be quite tricky, but there might be a couple of workarounds:

  1. Use the Apache Qpid embedded broker and make sure that it's configured to use AMQP 0.9.x. See this Stack Overflow answer.
  2. Use something like TestContainers to start RabbitMQ as a Docker container. This requires Docker to be installed on the build server, though, and I have no insight into whether it is. The test suite will also be slower since the Docker image needs to be pulled.
  3. Use embedded RabbitMQ, which downloads an entire RabbitMQ installation when starting the test case. If Docker is not available on the build server this might be a better alternative, but it will also slow down the tests significantly (at least when building from scratch) due to the RabbitMQ download.

WDYT?

@jkff
Contributor

jkff commented Nov 5, 2017

Sorry, I still don't understand why a RabbitMqIO is necessary in general, as opposed to using AmqpIO pointed at the RabbitMQ endpoint? Regardless of whether RabbitMQ implements AMQP 0.9.1 or 1.0, it seems like the way to support a particular version of AMQP should be to upgrade AmqpIO? @johanhaleby are you currently blocked by something that AmqpIO can't do?

@johanhaleby

johanhaleby commented Nov 5, 2017

it seems like the way to support a particular version of AMQP should be to upgrade AmqpIO?

Well, that sounds reasonable indeed, unless there are some specific RabbitMQ features that it cannot support?

are you currently blocked by something that AmqpIO can't do?

To be honest, I haven't even tried it. I just googled for Beam and RabbitMQ support, which led me to this issue. Reading the AmqpIO javadoc suggests that it only supports AMQP 1.0 (and for some reason I thought that was intentional); please correct me if I'm wrong. If it only supports AMQP 1.0, then I'm indeed blocked, since we're using 0.9.1 in RabbitMQ. But then again, if we could make AmqpIO support 0.9.1, that sounds better to me.

@jbonofre jbonofre force-pushed the BEAM-1240-RABBITMQIO branch 2 times, most recently from 29c3b65 to 4b127a4 Compare November 7, 2017 05:22
@jbonofre
Member Author

jbonofre commented Nov 7, 2017

I fixed and updated RabbitMQIO.

Actually, it's the opposite in terms of version: AmqpIO supports AMQP 1.0/1.1 whereas RabbitMqIO supports 0.9.

@jbonofre
Member Author

jbonofre commented Nov 7, 2017

@jkff I fixed the IO and the tests. As said, AMQP IO (with usage of proton/qpid) only supports AMQP 1.0/1.1. In order to support AMQP 0.9.x (version used by RabbitMQ), we need to use the RabbitMQ client. That's the purpose of this PR (AMQP 0.9.x is very RabbitMQ specific).

@johanhaleby the tests are now OK and the IO is ready to be tested.

@johanhaleby

johanhaleby commented Nov 7, 2017 via email

@jbonofre
Member Author

jbonofre commented Nov 7, 2017

That's another option: I can gather both libraries in AmqpIO and use a configuration option to let the user choose the AMQP protocol version.

As AMQP 0.9 is "only" RabbitMQ, I think a dedicated IO gives it more visibility.

@jkff thoughts ?

@jkff
Contributor

jkff commented Nov 7, 2017

OK, I think I agree that RabbitMqIO makes sense for AMQP 0-9-1 because AFAIK RabbitMQ is the only system that implements AMQP 0-9-1 (all other "generic AMQP implementations" implement AMQP 1.0) so if people need it, it's probably because they are using RabbitMQ, and in that case they'll find it easier to locate it by RabbitMqIO. I'll take a look at the PR contents soon.


@Override
public void validate(PipelineOptions pipelineOptions) {
checkState(connectionConfig() != null, "RabbitMqIO.read() requires a connection config "
Contributor

this should be in expand() because it doesn't use the PipelineOptions. Also should be rephrased:

checkState(..., "withConnectionConfig() is required.")

Member Author

My bad, it's an "old" PR and I have to update to match the IO style guide. I'm doing it now, thanks !

Member Author

Actually, the checkState doesn't make sense anymore, as a default ConnectionConfig is created in the constructor method. I'm removing it.

}

@Override
public Coder<byte[]> getDefaultOutputCoder() {
Contributor

This method is deprecated, override getOutputCoder instead

}

@Override
public void validate() {
Contributor

This is a no-op, so this method can be omitted

// RabbitMQ uses queue, so, we can have several concurrent consumers as source
List<RabbitMQSource> sources = new ArrayList<>();
for (int i = 0; i < desiredNumSplits; i++) {
sources.add(new RabbitMQSource(spec));
Contributor

This can be simply sources.add(this)

Member Author

Is it not a new source for each split (to have concurrent consumers) ?

Contributor

RabbitMQSource is simply an immutable java object, there's no difference between returning one object or returning another object that is completely equivalent to it. A Beam runner will anyway serialize them at construction time and deserialize each split on the respective worker.

Member Author

Got it.
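
For illustration, a minimal JDK-only sketch of the point about splits. `QueueSource` is a hypothetical stand-in for the PR's `RabbitMQSource` and does not use Beam's API; the round trip through Java serialization only imitates, under that assumption, what a runner does when it ships each split to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical immutable source; stands in for RabbitMQSource in this sketch. */
final class QueueSource implements Serializable {
  private static final long serialVersionUID = 1L;
  final String queue;

  QueueSource(String queue) {
    this.queue = queue;
  }

  /** Returns the same immutable instance once per requested split. */
  List<QueueSource> split(int desiredNumSplits) {
    List<QueueSource> sources = new ArrayList<>();
    for (int i = 0; i < desiredNumSplits; i++) {
      sources.add(this);
    }
    return sources;
  }

  /** Imitates shipping one split to a worker: serialize, then deserialize. */
  static QueueSource roundTrip(QueueSource source) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(source);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (QueueSource) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }
}
```

Each deserialized copy is a distinct object with the same configuration, so adding `this` several times still yields independent consumers once the splits reach the workers.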

if (channel == null) {
throw new IOException("No RabbitMQ channel available");
}
channel.queueDeclare(source.spec.connectionConfig().queue(), false, false, false, null);
Contributor

Please annotate the false and null parameters with names for readability
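
One way to do this is with inline parameter-name comments. The sketch below is JDK-only: `QueueDeclarer` is a hypothetical interface that mirrors the five-argument `queueDeclare(queueName, durable, exclusive, autoDelete, arguments)` call quoted elsewhere in this thread, and `NamedArguments` is an illustrative helper, not code from the PR.

```java
import java.util.Map;

/** Hypothetical stand-in mirroring the five-argument queueDeclare call. */
interface QueueDeclarer {
  String queueDeclare(
      String queue,
      boolean durable,
      boolean exclusive,
      boolean autoDelete,
      Map<String, Object> arguments);
}

final class NamedArguments {
  /** Declares a queue with each bare literal labelled by the parameter it fills. */
  static String declareTransientQueue(QueueDeclarer channel, String queue) {
    return channel.queueDeclare(
        queue,
        /* durable= */ false,
        /* exclusive= */ false,
        /* autoDelete= */ false,
        /* arguments= */ null);
  }
}
```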

if (channel == null) {
throw new IOException("No RabbitMQ channel available");
}
channel.queueDeclare(spec.connectionConfig().queue(), false, false, false, null);
Contributor

I'm not sure what this code is trying to do. I believe RabbitMQ producers should publish to an exchange rather than a queue, and they shouldn't even care whether any queues bound to this exchange currently exist:

https://www.rabbitmq.com/tutorials/tutorial-three-python.html

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all. Instead, the producer can only send messages to an exchange....

Member Author

Yes, forgot to merge that change. Merging.

broker = new Broker();
BrokerOptions options = new BrokerOptions();
options.setConfigProperty("qpid.amqp_port", String.valueOf(PORT));
options.setConfigProperty("qpid.work_dir", Files.createTempDirectory("qpid").toString());
Contributor

Use a @Rule TemporaryFolder

*/
public class RabbitMqIOTest {

public static final int PORT = 5672;
Contributor

What if this port is taken?
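
A common way to avoid a hard-coded port in tests is to ask the OS for a free one. This is a minimal JDK-only sketch; `FreePort` is a hypothetical helper, not part of the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

final class FreePort {
  /** Asks the OS for an unused ephemeral port by binding to port 0, then releases it. */
  static int find() {
    try (ServerSocket socket = new ServerSocket(0)) {
      return socket.getLocalPort();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The port is released before the broker binds to it, so another process could in principle grab it in between. That window is small, but it is not zero.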

connectionFactory.setUri("amqp://guest:guest@localhost:" + PORT);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("READ", false, false, false, null);
Contributor

Here too, the publisher should not be declaring queues, and rather it should be publishing to an exchange.

# specific language governing permissions and limitations
# under the License.
#
guest:guest
Contributor

Is it possible to specify this programmatically rather than via a configuration file?

Member Author

Yes, it should be do-able. I'm doing this.

Contributor

Any progress?

Contributor

Ping?

@jkff
Contributor

jkff commented Nov 9, 2017

OK, seems that the changes are not done yet. Please ping this thread when ready for another round.

@asfgit

asfgit commented Nov 12, 2017

SUCCESS

--none--

@jbonofre
Member Author

@jkff I addressed most of your comments. I'm just checking how to configure the plain authentication provider programmatically in order to avoid the passwd file.

@asfgit

asfgit commented Nov 12, 2017

FAILURE

--none--

@asfgit

asfgit commented Nov 12, 2017

SUCCESS

--none--

@jbonofre
Member Author

retest this please

@asfgit

asfgit commented Nov 13, 2017

SUCCESS

--none--

@jbonofre
Member Author

@echauchot great thanks !

Contributor

@jkff jkff left a comment

Thanks! (my last review on this one - deferring to other reviewers afterwards)

*
* <pre>{@code
* pipeline.apply(
* RabbitMqIO.read().withUri("amqp://user:password@localhost:5672").withQueue("QUEUE")
Contributor

Missing ), and missing "PCollection messages = ..."

*
* <h3>Publishing messages to RabbitMQ server</h3>
*
* <p>{@link RabbitMqIO} {@link Write} can send {@code byte[]} to a RabbitMQ server queue or
Contributor

It's actually RabbitMqMessage

@Override
public boolean advance() throws IOException {
try {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
Contributor

Still relevant

new Thread(
() -> {
try {
Thread.sleep(5000);
Contributor

Why?

Contributor

Maybe use a simple while loop with a timeout that checks the Qpid status, to avoid flaky tests in case 5 seconds is not enough for Qpid to initialize on an overloaded Jenkins server.
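
A minimal JDK-only sketch of such a polling loop. `Await` is a hypothetical helper, and the readiness condition is left to the caller, since how to query the broker's status is not shown in this thread.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class Await {
  /**
   * Polls the condition until it holds or the timeout elapses.
   * Returns true if the condition became true in time, false otherwise.
   */
  static boolean until(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        return false; // timed out before the condition held
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
    return true;
  }
}
```

A test could then fail fast with a clear message when `until` returns false. On a fast machine it also proceeds as soon as the broker is up, without always paying a fixed 5-second sleep.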

# specific language governing permissions and limitations
# under the License.
#
guest:guest
Contributor

Ping?

@jbonofre
Member Author

@jkff thanks for the update, I will address your comments !

@akankshajain18

akankshajain18 commented Jul 20, 2018

Hi All

Thanks for RmqIO :)
But it would be great if a setDurable(boolean durable) API were exposed to end users.
As of now, a non-durable queue is created every time:

// channel.queueDeclare(queueName, durable, exclusive, autoDelete, arguments);
channel.queueDeclare(queueName, false, false, false, null);

Contributor

@echauchot echauchot left a comment

@jbonofre, this IO seems to be in very good shape, thanks !
Since @jkff resumed his review I did just a quick review focusing on tests. I also have a comment on the watermark management.

@jbonofre
Member Author

jbonofre commented Jul 21, 2018

I exposed durable to end users for queue declaration (as requested by @akankshajain18).

@jbonofre
Member Author

retest this please

@akankshajain18

Thanks @jbonofre

@akankshajain18

Hello Folks

While working with RmqIO and Beam's Flink local runner, I observed that even if RabbitMQ has data to consume, the Beam pipeline job switches to status FINISHED without any exception.
In the logs, the RabbitMqIO$UnboundedRabbitMqReader close() method gets called, which stops the connection between RabbitMQ and my Beam job (running on the Flink runner).

Code:
@Override
public void close() throws IOException {
  if (connectionHandler != null) {
    connectionHandler.stop();
  }
}

It would be great if someone could help me understand this random behavior.

@jbonofre
Member Author

@akankshajain18 that's probably related to the watermark behavior which is not correct for now. I'm fixing that and other comments. I'm moving forward on this PR. Sorry for the delay.

@jbonofre jbonofre force-pushed the BEAM-1240-RABBITMQIO branch 2 times, most recently from 424bea6 to 84c948e Compare October 11, 2018 06:58
Contributor

@echauchot echauchot left a comment

Thanks JB !
minor changes on historical comments + my first round comments and we are done !

@jbonofre jbonofre force-pushed the BEAM-1240-RABBITMQIO branch 2 times, most recently from bfa77c6 to a4d9e5b Compare October 14, 2018 16:16
Contributor

@echauchot echauchot left a comment

Thanks for addressing my comments JB! You just missed the one on equals impl and the one on Thread.sleep for which I added 2 suggestions. Also suggested to use maxNumRecords in place of absolute 10 in one of the tests.

p.run();

while (received.size() < maxNumRecords) {
Thread.sleep(500);
Contributor

same here

p.run();

while (received.size() < maxNumRecords) {
Thread.sleep(500);
Contributor

There is no problem with this sleep because it does not wait for an absolute period of time; it only paces the checks of the receive queue depth. As a very minor suggestion (only if you feel like it :) ), I would implement it with a new CountDownLatch(maxNumRecords), calling countdown.await() in the test and countdown.countDown() in the TestConsumer when it receives a message.
But once again, this is only a very minor way to reduce test time a tiny bit, and I'm OK with the current implementation.
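
A minimal JDK-only sketch of the latch idea. `LatchedConsumer` is a hypothetical stand-in for the test's TestConsumer, and `onMessage` stands in for whatever delivery callback the real consumer overrides.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical consumer that counts down once per received message. */
final class LatchedConsumer {
  private final CountDownLatch remaining;

  LatchedConsumer(int expectedMessages) {
    this.remaining = new CountDownLatch(expectedMessages);
  }

  /** Called from the delivery thread for every message. */
  void onMessage(byte[] body) {
    remaining.countDown();
  }

  /** Blocks until all expected messages arrived; returns false on timeout or interrupt. */
  boolean awaitAll(long timeout, TimeUnit unit) {
    try {
      return remaining.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```

Using the timed `await` rather than the untimed one keeps a test from hanging forever if a message never arrives.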

Contributor

@echauchot echauchot left a comment

LGTM ! Thanks for your work JB ! Please squash the commits into the first one and ping me when it's done so that I can merge the PR

@jbonofre
Member Author

@echauchot Squash & rebase done.

@echauchot echauchot merged commit 6c1d92a into apache:master Oct 25, 2018
@alafanechere

I tried to use RabbitMqIO with the direct runner to generate an unbounded PCollection from a queue. I encountered an NPE:

java.lang.NullPointerException
    at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement (UnboundedReadEvaluatorFactory.java:169)

...

After investigation, it looks like it's caused by the fact that no default is given to checkpointMark.oldestTimestamp. getWatermark() is called before the mutation of the currentTimestamp variable, raising an NPE. I fixed the problem on my side by reimplementing the class and overriding getWatermark to return Instant.now() if checkpointMark.oldestTimestamp is null:

@Override
public Instant getWatermark() {
  if (checkpointMark.oldestTimestamp == null) {
    return Instant.now();
  }
  return checkpointMark.oldestTimestamp;
}

It looks like this bug has already been raised on Jira and on Stack Overflow.

Do you think it's a problem that needs to be corrected? Is there any workaround you can think of?

@jbonofre jbonofre deleted the BEAM-1240-RABBITMQIO branch January 14, 2019 15:32
@jbonofre
Member Author

@alafanechere agree, can you please create a dedicated Jira about that ? I will fix it.

@jbonofre
Member Author

@alafanechere by the way, I can create the Jira for you (and I will assign the Jira to me). Please let me know.

@alafanechere

alafanechere commented Jan 14, 2019

Thanks! I just created the JIRA: https://jira.apache.org/jira/browse/BEAM-6424

@alafanechere

alafanechere commented Mar 15, 2019

Hello @jbonofre, I still have this issue with the IO. Is there a specific queue configuration that makes message consumption and acknowledgement work seamlessly with Beam? I tried to bypass the problem by patching the IO to auto-ack on arrival, but that looks like bad design...
