This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Conversation

@jbonofre

Work in progress to provide a JMS IO to Dataflow, allowing to consume and produce messages from a JMS queue or topic.

@googlebot

Thanks for your pull request. It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

📝 Please visit https://cla.developers.google.com/ to sign.

Once you've signed, please reply here (e.g. I signed it!) and we'll verify. Thanks.


  • If you've already signed a CLA, it's possible we don't have your GitHub username or you're using a different email address. Check your existing CLA data and verify that your email is set on your git commits.
  • If you signed the CLA as a corporation, please let us know the company's name.

@jbonofre
Author

I signed it !

Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

@googlebot

CLAs look good, thanks!

@@ -0,0 +1,45 @@
# JMS module

This library provides Dataflow sources and sinkgs to make it possible to read
Contributor


sinks

@dhalperi
Contributor

Hi JB,

Under the assumption that as a core committer you want a "lot" of review, I'll try to overdo giving you comments. Feel free to ignore (with a comment) ones that you feel aren't appropriate.

Thanks!
Dan

@jbonofre
Author

Awesome! Thanks a lot, much appreciated. I'll take all comments!

The `DirectPipelineRunner` and the batch mode of the Dataflow service do not support unbounded `PCollections`.
To use JmsIO as a source in these contexts, you need to supply a bound on the number of messages to consume.

You can specify the `.maxNumMessages` option to read a fixed maximum number of messages.
Contributor


I find it interesting that you separated out the above two sections. Is there a good reason for this?

I would have kept it simpler, and more unified, something like this:

Reading from JMS

The JmsIO.Read transform produces an unbounded PCollection containing messages received from a JMS queue. By default, each element in the output collection is a UTF-8-encoded string. You can override the default encoding using JmsIO.Read.withCoder.

Pipeline p = ...;

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

// Producing Strings by default
JmsIO.Read read = JmsIO.Read.connectionFactory(connectionFactory).queue("my-queue");
PCollection<String> data = p.apply("Read from ActiveMQ", read);

// Producing a byte[] using a different Coder
JmsIO.Read bytesRead = JmsIO.Read.connectionFactory(connectionFactory).queue("my-queue").withCoder(ByteArrayCoder.class);

JmsIO can also produce a bounded PCollection by limiting either the number of records returned (Read.withMaxNumRecords) or the time for which records are consumed (Read.withMaxReadTime). Note that if only a record limit is configured, your pipeline will not finish executing until the desired number of records is received.

JmsIO.Read read = JmsIO.Read.connectionFactory(connectionFactory).queue("my-queue").withMaxNumRecords(5);

There should probably be a type variable on JmsIO.Read, like JmsIO.Read<String>, JmsIO.Read<byte[]>, but I haven't gotten to that code yet.

Contributor


I would keep the unbounded -> bounded method names unchanged from BoundedReadFromUnboundedSource: withMaxNumRecords and withMaxReadTime.

More unified API.

I'd also suggest copying both methods, not just the record limit, because the blocking semantics of limiting records only have tripped users up in the past.
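A rough sketch of what the reviewer is suggesting, reusing the `JmsIO.Read` builder shape from the earlier comment. The method names follow `BoundedReadFromUnboundedSource`; the exact JmsIO API is still under review, so treat this as illustrative only:

```java
// Bound by record count only: the pipeline blocks until
// 1000 records have actually been received from the queue.
JmsIO.Read boundedByCount = JmsIO.Read
    .connectionFactory(connectionFactory)
    .queue("my-queue")
    .withMaxNumRecords(1000);

// Bound by both count and time: finishes after 1000 records
// or 5 minutes of reading, whichever comes first, which avoids
// hanging when the queue receives fewer records than expected.
JmsIO.Read boundedByCountOrTime = JmsIO.Read
    .connectionFactory(connectionFactory)
    .queue("my-queue")
    .withMaxNumRecords(1000)
    .withMaxReadTime(Duration.standardMinutes(5));
```

Offering both bounds together is what makes the read safe in batch mode: the time limit acts as an escape hatch when the record limit is never reached.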


@Before
public void startBroker() throws Exception {
System.out.println("Starting ActiveMQ broker on " + BROKER_URL);
Contributor


use org.slf4j.Logger rather than stderr/stdout?
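A minimal sketch of that change, assuming the test class from the diff; the class name `JmsIOTest` and the `BROKER_URL` constant are taken from context and may differ in the actual PR:

```java
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsIOTest {
  private static final Logger LOG = LoggerFactory.getLogger(JmsIOTest.class);
  private static final String BROKER_URL = "tcp://localhost:61616";

  @Before
  public void startBroker() throws Exception {
    // Logged at INFO instead of written to stdout, so the test
    // runner's logging configuration controls where it goes.
    LOG.info("Starting ActiveMQ broker on {}", BROKER_URL);
    // ... broker startup ...
  }
}
```

Using the `{}` placeholder also avoids string concatenation when the log level is disabled.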

@dhalperi
Contributor

Hi JB,

I know you are still making changes here based on our last discussions. When you're ready, would you prefer to continue here or to make a PR against the main incubator-beam repo? If the latter, let's close this and reopen once you're ready.

Thanks.
Dan

@jbonofre
Author

Hi Dan,

I will create a new PR at Beam. Let me close this PR then.

Thanks !
Regards
JB
