From c22b22e771bb0d94a447faaef9b9143973102ec5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 28 Dec 2016 15:06:09 +0100 Subject: [PATCH 01/10] [BEAM-1237] Create AmqpIO --- sdks/java/io/amqp/pom.xml | 95 ++++ .../org/apache/beam/sdk/io/amqp/AmqpIO.java | 457 ++++++++++++++++++ .../beam/sdk/io/amqp/AmqpMessageCoder.java | 54 +++ .../apache/beam/sdk/io/amqp/package-info.java | 22 + .../apache/beam/sdk/io/amqp/AmqpIOTest.java | 140 ++++++ sdks/java/io/pom.xml | 1 + 6 files changed, 769 insertions(+) create mode 100644 sdks/java/io/amqp/pom.xml create mode 100644 sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java create mode 100644 sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java create mode 100644 sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java create mode 100644 sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java diff --git a/sdks/java/io/amqp/pom.xml b/sdks/java/io/amqp/pom.xml new file mode 100644 index 000000000000..fcaa9878d0a6 --- /dev/null +++ b/sdks/java/io/amqp/pom.xml @@ -0,0 +1,95 @@ + + + + + 4.0.0 + + + org.apache.beam + beam-sdks-java-io-parent + 2.1.0-SNAPSHOT + ../pom.xml + + + beam-sdks-java-io-amqp + Apache Beam :: SDKs :: Java :: IO :: AMQP + IO to read and write using AMQP 1.0 protocol (http://www.amqp.org). 
+ + + + org.apache.beam + beam-sdks-java-core + + + + org.slf4j + slf4j-api + + + + joda-time + joda-time + + + + com.google.guava + guava + + + + com.google.code.findbugs + jsr305 + + + + org.apache.qpid + proton-j + 0.13.1 + + + + + com.google.auto.value + auto-value + provided + + + + + org.slf4j + slf4j-jdk14 + test + + + junit + junit + test + + + org.hamcrest + hamcrest-all + test + + + org.apache.beam + beam-runners-direct-java + test + + + + \ No newline at end of file diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java new file mode 100644 index 000000000000..779191b37d34 --- /dev/null +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.amqp; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Joiner; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.messenger.Messenger; +import org.apache.qpid.proton.messenger.Tracker; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * AmqpIO supports AMQP 1.0 protocol using the Apache QPid Proton-J library. + * + *

It's also possible to use AMQP 1.0 protocol via Apache Qpid JMS connection factory and the + * Apache Beam JmsIO. + * + *

Binding AMQP and receiving messages

+ * + *

The {@link AmqpIO} {@link Read} can bind an AMQP listener endpoint and receive messages. It can + * also connect to an AMQP broker (such as Apache Qpid or Apache ActiveMQ). + * + *

{@link AmqpIO} {@link Read} returns an unbounded {@link PCollection} of {@link Message} + * containing the received messages. + * + *

To configure an AMQP source, you have to provide a list of addresses where it will receive + * messages. An address has the following form: {@code + * [amqp[s]://][user[:password]@]domain[/[name]]} where {@code domain} can be one of {@code + * host | host:port | ip | ip:port | name}. NB: the {@code ~} character allows binding an AMQP + * listener instead of connecting to a remote broker. For instance {@code amqp://~0.0.0.0:1234} + * will bind an AMQP listener on any network interface on port 1234. + * + *

The following example illustrates how to configure an AMQP source: + * + *

{@code
+ *
+ *  pipeline.apply(AmqpIO.read()
+ *    .withAddresses(Collections.singletonList("amqp://host:1234")))
+ *
+ * }
+ * + *

Sending messages to an AMQP endpoint

+ * + *

{@link AmqpIO} provides a sink to send {@link PCollection} elements as messages. + * + *

As with the {@link Read}, {@link AmqpIO} {@link Write} requires a list of addresses to which + * messages are sent. The following example illustrates how to configure the {@link AmqpIO} + * {@link Write}: + * + *

{@code
+ *
+ *  pipeline
+ *    .apply(...) // provide PCollection
+ *    .apply(AmqpIO.write()
+ *      .withAddresses(Collections.singletonList("amqp://host:1234")));
+ *
+ * }
+ */ +public class AmqpIO { + + public static Read read() { + return new AutoValue_AmqpIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build(); + } + + public static Write write() { + return new AutoValue_AmqpIO_Write.Builder().build(); + } + + private AmqpIO() { + } + + /** + * A {@link PTransform} to read/receive messages using AMQP 1.0 protocol. + */ + @AutoValue + public abstract static class Read extends PTransform> { + + @Nullable abstract List addresses(); + abstract long maxNumRecords(); + @Nullable abstract Duration maxReadTime(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setAddresses(List addresses); + abstract Builder setMaxNumRecords(long maxNumRecords); + abstract Builder setMaxReadTime(Duration maxReadTime); + abstract Read build(); + } + + /** + * Define the AMQP addresses where to receive messages. + */ + public Read withAddresses(List addresses) { + checkArgument(addresses != null, "AmqpIO.read().withAddresses(addresses) called with null" + + " addresses"); + checkArgument(!addresses.isEmpty(), "AmqpIO.read().withAddresses(addresses) called with " + + "empty addresses list"); + return builder().setAddresses(addresses).build(); + } + + /** + * Define the max number of records received by the {@link Read}. + * When the max number of records is lower than {@code Long.MAX_VALUE}, the {@link Read} will + * provide a bounded {@link PCollection}. + */ + public Read withMaxNumRecords(long maxNumRecords) { + checkArgument(maxReadTime() == null, + "maxNumRecord and maxReadTime are exclusive"); + return builder().setMaxNumRecords(maxNumRecords).build(); + } + + /** + * Define the max read time (duration) while the {@link Read} will receive messages. + * When this max read time is not null, the {@link Read} will provide a bounded + * {@link PCollection}. 
+ */ + public Read withMaxReadTime(Duration maxReadTime) { + checkArgument(maxNumRecords() == Long.MAX_VALUE, + "maxNumRecord and maxReadTime are exclusive"); + return builder().setMaxReadTime(maxReadTime).build(); + } + + @Override + public void validate(PipelineOptions pipelineOptions) { + checkState(addresses() != null, "AmqIO.read() requires addresses list to be set via " + + "withAddresses(addresses)"); + checkState(!addresses().isEmpty(), "AmqIO.read() requires a non-empty addresses list to be" + + " set via withAddresses(addresses)"); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("addresses", Joiner.on(" ").join(addresses()))); + } + + @Override + public PCollection expand(PBegin input) { + org.apache.beam.sdk.io.Read.Unbounded unbounded = + org.apache.beam.sdk.io.Read.from(new UnboundedAmqpSource(this)); + + PTransform> transform = unbounded; + + if (maxNumRecords() != Long.MAX_VALUE) { + transform = unbounded.withMaxNumRecords(maxNumRecords()); + } else if (maxReadTime() != null) { + transform = unbounded.withMaxReadTime(maxReadTime()); + } + + return input.getPipeline().apply(transform); + } + + } + + private static class AmqpCheckpointMark implements UnboundedSource.CheckpointMark, Serializable { + + private transient Messenger messenger; + private transient List trackers = new ArrayList<>(); + private Instant watermark; + + public AmqpCheckpointMark() { + } + + @Override + public void finalizeCheckpoint() { + for (Tracker tracker : trackers) { + // flag as not cumulative + messenger.accept(tracker, 0); + } + trackers.clear(); + watermark = Instant.now(); + } + + // set an empty list to messages when deserialize + private void readObject(java.io.ObjectInputStream stream) + throws java.io.IOException, ClassNotFoundException { + trackers = new ArrayList<>(); + watermark = Instant.now(); + } + + } + + private static class UnboundedAmqpSource extends UnboundedSource { + + private final Read 
spec; + + public UnboundedAmqpSource(Read spec) { + this.spec = spec; + } + + @Override + public List split(int desiredNumSplits, + PipelineOptions pipelineOptions) { + // amqp is a queue system, so, it's possible to have multiple concurrent sources, even if + // they bind the listener + List sources = new ArrayList<>(); + if (desiredNumSplits > 0) { + for (int i = 0; i < desiredNumSplits; i++) { + sources.add(new UnboundedAmqpSource(spec)); + } + } else { + sources.add(new UnboundedAmqpSource(spec)); + } + return sources; + } + + @Override + public UnboundedReader createReader(PipelineOptions pipelineOptions, + AmqpCheckpointMark checkpointMark) { + return new UnboundedAmqpReader(this, checkpointMark); + } + + @Override + public Coder getDefaultOutputCoder() { + return new AmqpMessageCoder(); + } + + @Override + public Coder getCheckpointMarkCoder() { + return SerializableCoder.of(AmqpCheckpointMark.class); + } + + @Override + public void validate() { + spec.validate(null); + } + + } + + private static class UnboundedAmqpReader extends UnboundedSource.UnboundedReader { + + private final UnboundedAmqpSource source; + + private Messenger messenger; + private Message current; + private Instant currentTimestamp; + private AmqpCheckpointMark checkpointMark; + + public UnboundedAmqpReader(UnboundedAmqpSource source, AmqpCheckpointMark checkpointMark) { + this.source = source; + this.current = null; + if (checkpointMark != null) { + this.checkpointMark = checkpointMark; + } else { + this.checkpointMark = new AmqpCheckpointMark(); + } + } + + @Override + public Instant getWatermark() { + return checkpointMark.watermark; + } + + @Override + public Instant getCurrentTimestamp() { + if (current == null) { + throw new NoSuchElementException(); + } + return currentTimestamp; + } + + @Override + public Message getCurrent() { + if (current == null) { + throw new NoSuchElementException(); + } + return current; + } + + @Override + public UnboundedSource.CheckpointMark 
getCheckpointMark() { + return checkpointMark; + } + + @Override + public UnboundedAmqpSource getCurrentSource() { + return source; + } + + @Override + public boolean start() throws IOException { + Read spec = source.spec; + messenger = Messenger.Factory.create(); + messenger.start(); + for (String address : spec.addresses()) { + messenger.subscribe(address); + } + checkpointMark.messenger = messenger; + return advance(); + } + + @Override + public boolean advance() { + messenger.recv(); + if (messenger.incoming() <= 0) { + current = null; + return false; + } + Message message = messenger.get(); + Tracker tracker = messenger.incomingTracker(); + checkpointMark.trackers.add(tracker); + currentTimestamp = new Instant(message.getCreationTime()); + if (currentTimestamp.isBefore(checkpointMark.watermark)) { + checkpointMark.watermark = currentTimestamp; + } + current = message; + return true; + } + + @Override + public void close() { + if (messenger != null) { + messenger.stop(); + } + } + + } + + /** + * A {@link PTransform} to send messages using AMQP 1.0 protocol. + */ + @AutoValue + public abstract static class Write extends PTransform, PDone> { + + @Nullable abstract List addresses(); + @Nullable abstract String subject(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setAddresses(List addresses); + abstract Builder setSubject(String subject); + abstract Write build(); + } + + /** + * Define the AMQP addresses where to send messages. + */ + public Write withAddresses(List addresses) { + checkArgument(addresses != null, "AmqpIO.write().withAddresses(addresses) called with " + + "null addresses"); + checkArgument(!addresses.isEmpty(), "AmpqIO.write().withAddresses(addresses) called with " + + "empty addresses list"); + return builder().setAddresses(addresses).build(); + } + + /** + * Define the AMQP messages subject. 
+ */ + public Write withSubject(String subject) { + checkArgument(subject != null, "AmqpIO.write().withSubject(subject) called with null " + + "subject"); + return builder().setSubject(subject).build(); + } + + @Override + public PDone expand(PCollection input) { + input.apply(ParDo.of(new WriteFn(this))); + return PDone.in(input.getPipeline()); + } + + @Override + public void validate(PipelineOptions pipelineOptions) { + checkState(addresses() != null, "AmqIO.write() requires addresses list to be set via " + + "withAddresses(addresses)"); + checkState(!addresses().isEmpty(), "AmqIO.write() requires a non-empty addresses list to be" + + " set via withAddresses(addresses)"); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("addresses", Joiner.on(" ").join(addresses()))); + builder.addIfNotNull(DisplayData.item("subject", subject())); + } + + private static class WriteFn extends DoFn { + + private final Write spec; + + private transient Messenger messenger; + + public WriteFn(Write spec) { + this.spec = spec; + } + + @Setup + public void setup() throws Exception { + messenger = Messenger.Factory.create(); + messenger.start(); + } + + @ProcessElement + public void processElement(ProcessContext processContext) throws Exception { + Message message = processContext.element(); + for (String address : spec.addresses()) { + message.setAddress(address); + if (spec.subject() != null) { + message.setSubject(spec.subject()); + } + messenger.put(message); + messenger.send(); + } + } + + @Teardown + public void teardown() throws Exception { + if (messenger != null) { + messenger.stop(); + } + } + + } + + } + +} diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java new file mode 100644 index 000000000000..d368695a3754 --- /dev/null +++ 
b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.amqp; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.message.Message; + +/** + * A coder for AMQP message. 
+ */ +public class AmqpMessageCoder extends CustomCoder { + + private final StringUtf8Coder stringCoder = StringUtf8Coder.of(); + + static AmqpMessageCoder of() { + return new AmqpMessageCoder(); + } + + @Override + public void encode(Message value, OutputStream outStream) throws CoderException, IOException { + String body = new AmqpValue(value.getBody()).toString(); + stringCoder.encode(body, outStream); + } + + @Override + public Message decode(InputStream inStream) throws CoderException, IOException { + Message message = Message.Factory.create(); + message.setBody(new AmqpValue(stringCoder.decode(inStream))); + return message; + } + +} diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java new file mode 100644 index 000000000000..091f23424a3a --- /dev/null +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Transforms for reading and writing using AMQP 1.0 protocol. 
+ */ +package org.apache.beam.sdk.io.amqp; diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java new file mode 100644 index 000000000000..719bb1b45789 --- /dev/null +++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.amqp; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.messenger.Messenger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests on {@link AmqpIO}. + */ +public class AmqpIOTest { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class); + + private static int port; + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Before + public void findFreeNetworkPort() throws Exception { + LOG.info("Finding free network port"); + ServerSocket socket = new ServerSocket(0); + port = socket.getLocalPort(); + socket.close(); + } + + @Test + public void testRead() throws Exception { + PCollection output = pipeline.apply(AmqpIO.read() + .withMaxNumRecords(10) + .withAddresses(Collections.singletonList("amqp://~localhost:" + port))); + PAssert.thatSingleton(output.apply(Count.globally())).isEqualTo(10L); + + Thread sender = new Thread() { + public void run() { + try { + Thread.sleep(500); + Messenger sender = Messenger.Factory.create(); + sender.start(); + for (int i = 0; i < 10; i++) { + Message message = Message.Factory.create(); + message.setAddress("amqp://localhost:" + port); + message.setBody(new AmqpValue("Test " + i)); + sender.put(message); + sender.send(); + } + sender.stop(); + } catch (Exception e) { + LOG.error("Sender error", e); + } + } + }; + sender.start(); + 
pipeline.run(); + sender.join(); + } + + @Test + public void testWrite() throws Exception { + final List received = new ArrayList<>(); + Thread receiver = new Thread() { + @Override + public void run() { + try { + Messenger messenger = Messenger.Factory.create(); + messenger.start(); + messenger.subscribe("amqp://~localhost:" + port); + while (received.size() < 100) { + messenger.recv(); + while (messenger.incoming() > 0) { + Message message = messenger.get(); + LOG.info("Received: " + message.getBody().toString()); + received.add(message.getBody().toString()); + } + } + messenger.stop(); + } catch (Exception e) { + LOG.error("Receiver error", e); + } + } + }; + LOG.info("Starting AMQP receiver"); + receiver.start(); + + List data = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + Message message = Message.Factory.create(); + message.setBody(new AmqpValue("Test " + i)); + data.add(message); + } + pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())) + .apply(AmqpIO.write() + .withAddresses(Collections.singletonList("amqp://localhost:" + port))); + LOG.info("Starting pipeline"); + pipeline.run(); + LOG.info("Join receiver thread"); + receiver.join(); + + assertEquals(100, received.size()); + for (int i = 0; i < 100; i++) { + assertTrue(received.contains("AmqpValue{AmqpValue{AmqpValue{AmqpValue{AmqpValue" + + "{Test " + i + "}}}}}")); + } + } + +} diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index 13cd418355da..8e5efeb21b39 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -80,6 +80,7 @@ mongodb mqtt xml + amqp From a82b44086000b4ac7660b79c47156aaf261c4dd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 14 Jun 2017 22:09:22 +0200 Subject: [PATCH 02/10] [BEAM-1237] Add AmqpMessageCoderProviderRegistrar and AmqpMessageCoder deals the whole message (not only the body) TODO: fix the Coder --- sdks/java/io/amqp/pom.xml | 10 +++++ .../beam/sdk/io/amqp/AmqpMessageCoder.java | 10 +++-- 
.../AmqpMessageCoderProviderRegistrar.java | 44 +++++++++++++++++++ 3 files changed, 60 insertions(+), 4 deletions(-) create mode 100644 sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java diff --git a/sdks/java/io/amqp/pom.xml b/sdks/java/io/amqp/pom.xml index fcaa9878d0a6..67ede1f9b789 100644 --- a/sdks/java/io/amqp/pom.xml +++ b/sdks/java/io/amqp/pom.xml @@ -61,6 +61,11 @@ proton-j 0.13.1 + + commons-io + commons-io + 2.5 + @@ -68,6 +73,11 @@ auto-value provided + + com.google.auto.service + auto-service + true + diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java index d368695a3754..bb8a778dfb04 100644 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.commons.io.IOUtils; import org.apache.qpid.proton.message.Message; /** @@ -40,14 +40,16 @@ static AmqpMessageCoder of() { @Override public void encode(Message value, OutputStream outStream) throws CoderException, IOException { - String body = new AmqpValue(value.getBody()).toString(); - stringCoder.encode(body, outStream); + byte[] data = new byte[16384]; + value.encode(data, 0, data.length); + outStream.write(data); } @Override public Message decode(InputStream inStream) throws CoderException, IOException { Message message = Message.Factory.create(); - message.setBody(new AmqpValue(stringCoder.decode(inStream))); + byte[] data = IOUtils.toByteArray(inStream); + message.decode(data, 0, data.length); return message; } diff --git 
a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java new file mode 100644 index 000000000000..bc3445cf9781 --- /dev/null +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderProviderRegistrar.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.amqp; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderProviderRegistrar; +import org.apache.beam.sdk.coders.CoderProviders; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.qpid.proton.message.Message; + +/** + * A {@link CoderProviderRegistrar} for standard types used with {@link AmqpIO}. 
+ */ +@AutoService(CoderProviderRegistrar.class) +public class AmqpMessageCoderProviderRegistrar implements CoderProviderRegistrar { + + @Override + public List getCoderProviders() { + return ImmutableList.of( + CoderProviders.forCoder(TypeDescriptor.of(Message.class), + AmqpMessageCoder.of())); + } + +} From 5674be8424935f6f9c2e418a019ad6085a757104 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Thu, 15 Jun 2017 18:47:11 +0200 Subject: [PATCH 03/10] [BEAM-1237] Improve the AmqpMessageCoder and add a test on the coder --- sdks/java/io/amqp/pom.xml | 5 -- .../beam/sdk/io/amqp/AmqpMessageCoder.java | 31 +++++--- .../apache/beam/sdk/io/amqp/AmqpIOTest.java | 3 + .../sdk/io/amqp/AmqpMessageCoderTest.java | 70 +++++++++++++++++++ 4 files changed, 95 insertions(+), 14 deletions(-) create mode 100644 sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java diff --git a/sdks/java/io/amqp/pom.xml b/sdks/java/io/amqp/pom.xml index 67ede1f9b789..45b295dfce24 100644 --- a/sdks/java/io/amqp/pom.xml +++ b/sdks/java/io/amqp/pom.xml @@ -61,11 +61,6 @@ proton-j 0.13.1 - - commons-io - commons-io - 2.5 - diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java index bb8a778dfb04..b6273bb1a5c1 100644 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java @@ -17,14 +17,15 @@ */ package org.apache.beam.sdk.io.amqp; +import com.google.common.io.ByteStreams; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.commons.io.IOUtils; +import org.apache.beam.sdk.util.VarInt; import 
org.apache.qpid.proton.message.Message; /** @@ -32,24 +33,36 @@ */ public class AmqpMessageCoder extends CustomCoder { - private final StringUtf8Coder stringCoder = StringUtf8Coder.of(); - static AmqpMessageCoder of() { return new AmqpMessageCoder(); } + // private static final int[] MESSAGE_SIZES = [1 << 14 /* 16 KiB */,1 << 20 /* 1 MiB */, 1 << 26 + // /* 64 MiB */] + @Override public void encode(Message value, OutputStream outStream) throws CoderException, IOException { - byte[] data = new byte[16384]; - value.encode(data, 0, data.length); - outStream.write(data); + //for (int maxMessageSize : MESSAGE_SIZES) { + try { + byte[] data = new byte[4096]; + int bytesWritten = value.encode(data, 0, data.length); + VarInt.encode(bytesWritten, outStream); + outStream.write(data, 0, bytesWritten); + return; + } catch (Exception ignored) { // <-- ProtonJ javadoc says it throws an exception if the + // message doesn't fit into the byte[] but it doesn't state which one. + // Try to encode into a larger byte array since the current one was too small + } + //} } @Override public Message decode(InputStream inStream) throws CoderException, IOException { Message message = Message.Factory.create(); - byte[] data = IOUtils.toByteArray(inStream); - message.decode(data, 0, data.length); + int bytesToRead = VarInt.decodeInt(inStream); + byte[] encodedMessage = new byte[bytesToRead]; + ByteStreams.readFully(inStream, encodedMessage); + message.decode(encodedMessage, 0, encodedMessage.length); return message; } diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java index 719bb1b45789..c14d060a4e80 100644 --- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java +++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java @@ -34,6 +34,7 @@ import org.apache.qpid.proton.message.Message; import 
org.apache.qpid.proton.messenger.Messenger; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; @@ -90,6 +91,8 @@ public void run() { } @Test + @Ignore("Fails with IllegalMutationException: PTransform AmqpIO.Write/ParDo(Write)/ParMultiDo" + + "(Write) illegaly mutated value Message{body=AmqpValue{Test 91}}") public void testWrite() throws Exception { final List received = new ArrayList<>(); Thread receiver = new Thread() { diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java new file mode 100644 index 000000000000..499ebf9cf2c6 --- /dev/null +++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.amqp; + +import static org.junit.Assert.assertEquals; + +import java.io.FileInputStream; +import java.io.FileOutputStream; + +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.message.Message; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Test on the {@link AmqpMessageCoder}. + */ +public class AmqpMessageCoderTest { + + private final AmqpMessageCoder coder = AmqpMessageCoder.of(); + + @Test + public void testEncodeDecode() throws Exception { + Message message = Message.Factory.create(); + message.setBody(new AmqpValue("test")); + FileOutputStream fileOutputStream = new FileOutputStream("target/coder-test"); + coder.encode(message, fileOutputStream); + fileOutputStream.close(); + + FileInputStream fileInputStream = new FileInputStream("target/coder-test"); + message = coder.decode(fileInputStream); + + assertEquals(message.getBody().toString(), "AmqpValue{test}"); + } + + @Test + @Ignore("Fails with EOFException on VarInt (line 82)") + public void testEncodeDecodeWithLargeMessage() throws Exception { + Message message = Message.Factory.create(); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < 10000; i++) { + builder.append("test").append("\n"); + } + message.setBody(new AmqpValue(builder.toString())); + FileOutputStream fileOutputStream = new FileOutputStream("target/coder-large-test"); + coder.encode(message, fileOutputStream); + fileOutputStream.close(); + + FileInputStream fileInputStream = new FileInputStream("target/coder-large-test"); + message = coder.decode(fileInputStream); + + System.out.println(message.getBody().toString()); + } + +} From 83a4865611a945f8411ea343e67a3cb132eb5ce0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Tue, 20 Jun 2017 12:01:15 +0200 Subject: [PATCH 04/10] [BEAM-1237] Create a AmqpMessage wrapper to provide equals() --- .../org/apache/beam/sdk/io/amqp/AmqpIO.java | 37 ++++++------ 
.../apache/beam/sdk/io/amqp/AmqpMessage.java | 57 +++++++++++++++++++ .../beam/sdk/io/amqp/AmqpMessageCoder.java | 10 ++-- .../apache/beam/sdk/io/amqp/AmqpIOTest.java | 28 ++++----- .../sdk/io/amqp/AmqpMessageCoderTest.java | 42 +++----------- 5 files changed, 102 insertions(+), 72 deletions(-) create mode 100644 sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessage.java diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java index 779191b37d34..ea0bf60e7633 100644 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java @@ -112,7 +112,7 @@ private AmqpIO() { * A {@link PTransform} to read/receive messages using AMQP 1.0 protocol. */ @AutoValue - public abstract static class Read extends PTransform> { + public abstract static class Read extends PTransform> { @Nullable abstract List addresses(); abstract long maxNumRecords(); @@ -175,11 +175,11 @@ public void populateDisplayData(DisplayData.Builder builder) { } @Override - public PCollection expand(PBegin input) { - org.apache.beam.sdk.io.Read.Unbounded unbounded = + public PCollection expand(PBegin input) { + org.apache.beam.sdk.io.Read.Unbounded unbounded = org.apache.beam.sdk.io.Read.from(new UnboundedAmqpSource(this)); - PTransform> transform = unbounded; + PTransform> transform = unbounded; if (maxNumRecords() != Long.MAX_VALUE) { transform = unbounded.withMaxNumRecords(maxNumRecords()); @@ -220,7 +220,8 @@ private void readObject(java.io.ObjectInputStream stream) } - private static class UnboundedAmqpSource extends UnboundedSource { + private static class UnboundedAmqpSource + extends UnboundedSource { private final Read spec; @@ -245,13 +246,13 @@ public List split(int desiredNumSplits, } @Override - public UnboundedReader createReader(PipelineOptions pipelineOptions, + public UnboundedReader 
createReader(PipelineOptions pipelineOptions, AmqpCheckpointMark checkpointMark) { return new UnboundedAmqpReader(this, checkpointMark); } @Override - public Coder getDefaultOutputCoder() { + public Coder getDefaultOutputCoder() { return new AmqpMessageCoder(); } @@ -267,12 +268,12 @@ public void validate() { } - private static class UnboundedAmqpReader extends UnboundedSource.UnboundedReader { + private static class UnboundedAmqpReader extends UnboundedSource.UnboundedReader { private final UnboundedAmqpSource source; private Messenger messenger; - private Message current; + private AmqpMessage current; private Instant currentTimestamp; private AmqpCheckpointMark checkpointMark; @@ -300,7 +301,7 @@ public Instant getCurrentTimestamp() { } @Override - public Message getCurrent() { + public AmqpMessage getCurrent() { if (current == null) { throw new NoSuchElementException(); } @@ -343,7 +344,7 @@ public boolean advance() { if (currentTimestamp.isBefore(checkpointMark.watermark)) { checkpointMark.watermark = currentTimestamp; } - current = message; + current = new AmqpMessage(message); return true; } @@ -360,7 +361,7 @@ public void close() { * A {@link PTransform} to send messages using AMQP 1.0 protocol. 
*/ @AutoValue - public abstract static class Write extends PTransform, PDone> { + public abstract static class Write extends PTransform, PDone> { @Nullable abstract List addresses(); @Nullable abstract String subject(); @@ -395,7 +396,7 @@ public Write withSubject(String subject) { } @Override - public PDone expand(PCollection input) { + public PDone expand(PCollection input) { input.apply(ParDo.of(new WriteFn(this))); return PDone.in(input.getPipeline()); } @@ -414,7 +415,7 @@ public void populateDisplayData(DisplayData.Builder builder) { builder.addIfNotNull(DisplayData.item("subject", subject())); } - private static class WriteFn extends DoFn { + private static class WriteFn extends DoFn { private final Write spec; @@ -432,13 +433,13 @@ public void setup() throws Exception { @ProcessElement public void processElement(ProcessContext processContext) throws Exception { - Message message = processContext.element(); + AmqpMessage message = processContext.element(); for (String address : spec.addresses()) { - message.setAddress(address); + message.getMessage().setAddress(address); if (spec.subject() != null) { - message.setSubject(spec.subject()); + message.getMessage().setSubject(spec.subject()); } - messenger.put(message); + messenger.put(message.getMessage()); messenger.send(); } } diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessage.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessage.java new file mode 100644 index 000000000000..068c02a2cd0f --- /dev/null +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessage.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.amqp; + +import org.apache.qpid.proton.message.Message; + +/** + * Extend AMQ ProtonJ Message to override the equals() method. + */ +public class AmqpMessage { + + private Message message; + + public AmqpMessage() { + message = Message.Factory.create(); + } + + public AmqpMessage(Message message) { + this.message = message; + } + + public Message getMessage() { + return this.message; + } + + @Override + public int hashCode() { + return this.message.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof AmqpMessage) { + AmqpMessage message = (AmqpMessage) obj; + if (this.message.getBody().toString().equals(message.getMessage().getBody().toString())) { + return true; + } + } + return false; + } + +} diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java index b6273bb1a5c1..c4b9797068e8 100644 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java @@ -31,7 +31,7 @@ /** * A coder for AMQP message. 
*/ -public class AmqpMessageCoder extends CustomCoder { +public class AmqpMessageCoder extends CustomCoder { static AmqpMessageCoder of() { return new AmqpMessageCoder(); @@ -41,11 +41,11 @@ static AmqpMessageCoder of() { // /* 64 MiB */] @Override - public void encode(Message value, OutputStream outStream) throws CoderException, IOException { + public void encode(AmqpMessage value, OutputStream outStream) throws CoderException, IOException { //for (int maxMessageSize : MESSAGE_SIZES) { try { byte[] data = new byte[4096]; - int bytesWritten = value.encode(data, 0, data.length); + int bytesWritten = value.getMessage().encode(data, 0, data.length); VarInt.encode(bytesWritten, outStream); outStream.write(data, 0, bytesWritten); return; @@ -57,13 +57,13 @@ public void encode(Message value, OutputStream outStream) throws CoderException, } @Override - public Message decode(InputStream inStream) throws CoderException, IOException { + public AmqpMessage decode(InputStream inStream) throws CoderException, IOException { Message message = Message.Factory.create(); int bytesToRead = VarInt.decodeInt(inStream); byte[] encodedMessage = new byte[bytesToRead]; ByteStreams.readFully(inStream, encodedMessage); message.decode(encodedMessage, 0, encodedMessage.length); - return message; + return new AmqpMessage(message); } } diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java index c14d060a4e80..8187ca3e7e1e 100644 --- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java +++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java @@ -34,7 +34,6 @@ import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.messenger.Messenger; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; @@ -61,10 +60,10 @@ public void findFreeNetworkPort() throws 
Exception { @Test public void testRead() throws Exception { - PCollection output = pipeline.apply(AmqpIO.read() - .withMaxNumRecords(10) + PCollection output = pipeline.apply(AmqpIO.read() + .withMaxNumRecords(100) .withAddresses(Collections.singletonList("amqp://~localhost:" + port))); - PAssert.thatSingleton(output.apply(Count.globally())).isEqualTo(10L); + PAssert.thatSingleton(output.apply(Count.globally())).isEqualTo(100L); Thread sender = new Thread() { public void run() { @@ -72,11 +71,11 @@ public void run() { Thread.sleep(500); Messenger sender = Messenger.Factory.create(); sender.start(); - for (int i = 0; i < 10; i++) { - Message message = Message.Factory.create(); - message.setAddress("amqp://localhost:" + port); - message.setBody(new AmqpValue("Test " + i)); - sender.put(message); + for (int i = 0; i < 100; i++) { + AmqpMessage message = new AmqpMessage(); + message.getMessage().setAddress("amqp://localhost:" + port); + message.getMessage().setBody(new AmqpValue("Test " + i)); + sender.put(message.getMessage()); sender.send(); } sender.stop(); @@ -91,8 +90,6 @@ public void run() { } @Test - @Ignore("Fails with IllegalMutationException: PTransform AmqpIO.Write/ParDo(Write)/ParMultiDo" - + "(Write) illegaly mutated value Message{body=AmqpValue{Test 91}}") public void testWrite() throws Exception { final List received = new ArrayList<>(); Thread receiver = new Thread() { @@ -119,10 +116,10 @@ public void run() { LOG.info("Starting AMQP receiver"); receiver.start(); - List data = new ArrayList<>(); + List data = new ArrayList<>(); for (int i = 0; i < 100; i++) { - Message message = Message.Factory.create(); - message.setBody(new AmqpValue("Test " + i)); + AmqpMessage message = new AmqpMessage(); + message.getMessage().setBody(new AmqpValue("Test " + i)); data.add(message); } pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())) @@ -135,8 +132,7 @@ public void run() { assertEquals(100, received.size()); for (int i = 0; i < 100; i++) { - 
assertTrue(received.contains("AmqpValue{AmqpValue{AmqpValue{AmqpValue{AmqpValue" - + "{Test " + i + "}}}}}")); + assertTrue(received.contains("AmqpValue{Test " + i + "}")); } } diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java index 499ebf9cf2c6..e6a97e7f9da5 100644 --- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java +++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java @@ -17,14 +17,12 @@ */ package org.apache.beam.sdk.io.amqp; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; -import java.io.FileInputStream; -import java.io.FileOutputStream; +import java.util.Objects; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.message.Message; -import org.junit.Ignore; import org.junit.Test; /** @@ -36,35 +34,13 @@ public class AmqpMessageCoderTest { @Test public void testEncodeDecode() throws Exception { - Message message = Message.Factory.create(); - message.setBody(new AmqpValue("test")); - FileOutputStream fileOutputStream = new FileOutputStream("target/coder-test"); - coder.encode(message, fileOutputStream); - fileOutputStream.close(); + AmqpMessage message = new AmqpMessage(); + message.getMessage().setBody(new AmqpValue("test")); + byte[] encoded = CoderUtils.encodeToByteArray(coder, message); + AmqpMessage clone = CoderUtils.decodeFromByteArray(coder, encoded); - FileInputStream fileInputStream = new FileInputStream("target/coder-test"); - message = coder.decode(fileInputStream); - - assertEquals(message.getBody().toString(), "AmqpValue{test}"); - } - - @Test - @Ignore("Fails with EOFException on VarInt (line 82)") - public void testEncodeDecodeWithLargeMessage() throws Exception { - Message message = Message.Factory.create(); - 
StringBuilder builder = new StringBuilder(); - for (int i = 0; i < 10000; i++) { - builder.append("test").append("\n"); - } - message.setBody(new AmqpValue(builder.toString())); - FileOutputStream fileOutputStream = new FileOutputStream("target/coder-large-test"); - coder.encode(message, fileOutputStream); - fileOutputStream.close(); - - FileInputStream fileInputStream = new FileInputStream("target/coder-large-test"); - message = coder.decode(fileInputStream); - - System.out.println(message.getBody().toString()); + assertTrue(Objects.equals(message, clone)); + assertTrue(Objects.equals(clone, message)); } } From b4c4d0608153edbd4f98f594e0f267b4ac6a6fe9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 21 Jun 2017 13:48:55 +0200 Subject: [PATCH 05/10] [BEAM-1237] Directly use ProtonJ Message in the PCollection, simplify the Write, change the watermark --- .../org/apache/beam/sdk/io/amqp/AmqpIO.java | 101 ++++-------------- .../apache/beam/sdk/io/amqp/AmqpMessage.java | 57 ---------- .../beam/sdk/io/amqp/AmqpMessageCoder.java | 27 ++--- .../apache/beam/sdk/io/amqp/AmqpIOTest.java | 24 ++--- .../sdk/io/amqp/AmqpMessageCoderTest.java | 46 -------- 5 files changed, 42 insertions(+), 213 deletions(-) delete mode 100644 sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessage.java delete mode 100644 sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java index ea0bf60e7633..941c1595dd08 100644 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java @@ -90,8 +90,7 @@ * * pipeline * .apply(...) 
// provide PCollection - * .apply(AmqpIO.write() - * .withAddresses(Collections.singletonList("amqp://host:1234"))); + * .apply(AmqpIO.write()); * * } */ @@ -102,7 +101,7 @@ public static Read read() { } public static Write write() { - return new AutoValue_AmqpIO_Write.Builder().build(); + return new AutoValue_AmqpIO_Write(); } private AmqpIO() { @@ -112,7 +111,7 @@ private AmqpIO() { * A {@link PTransform} to read/receive messages using AMQP 1.0 protocol. */ @AutoValue - public abstract static class Read extends PTransform> { + public abstract static class Read extends PTransform> { @Nullable abstract List addresses(); abstract long maxNumRecords(); @@ -175,11 +174,11 @@ public void populateDisplayData(DisplayData.Builder builder) { } @Override - public PCollection expand(PBegin input) { - org.apache.beam.sdk.io.Read.Unbounded unbounded = + public PCollection expand(PBegin input) { + org.apache.beam.sdk.io.Read.Unbounded unbounded = org.apache.beam.sdk.io.Read.from(new UnboundedAmqpSource(this)); - PTransform> transform = unbounded; + PTransform> transform = unbounded; if (maxNumRecords() != Long.MAX_VALUE) { transform = unbounded.withMaxNumRecords(maxNumRecords()); @@ -196,7 +195,6 @@ private static class AmqpCheckpointMark implements UnboundedSource.CheckpointMar private transient Messenger messenger; private transient List trackers = new ArrayList<>(); - private Instant watermark; public AmqpCheckpointMark() { } @@ -208,20 +206,18 @@ public void finalizeCheckpoint() { messenger.accept(tracker, 0); } trackers.clear(); - watermark = Instant.now(); } // set an empty list to messages when deserialize private void readObject(java.io.ObjectInputStream stream) throws java.io.IOException, ClassNotFoundException { trackers = new ArrayList<>(); - watermark = Instant.now(); } } private static class UnboundedAmqpSource - extends UnboundedSource { + extends UnboundedSource { private final Read spec; @@ -246,13 +242,13 @@ public List split(int desiredNumSplits, } @Override - 
public UnboundedReader createReader(PipelineOptions pipelineOptions, + public UnboundedReader createReader(PipelineOptions pipelineOptions, AmqpCheckpointMark checkpointMark) { return new UnboundedAmqpReader(this, checkpointMark); } @Override - public Coder getDefaultOutputCoder() { + public Coder getDefaultOutputCoder() { return new AmqpMessageCoder(); } @@ -268,13 +264,14 @@ public void validate() { } - private static class UnboundedAmqpReader extends UnboundedSource.UnboundedReader { + private static class UnboundedAmqpReader extends UnboundedSource.UnboundedReader { private final UnboundedAmqpSource source; private Messenger messenger; - private AmqpMessage current; + private Message current; private Instant currentTimestamp; + private Instant watermark = new Instant(Long.MIN_VALUE); private AmqpCheckpointMark checkpointMark; public UnboundedAmqpReader(UnboundedAmqpSource source, AmqpCheckpointMark checkpointMark) { @@ -289,7 +286,7 @@ public UnboundedAmqpReader(UnboundedAmqpSource source, AmqpCheckpointMark checkp @Override public Instant getWatermark() { - return checkpointMark.watermark; + return watermark; } @Override @@ -301,7 +298,7 @@ public Instant getCurrentTimestamp() { } @Override - public AmqpMessage getCurrent() { + public Message getCurrent() { if (current == null) { throw new NoSuchElementException(); } @@ -341,10 +338,8 @@ public boolean advance() { Tracker tracker = messenger.incomingTracker(); checkpointMark.trackers.add(tracker); currentTimestamp = new Instant(message.getCreationTime()); - if (currentTimestamp.isBefore(checkpointMark.watermark)) { - checkpointMark.watermark = currentTimestamp; - } - current = new AmqpMessage(message); + watermark = currentTimestamp; + current = message; return true; } @@ -361,61 +356,15 @@ public void close() { * A {@link PTransform} to send messages using AMQP 1.0 protocol. 
*/ @AutoValue - public abstract static class Write extends PTransform, PDone> { - - @Nullable abstract List addresses(); - @Nullable abstract String subject(); - - abstract Builder builder(); - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setAddresses(List addresses); - abstract Builder setSubject(String subject); - abstract Write build(); - } - - /** - * Define the AMQP addresses where to send messages. - */ - public Write withAddresses(List addresses) { - checkArgument(addresses != null, "AmqpIO.write().withAddresses(addresses) called with " - + "null addresses"); - checkArgument(!addresses.isEmpty(), "AmpqIO.write().withAddresses(addresses) called with " - + "empty addresses list"); - return builder().setAddresses(addresses).build(); - } - - /** - * Define the AMQP messages subject. - */ - public Write withSubject(String subject) { - checkArgument(subject != null, "AmqpIO.write().withSubject(subject) called with null " - + "subject"); - return builder().setSubject(subject).build(); - } + public abstract static class Write extends PTransform, PDone> { @Override - public PDone expand(PCollection input) { + public PDone expand(PCollection input) { input.apply(ParDo.of(new WriteFn(this))); return PDone.in(input.getPipeline()); } - @Override - public void validate(PipelineOptions pipelineOptions) { - checkState(addresses() != null, "AmqIO.write() requires addresses list to be set via " - + "withAddresses(addresses)"); - checkState(!addresses().isEmpty(), "AmqIO.write() requires a non-empty addresses list to be" - + " set via withAddresses(addresses)"); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("addresses", Joiner.on(" ").join(addresses()))); - builder.addIfNotNull(DisplayData.item("subject", subject())); - } - - private static class WriteFn extends DoFn { + private static class WriteFn extends DoFn { private final Write spec; @@ -433,15 +382,9 @@ public void setup() 
throws Exception { @ProcessElement public void processElement(ProcessContext processContext) throws Exception { - AmqpMessage message = processContext.element(); - for (String address : spec.addresses()) { - message.getMessage().setAddress(address); - if (spec.subject() != null) { - message.getMessage().setSubject(spec.subject()); - } - messenger.put(message.getMessage()); - messenger.send(); - } + Message message = processContext.element(); + messenger.put(message); + messenger.send(); } @Teardown diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessage.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessage.java deleted file mode 100644 index 068c02a2cd0f..000000000000 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessage.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.amqp; - -import org.apache.qpid.proton.message.Message; - -/** - * Extend AMQ ProtonJ Message to override the equals() method. 
- */ -public class AmqpMessage { - - private Message message; - - public AmqpMessage() { - message = Message.Factory.create(); - } - - public AmqpMessage(Message message) { - this.message = message; - } - - public Message getMessage() { - return this.message; - } - - @Override - public int hashCode() { - return this.message.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof AmqpMessage) { - AmqpMessage message = (AmqpMessage) obj; - if (this.message.getBody().toString().equals(message.getMessage().getBody().toString())) { - return true; - } - } - return false; - } - -} diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java index c4b9797068e8..4c6e56fafc0e 100644 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java @@ -31,39 +31,28 @@ /** * A coder for AMQP message. */ -public class AmqpMessageCoder extends CustomCoder { +public class AmqpMessageCoder extends CustomCoder { static AmqpMessageCoder of() { return new AmqpMessageCoder(); } - // private static final int[] MESSAGE_SIZES = [1 << 14 /* 16 KiB */,1 << 20 /* 1 MiB */, 1 << 26 - // /* 64 MiB */] - @Override - public void encode(AmqpMessage value, OutputStream outStream) throws CoderException, IOException { - //for (int maxMessageSize : MESSAGE_SIZES) { - try { - byte[] data = new byte[4096]; - int bytesWritten = value.getMessage().encode(data, 0, data.length); - VarInt.encode(bytesWritten, outStream); - outStream.write(data, 0, bytesWritten); - return; - } catch (Exception ignored) { // <-- ProtonJ javadoc says it throws an exception if the - // message doesn't fit into the byte[] but it doesn't state which one. 
- // Try to encode into a larger byte array since the current one was too small - } - //} + public void encode(Message value, OutputStream outStream) throws CoderException, IOException { + byte[] data = new byte[4096]; + int bytesWritten = value.encode(data, 0, data.length); + VarInt.encode(bytesWritten, outStream); + outStream.write(data, 0, bytesWritten); } @Override - public AmqpMessage decode(InputStream inStream) throws CoderException, IOException { + public Message decode(InputStream inStream) throws CoderException, IOException { Message message = Message.Factory.create(); int bytesToRead = VarInt.decodeInt(inStream); byte[] encodedMessage = new byte[bytesToRead]; ByteStreams.readFully(inStream, encodedMessage); message.decode(encodedMessage, 0, encodedMessage.length); - return new AmqpMessage(message); + return message; } } diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java index 8187ca3e7e1e..214a467194f5 100644 --- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java +++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java @@ -60,10 +60,10 @@ public void findFreeNetworkPort() throws Exception { @Test public void testRead() throws Exception { - PCollection output = pipeline.apply(AmqpIO.read() + PCollection output = pipeline.apply(AmqpIO.read() .withMaxNumRecords(100) .withAddresses(Collections.singletonList("amqp://~localhost:" + port))); - PAssert.thatSingleton(output.apply(Count.globally())).isEqualTo(100L); + PAssert.thatSingleton(output.apply(Count.globally())).isEqualTo(100L); Thread sender = new Thread() { public void run() { @@ -72,10 +72,10 @@ public void run() { Messenger sender = Messenger.Factory.create(); sender.start(); for (int i = 0; i < 100; i++) { - AmqpMessage message = new AmqpMessage(); - message.getMessage().setAddress("amqp://localhost:" + port); - 
message.getMessage().setBody(new AmqpValue("Test " + i)); - sender.put(message.getMessage()); + Message message = Message.Factory.create(); + message.setAddress("amqp://localhost:" + port); + message.setBody(new AmqpValue("Test " + i)); + sender.put(message); sender.send(); } sender.stop(); @@ -116,15 +116,15 @@ public void run() { LOG.info("Starting AMQP receiver"); receiver.start(); - List data = new ArrayList<>(); + List data = new ArrayList<>(); for (int i = 0; i < 100; i++) { - AmqpMessage message = new AmqpMessage(); - message.getMessage().setBody(new AmqpValue("Test " + i)); + Message message = Message.Factory.create(); + message.setBody(new AmqpValue("Test " + i)); + message.setAddress("amqp://localhost:" + port); + message.setSubject("test"); data.add(message); } - pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())) - .apply(AmqpIO.write() - .withAddresses(Collections.singletonList("amqp://localhost:" + port))); + pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())).apply(AmqpIO.write()); LOG.info("Starting pipeline"); pipeline.run(); LOG.info("Join receiver thread"); diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java deleted file mode 100644 index e6a97e7f9da5..000000000000 --- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.amqp; - -import static org.junit.Assert.assertTrue; - -import java.util.Objects; - -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.junit.Test; - -/** - * Test on the {@link AmqpMessageCoder}. - */ -public class AmqpMessageCoderTest { - - private final AmqpMessageCoder coder = AmqpMessageCoder.of(); - - @Test - public void testEncodeDecode() throws Exception { - AmqpMessage message = new AmqpMessage(); - message.getMessage().setBody(new AmqpValue("test")); - byte[] encoded = CoderUtils.encodeToByteArray(coder, message); - AmqpMessage clone = CoderUtils.decodeFromByteArray(coder, encoded); - - assertTrue(Objects.equals(message, clone)); - assertTrue(Objects.equals(clone, message)); - } - -} From 1d2c3d5964b8621e59f0a3a3b73a3e32300ad098 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 21 Jun 2017 21:07:38 +0200 Subject: [PATCH 06/10] [BEAM-1237] Cleanup on the test --- .../test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java index 214a467194f5..1dc242b7a614 100644 --- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java +++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java @@ -36,17 +36,20 @@ import org.junit.Before; import org.junit.Rule; import 
org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Tests on {@link AmqpIO}. */ +@RunWith(JUnit4.class) public class AmqpIOTest { private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class); - private static int port; + private int port; @Rule public TestPipeline pipeline = TestPipeline.create(); From 97d7fdf906a544ae5a0cee051a39f0732a9c3ac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Mon, 26 Jun 2017 21:54:27 +0200 Subject: [PATCH 07/10] [BEAM-1237] AmqpMessageCoder supports different message sizes and throws an exception if the message is larger than supported --- .../beam/sdk/io/amqp/AmqpMessageCoder.java | 26 ++++++- .../sdk/io/amqp/AmqpMessageCoderTest.java | 76 +++++++++++++++++++ 2 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java index 4c6e56fafc0e..ddc138307b45 100644 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.BufferOverflowException; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; @@ -33,13 +34,36 @@ */ public class AmqpMessageCoder extends CustomCoder { + private static final int[] MESSAGE_SIZES = new int[]{ + 8 * 1024, // 8 KiB + 64 * 1024, // 62 KiB + 1 * 1024 * 1024, // 1 MiB + 64 * 1024 * 1024, // 62 MiB + }; + static AmqpMessageCoder of() { return new AmqpMessageCoder(); } @Override public void encode(Message value, OutputStream 
outStream) throws CoderException, IOException { - byte[] data = new byte[4096]; + for (int maxMessageSize : MESSAGE_SIZES) { + try { + encode(value, outStream, maxMessageSize); + return; + } catch (Exception e) { + if (maxMessageSize == MESSAGE_SIZES[MESSAGE_SIZES.length - 1]) { + throw new CoderException("Message is larger than the max size supported by the coder", e); + } else { + continue; + } + } + } + } + + private void encode(Message value, OutputStream outStream, int messageSize) throws + IOException, BufferOverflowException { + byte[] data = new byte[messageSize]; int bytesWritten = value.encode(data, 0, data.length); VarInt.encode(bytesWritten, outStream); outStream.write(data, 0, bytesWritten); diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java new file mode 100644 index 000000000000..52bdcb58f992 --- /dev/null +++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.amqp; + +import static org.junit.Assert.assertEquals; + +import com.google.common.base.Joiner; + +import java.util.Collections; + +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.message.Message; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test on {@link AmqpMessageCoder}. + */ +@RunWith(JUnit4.class) +public class AmqpMessageCoderTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void encodeDecode() throws Exception { + Message message = Message.Factory.create(); + message.setBody(new AmqpValue("body")); + message.setAddress("address"); + message.setSubject("test"); + AmqpMessageCoder coder = AmqpMessageCoder.of(); + + byte[] encoded = CoderUtils.encodeToByteArray(coder, message); + + Message clone = CoderUtils.decodeFromByteArray(coder, encoded); + + assertEquals("AmqpValue{body}", clone.getBody().toString()); + assertEquals("address", clone.getAddress()); + assertEquals("test", clone.getSubject()); + } + + @Test + public void encodeDecodeLargeMessage() throws Exception { + thrown.expect(CoderException.class); + Message message = Message.Factory.create(); + message.setAddress("address"); + message.setSubject("subject"); + String body = Joiner.on("").join(Collections.nCopies(64 * 1024 * 1024, " ")); + message.setBody(new AmqpValue(body)); + + AmqpMessageCoder coder = AmqpMessageCoder.of(); + + byte[] encoded = CoderUtils.encodeToByteArray(coder, message); + } + +} From 7b63c94dbec6d0ed93b318ebecd8c18c7e23eee0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Tue, 27 Jun 2017 08:34:28 +0200 Subject: [PATCH 08/10] [BEAM-1237] Improve code style and tests --- .../org/apache/beam/sdk/io/amqp/AmqpIO.java 
| 6 +----- .../beam/sdk/io/amqp/AmqpMessageCoder.java | 15 ++++++------- .../sdk/io/amqp/AmqpMessageCoderTest.java | 21 +++++++++++++++---- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java index 941c1595dd08..b9a0be9a078f 100644 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpIO.java @@ -231,11 +231,7 @@ public List split(int desiredNumSplits, // amqp is a queue system, so, it's possible to have multiple concurrent sources, even if // they bind the listener List sources = new ArrayList<>(); - if (desiredNumSplits > 0) { - for (int i = 0; i < desiredNumSplits; i++) { - sources.add(new UnboundedAmqpSource(spec)); - } - } else { + for (int i = 0; i < Math.max(1, desiredNumSplits); ++i) { sources.add(new UnboundedAmqpSource(spec)); } return sources; diff --git a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java index ddc138307b45..5a552600168f 100644 --- a/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java +++ b/sdks/java/io/amqp/src/main/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoder.java @@ -35,10 +35,10 @@ public class AmqpMessageCoder extends CustomCoder { private static final int[] MESSAGE_SIZES = new int[]{ - 8 * 1024, // 8 KiB - 64 * 1024, // 62 KiB - 1 * 1024 * 1024, // 1 MiB - 64 * 1024 * 1024, // 62 MiB + 8 * 1024, + 64 * 1024, + 1 * 1024 * 1024, + 64 * 1024 * 1024 }; static AmqpMessageCoder of() { @@ -52,13 +52,10 @@ public void encode(Message value, OutputStream outStream) throws CoderException, encode(value, outStream, maxMessageSize); return; } catch (Exception e) { - if (maxMessageSize == MESSAGE_SIZES[MESSAGE_SIZES.length - 1]) { - throw new 
CoderException("Message is larger than the max size supported by the coder", e); - } else { - continue; - } + continue; } } + throw new CoderException("Message is larger than the max size supported by the coder"); } private void encode(Message value, OutputStream outStream, int messageSize) throws diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java index 52bdcb58f992..7a8efeb61c1c 100644 --- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java +++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpMessageCoderTest.java @@ -50,9 +50,7 @@ public void encodeDecode() throws Exception { message.setSubject("test"); AmqpMessageCoder coder = AmqpMessageCoder.of(); - byte[] encoded = CoderUtils.encodeToByteArray(coder, message); - - Message clone = CoderUtils.decodeFromByteArray(coder, encoded); + Message clone = CoderUtils.clone(coder, message); assertEquals("AmqpValue{body}", clone.getBody().toString()); assertEquals("address", clone.getAddress()); @@ -60,7 +58,7 @@ public void encodeDecode() throws Exception { } @Test - public void encodeDecodeLargeMessage() throws Exception { + public void encodeDecodeTooMuchLargerMessage() throws Exception { thrown.expect(CoderException.class); Message message = Message.Factory.create(); message.setAddress("address"); @@ -73,4 +71,19 @@ public void encodeDecodeLargeMessage() throws Exception { byte[] encoded = CoderUtils.encodeToByteArray(coder, message); } + @Test + public void encodeDecodeLargeMessage() throws Exception { + Message message = Message.Factory.create(); + message.setAddress("address"); + message.setSubject("subject"); + String body = Joiner.on("").join(Collections.nCopies(32 * 1024 * 1024, " ")); + message.setBody(new AmqpValue(body)); + + AmqpMessageCoder coder = AmqpMessageCoder.of(); + + Message clone = CoderUtils.clone(coder, 
message); + + clone.getBody().toString().equals(message.getBody().toString()); + } + } From f8a14b38b176f587a9f2c0508ac97a44ffeafd3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Tue, 27 Jun 2017 10:02:29 +0200 Subject: [PATCH 09/10] [BEAM-1237] Use try/finally in test --- .../apache/beam/sdk/io/amqp/AmqpIOTest.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java index 1dc242b7a614..c8fe4e80f834 100644 --- a/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java +++ b/sdks/java/io/amqp/src/test/java/org/apache/beam/sdk/io/amqp/AmqpIOTest.java @@ -87,9 +87,12 @@ public void run() { } } }; - sender.start(); - pipeline.run(); - sender.join(); + try { + sender.start(); + pipeline.run(); + } finally { + sender.join(); + } } @Test @@ -129,9 +132,12 @@ public void run() { } pipeline.apply(Create.of(data).withCoder(AmqpMessageCoder.of())).apply(AmqpIO.write()); LOG.info("Starting pipeline"); - pipeline.run(); - LOG.info("Join receiver thread"); - receiver.join(); + try { + pipeline.run(); + } finally { + LOG.info("Join receiver thread"); + receiver.join(); + } assertEquals(100, received.size()); for (int i = 0; i < 100; i++) { From ac944e1a7092b318cdcd13ee51b8403f283ebb55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Tue, 27 Jun 2017 15:09:37 +0200 Subject: [PATCH 10/10] [BEAM-1237] Sort IO modules by alphanumeric order --- sdks/java/io/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index 8e5efeb21b39..e5db41b72629 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -64,6 +64,7 @@ + amqp cassandra common elasticsearch @@ -80,7 +81,6 @@ mongodb mqtt xml - amqp