From c05274058a2a7f152e668ea464e257ca9dc5aac0 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Sat, 15 Oct 2016 18:09:56 -0400 Subject: [PATCH] [FLINK-4837] flink-streaming-akka source connector --- .../flink-connector-akka/pom.xml | 75 ++++++ .../streaming/connectors/akka/AkkaSource.java | 147 +++++++++++ .../connectors/akka/utils/ReceiverActor.java | 122 +++++++++ .../akka/utils/SubscribeReceiver.java | 58 +++++ .../akka/utils/UnsubscribeReceiver.java | 58 +++++ .../connectors/akka/AkkaSourceTest.java | 238 ++++++++++++++++++ .../connectors/akka/utils/FeederActor.java | 100 ++++++++ .../connectors/akka/utils/Message.java | 23 ++ .../src/test/resources/feeder_actor.conf | 33 +++ flink-streaming-connectors/pom.xml | 1 + 10 files changed, 855 insertions(+) create mode 100644 flink-streaming-connectors/flink-connector-akka/pom.xml create mode 100644 flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java create mode 100644 flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java create mode 100644 flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java create mode 100644 flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java create mode 100644 flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java create mode 100644 flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java create mode 100644 flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java create mode 100644 flink-streaming-connectors/flink-connector-akka/src/test/resources/feeder_actor.conf diff --git a/flink-streaming-connectors/flink-connector-akka/pom.xml b/flink-streaming-connectors/flink-connector-akka/pom.xml new file mode 100644 index 0000000000000..0c9ad10b037b6 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-akka/pom.xml @@ -0,0 +1,75 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-streaming-connectors + 1.2-SNAPSHOT + .. + + + flink-connector-akka_2.10 + flink-connector-akka + + jar + + + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + + + com.typesafe.akka + akka-actor_${scala.binary.version} + ${akka.version} + + + com.typesafe.akka + akka-remote_${scala.binary.version} + ${akka.version} + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + diff --git a/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java b/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java new file mode 100644 index 0000000000000..1d68c914535b2 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.akka.utils.ReceiverActor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link SourceFunction} specialized to read messages + * from Akka actors. + */ +public class AkkaSource extends RichSourceFunction + implements StoppableFunction { + + private static final Logger LOG = LoggerFactory.getLogger(AkkaSource.class); + + private static final long serialVersionUID = 1L; + + // --- Fields set by the constructor + + private final Class classForActor; + + private final String actorName; + + private final String urlOfPublisher; + + // --- Runtime fields + private transient ActorSystem receiverActorSystem; + private transient ActorRef receiverActor; + private transient Object waitLock; + private transient boolean running = true; + + protected transient boolean autoAck; + + /** + * Creates {@link AkkaSource} for Streaming + * + * @param actorName Receiver Actor name + * @param urlOfPublisher tcp url of the publisher or feeder actor + */ + public AkkaSource(String actorName, + String urlOfPublisher) { + super(); + this.classForActor = ReceiverActor.class; + this.actorName = actorName; + this.urlOfPublisher = urlOfPublisher; + } + + @Override + public void open(Configuration parameters) throws Exception { + waitLock = new Object(); + receiverActorSystem = createDefaultActorSystem(); + + RuntimeContext runtimeContext = getRuntimeContext(); + if (runtimeContext instanceof StreamingRuntimeContext + && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) { + autoAck = false; + } else { + autoAck = true; + } + } + + @Override + public void run(SourceContext ctx) throws Exception { + LOG.info("Starting the Receiver actor {}", actorName); + receiverActor = receiverActorSystem.actorOf( + Props.create(classForActor, ctx, urlOfPublisher, autoAck), actorName); + + running = true; + LOG.info("Started the Receiver actor {} successfully", actorName); + + while (running) { + synchronized (waitLock) { + waitLock.wait(100L); + } + } + } + + @Override + public void close() { + this.running = false; + LOG.info("Closing source"); + if (receiverActorSystem != null) { + receiverActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + receiverActorSystem.shutdown(); + receiverActorSystem.awaitTermination(); + } + synchronized (waitLock) { + waitLock.notify(); + } + } + + @Override + public void cancel() { + LOG.info("Cancelling akka source"); + close(); + } + + @Override + public void stop() { + LOG.info("Stopping akka source"); + close(); + } + + /** + * Creates an actor system with default configurations for Receiver actor. + * + * @return Actor System instance with default configurations + */ + private ActorSystem createDefaultActorSystem() { + String defaultActorSystemName = "receiver-actor-system"; + + String configString = "akka.actor.provider = \"akka.remote.RemoteActorRefProvider\"\n" + + "akka.remote.enabled-transports = [\"akka.remote.netty.tcp\"]"; + Config defaultConfig = ConfigFactory.parseString(configString); + + return ActorSystem.create(defaultActorSystemName, defaultConfig); + } +} diff --git a/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java b/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java new file mode 100644 index 0000000000000..a756662f66464 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka.utils; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.UntypedActor; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; + +import java.util.Iterator; + +/** + * Generalized receiver actor which receives messages + * from the feeder or publisher actor. + */ +public class ReceiverActor extends UntypedActor { + // --- Fields set by the constructor + private final SourceContext ctx; + + private final String urlOfPublisher; + + private final boolean autoAck; + + // --- Runtime fields + private ActorSelection remotePublisher; + + public ReceiverActor(SourceContext ctx, + String urlOfPublisher, + boolean autoAck) { + this.ctx = ctx; + this.urlOfPublisher = urlOfPublisher; + this.autoAck = autoAck; + } + + @Override + public void preStart() throws Exception { + remotePublisher = getContext().actorSelection(urlOfPublisher); + remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf()); + } + + @SuppressWarnings("unchecked") + @Override + public void onReceive(Object message) + throws Exception { + if (message instanceof Iterable) { + collect((Iterable) message); + } else if (message instanceof byte[]) { + byte[] messageBytes = (byte[]) message; + collect(messageBytes); + } else if (message instanceof Tuple2) { + Tuple2 messageTuple = (Tuple2) message; + collect(messageTuple.f0, messageTuple.f1); + } else { + collect(message); + } + + if (autoAck) { + getSender().tell("ack", getSelf()); + } + } + + /** + * To handle {@link Iterable} data + * + * @param data data received from feeder actor + */ + private void collect(Iterable data) { + Iterator iterator = data.iterator(); + while (iterator.hasNext()) { + ctx.collect(iterator.next()); + } + } + + /** + * To handle byte array data + * + * @param bytes data received from feeder actor + */ + private void collect(byte[] bytes) { + ctx.collect(bytes); + } + + /** + * To handle single data + * @param data data received from feeder actor + */ + private void collect(Object data) { + ctx.collect(data); + } + + /** + * To handle data with timestamp + * + * @param data data received from feeder actor + * @param timestamp timestamp received from feeder actor + */ + private void collect(Object data, long timestamp) { + ctx.collectWithTimestamp(data, timestamp); + } + + @Override + public void postStop() throws Exception { + remotePublisher.tell(new UnsubscribeReceiver(ActorRef.noSender()), + ActorRef.noSender()); + } +} diff --git a/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java b/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java new file mode 100644 index 0000000000000..57827eecf299a --- /dev/null +++ b/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka.utils; + +import akka.actor.ActorRef; + +import java.io.Serializable; + +/** + * General interface used by Receiver Actor to subscribe + * to the publisher. + */ +public class SubscribeReceiver implements Serializable { + private static final long serialVersionUID = 1L; + private ActorRef receiverActor; + + public SubscribeReceiver(ActorRef receiverActor) { + this.receiverActor = receiverActor; + } + + public void setReceiverActor(ActorRef receiverActor) { + this.receiverActor = receiverActor; + } + + public ActorRef getReceiverActor() { + return receiverActor; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof SubscribeReceiver) { + SubscribeReceiver other = (SubscribeReceiver) obj; + return other.canEquals(this) && super.equals(other) + && receiverActor.equals(other.getReceiverActor()); + } else { + return false; + } + } + + public boolean canEquals(Object obj) { + return obj instanceof SubscribeReceiver; + } +} diff --git a/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java b/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java new file mode 100644 index 0000000000000..e3e04857b22bb --- /dev/null +++ b/flink-streaming-connectors/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka.utils; + +import akka.actor.ActorRef; + +import java.io.Serializable; + +/** + * General interface used by Receiver Actor to un subscribe. + */ +public class UnsubscribeReceiver implements Serializable { + private static final long serialVersionUID = 1L; + private ActorRef receiverActor; + + public UnsubscribeReceiver(ActorRef receiverActor) { + this.receiverActor = receiverActor; + } + + public void setReceiverActor(ActorRef receiverActor) { + this.receiverActor = receiverActor; + } + + public ActorRef getReceiverActor() { + return receiverActor; + } + + + @Override + public boolean equals(Object obj) { + if (obj instanceof UnsubscribeReceiver) { + UnsubscribeReceiver other = (UnsubscribeReceiver) obj; + return other.canEquals(this) && super.equals(other) + && receiverActor.equals(other.getReceiverActor()); + } else { + return false; + } + } + + public boolean canEquals(Object obj) { + return obj instanceof UnsubscribeReceiver; + } +} diff --git a/flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java b/flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java new file mode 100644 index 0000000000000..2bcbfa850c0dc --- /dev/null +++ b/flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.akka.utils.FeederActor; +import org.apache.flink.streaming.connectors.akka.utils.Message; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +@RunWith(PowerMockRunner.class) +public class AkkaSourceTest { + private AkkaSource source; + + private static final String feederActorName = "JavaFeederActor"; + private static final String receiverActorName = "receiverActor"; + private static final String urlOfFeeder = + "akka.tcp://feederActorSystem@127.0.0.1:5150/user/" + feederActorName; + private ActorSystem feederActorSystem; + + private Configuration config = new Configuration(); + + private Thread sourceThread; + + private SourceFunction.SourceContext sourceContext; + + private volatile Exception exception; + + @Before + public void beforeTest() throws Exception { + feederActorSystem = ActorSystem.create("feederActorSystem", + getFeederActorConfig()); + + source = new AkkaTestSource(); + source.open(config); + + sourceContext = new DummySourceContext(); + + sourceThread = new Thread(new Runnable() { + @Override + public void run() { + try { + SourceFunction.SourceContext sourceContext = + new DummySourceContext(); + source.run(sourceContext); + } catch (Exception e) { + exception = e; + } + } + }); + } + + @After + public void afterTest() throws Exception { + feederActorSystem.shutdown(); + feederActorSystem.awaitTermination(); + + source.cancel(); + sourceThread.join(); + } + + @Test + public void testWithSingleData() throws Exception { + feederActorSystem.actorOf( + Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA), + feederActorName); + + source.autoAck = false; + sourceThread.start(); + + while (DummySourceContext.numElementsCollected != 1) { + Thread.sleep(5); + } + List message = DummySourceContext.message; + Assert.assertEquals(message.get(0).toString(), Message.WELCOME_MESSAGE); + } + + @Test + public void testWithIterableData() throws Exception { + feederActorSystem.actorOf( + Props.create(FeederActor.class, FeederActor.MessageTypes.ITERABLE_DATA), + feederActorName); + + source.autoAck = false; + sourceThread.start(); + + while (DummySourceContext.numElementsCollected != 2) { + Thread.sleep(5); + } + + List messages = DummySourceContext.message; + Assert.assertEquals(messages.get(0).toString(), Message.WELCOME_MESSAGE); + Assert.assertEquals(messages.get(1).toString(), Message.FEEDER_MESSAGE); + } + + @Test + public void testWithByteArrayData() throws Exception { + feederActorSystem.actorOf( + Props.create(FeederActor.class, FeederActor.MessageTypes.BYTES_DATA), + feederActorName); + + source.autoAck = false; + sourceThread.start(); + + while (DummySourceContext.numElementsCollected != 1) { + Thread.sleep(5); + } + + List message = DummySourceContext.message; + if (message.get(0) instanceof byte[]) { + byte[] data = (byte[]) message.get(0); + Assert.assertEquals(new String(data), Message.WELCOME_MESSAGE); + } + } + + @Test + public void testWithSingleDataWithTimestamp() throws Exception { + feederActorSystem.actorOf( + Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA_WITH_TIMESTAMP), + feederActorName); + + source.autoAck = false; + sourceThread.start(); + + while (DummySourceContext.numElementsCollected != 1) { + Thread.sleep(5); + } + + List message = DummySourceContext.message; + Assert.assertEquals(message.get(0).toString(), Message.WELCOME_MESSAGE); + } + + @Test + public void testAcksWithSingleData() throws Exception { + feederActorSystem.actorOf( + Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA), + feederActorName); + + source.autoAck = true; + sourceThread.start(); + + while (DummySourceContext.numElementsCollected != 1) { + Thread.sleep(5); + } + // assertion tested in FeederActor + } + + private class AkkaTestSource extends AkkaSource { + + public AkkaTestSource() { + super(receiverActorName, urlOfFeeder); + } + + @Override + public RuntimeContext getRuntimeContext() { + return Mockito.mock(StreamingRuntimeContext.class); + } + } + + private static class DummySourceContext implements SourceFunction.SourceContext { + private static final Object lock = new Object(); + + private static long numElementsCollected; + + private static List message; + + public DummySourceContext() { + numElementsCollected = 0; + message = new ArrayList(); + } + + @Override + public void collect(Object element) { + message.add(element); + numElementsCollected++; + } + + @Override + public void collectWithTimestamp(Object element, long timestamp) { + message.add(element); + numElementsCollected++; + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return lock; + } + + @Override + public void close() { + + } + } + + private Config getFeederActorConfig() { + String configFile = getClass().getClassLoader() + .getResource("feeder_actor.conf").getFile(); + Config config = ConfigFactory.parseFile(new File(configFile)); + return config; + } +} diff --git a/flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java b/flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java new file mode 100644 index 0000000000000..10cb1c35ba498 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka.utils; + +import akka.actor.ActorRef; +import akka.actor.UntypedActor; +import org.apache.flink.api.java.tuple.Tuple2; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class FeederActor extends UntypedActor { + + public enum MessageTypes { + SINGLE_DATA, ITERABLE_DATA, BYTES_DATA, + SINGLE_DATA_WITH_TIMESTAMP + } + + private static final Logger LOG = LoggerFactory.getLogger(FeederActor.class); + + private final MessageTypes messageType; + + public FeederActor(MessageTypes messageType) { + this.messageType = messageType; + } + + @Override + public void onReceive(Object message) { + if (message instanceof SubscribeReceiver) { + ActorRef receiver = ((SubscribeReceiver) message).getReceiverActor(); + + Object data; + switch (messageType) { + case SINGLE_DATA: + data = createSingleDataMessage(); + break; + case ITERABLE_DATA: + data = createIterableOfMessages(); + break; + case BYTES_DATA: + data = createByteMessages(); + break; + case SINGLE_DATA_WITH_TIMESTAMP: + data = createTimestampMessage(); + break; + default: + throw new RuntimeException("Message format specified is incorrect"); + } + receiver.tell(data, getSelf()); + } else if (message instanceof String) { + Assert.assertEquals(message.toString(), "ack"); + } else if (message instanceof UnsubscribeReceiver) { + LOG.info("Stop actor!"); + } + } + + private Object createSingleDataMessage() { + return Message.WELCOME_MESSAGE; + } + + private List createIterableOfMessages() { + List messages = new ArrayList(); + + messages.add(Message.WELCOME_MESSAGE); + messages.add(Message.FEEDER_MESSAGE); + + return messages; + } + + private byte[] createByteMessages() { + byte[] message = Message.WELCOME_MESSAGE.getBytes(); + return message; + } + + private Tuple2 createTimestampMessage() { + Tuple2 message = new Tuple2(); + message.f0 = Message.WELCOME_MESSAGE; + message.f1 = System.currentTimeMillis(); + + return message; + } +} diff --git a/flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java b/flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java new file mode 100644 index 0000000000000..5a4f139f9552c --- /dev/null +++ b/flink-streaming-connectors/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.akka.utils; + +public class Message { + public static final String WELCOME_MESSAGE = "welcome receiver"; + public static final String FEEDER_MESSAGE = "this is feeder"; +} diff --git a/flink-streaming-connectors/flink-connector-akka/src/test/resources/feeder_actor.conf b/flink-streaming-connectors/flink-connector-akka/src/test/resources/feeder_actor.conf new file mode 100644 index 0000000000000..a877aa349d103 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-akka/src/test/resources/feeder_actor.conf @@ -0,0 +1,33 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +akka { + loglevel = "INFO" + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = 127.0.0.1 + port = 5150 + } + log-sent-messages = on + log-received-messages = on + } +} diff --git a/flink-streaming-connectors/pom.xml b/flink-streaming-connectors/pom.xml index 78e39cadb5151..bc8e8217a2cee 100644 --- a/flink-streaming-connectors/pom.xml +++ b/flink-streaming-connectors/pom.xml @@ -48,6 +48,7 @@ under the License. flink-connector-nifi flink-connector-cassandra flink-connector-redis + flink-connector-akka