From 38c697057f93fe771b003c5dd65328a74c61b4a1 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 16 Jun 2015 17:31:50 +0800 Subject: [PATCH 01/12] Add "streaming-akka" project This PR includes the following changes: 1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream 2. Deprecate "StreamingContext.actorStream" and "JavaStreamingContext.actorStream" 3. Add JavaActorHelper for the Java API user 4. Update the ActorWordCount example and add the JavaActorWordCount example 5. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly --- .../streaming/JavaActorWordCount.java | 182 ++++++++++++ .../examples/streaming/ActorWordCount.scala | 59 ++-- .../examples/streaming/ZeroMQWordCount.scala | 27 +- external/akka/pom.xml | 75 +++++ .../spark/streaming/akka/ActorReceiver.scala | 258 ++++++++++++++++++ .../spark/streaming/akka/AkkaUtils.scala | 147 ++++++++++ .../akka/src/test/resources/log4j.properties | 28 ++ .../streaming/akka/AkkaStreamSuite.scala | 116 ++++++++ external/zeromq/pom.xml | 5 + .../streaming/zeromq/ZeroMQReceiver.scala | 2 +- .../spark/streaming/zeromq/ZeroMQUtils.scala | 22 +- .../zeromq/JavaZeroMQStreamSuite.java | 41 ++- .../streaming/zeromq/ZeroMQStreamSuite.scala | 7 +- pom.xml | 1 + project/SparkBuild.scala | 8 +- .../spark/streaming/StreamingContext.scala | 1 + .../api/java/JavaStreamingContext.scala | 6 + .../streaming/receiver/ActorReceiver.scala | 3 + 18 files changed, 939 insertions(+), 49 deletions(-) create mode 100644 examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java create mode 100644 external/akka/pom.xml create mode 100644 external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala create mode 100644 external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala create mode 100644 external/akka/src/test/resources/log4j.properties create mode 100644 external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java new file mode 100644 index 0000000000000..c241abc30cb68 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import scala.Tuple2; + +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.akka.ActorSystemFactory; +import org.apache.spark.streaming.akka.AkkaUtils; +import org.apache.spark.streaming.akka.JavaActorHelper; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + +/** + * A sample actor as receiver, is also simplest. This receiver actor + * goes and subscribe to a typical publisher/feeder actor and receives + * data. + * + * @see [[org.apache.spark.examples.streaming.FeederActor]] + */ +class JavaSampleActorReceiver extends UntypedActor { + + private final String urlOfPublisher; + + private final JavaActorHelper helper = new JavaActorHelper(this); + + public JavaSampleActorReceiver(String urlOfPublisher) { + this.urlOfPublisher = urlOfPublisher; + } + + private ActorSelection remotePublisher; + + @Override + public void preStart() { + remotePublisher = getContext().actorSelection(urlOfPublisher); + remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf()); + } + + public void onReceive(Object msg) throws Exception { + helper.store((T) msg); + } + + @Override + public void postStop() { + remotePublisher.tell(new UnsubscribeReceiver(getSelf()), getSelf()); + } +} + +/** + * A sample word count program demonstrating the use of plugging in + * Actor as Receiver + * Usage: JavaActorWordCount + * and describe the AkkaSystem that Spark Sample feeder is running on. + * + * To run this example locally, you may run Feeder Actor as + *
+ *     $ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.0.1 9999
+ * </pre>
+ * and then run the example
+ * <pre>
+ *     $ bin/run-example org.apache.spark.examples.streaming.JavaActorWordCount 127.0.0.1 9999
+ * </pre>
+ */ +public class JavaActorWordCount { + + public static void main(String[] args) { + if (args.length < 2) { + System.err.println("Usage: JavaActorWordCount "); + System.exit(1); + } + + StreamingExamples.setStreamingLogLevels(); + + final String host = args[0]; + final String port = args[1]; + SparkConf sparkConf = new SparkConf().setAppName("JavaActorWordCount"); + // Create the context and set the batch size + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); + + /* + * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver + * + * An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e type of data received and InputDStream + * should be same. + * + * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized + * to same type to ensure type safety. + */ + + ActorSystemFactory actorSystemFactory = new ActorSystemFactory() { + @Override + public ActorSystem create() { + return JavaGlobalActorSystem.getActorSystem(); + } + }; + + String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor"; + + JavaDStream lines1 = AkkaUtils.createStream( + jssc, + actorSystemFactory, + Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver1"); + JavaDStream lines2 = AkkaUtils.createStream( + jssc, + actorSystemFactory, + Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver2"); + + JavaDStream lines = lines1.union(lines2); + + // compute wordcount + lines.flatMap(new FlatMapFunction() { + @Override + public Iterable call(String s) { + return Arrays.asList(s.split("\\s+")); + } + }).mapToPair(new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2(s, 1); + } + }).reduceByKey(new Function2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }).print(); + + jssc.start(); + jssc.awaitTermination(); + } +} + +/** + * A global `ActorSystem` to avoid creating multiple `ActorSystem`s in an executor. 
+ */ +class JavaGlobalActorSystem { + + private static ActorSystem actorSystem = null; + + public synchronized static ActorSystem getActorSystem() { + if (actorSystem == null) { + Map akkaConf = new HashMap(); + akkaConf.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider"); + akkaConf.put( + "akka.remote.netty.tcp.transport-class", "akka.remote.transport.netty.NettyTransport"); + actorSystem = ActorSystem.create("test", ConfigFactory.parseMap(akkaConf)); + } + return actorSystem; + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 016de4c63d1d2..97488e56442f0 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -21,13 +21,12 @@ import scala.collection.mutable.LinkedList import scala.reflect.ClassTag import scala.util.Random -import akka.actor.{Actor, ActorRef, Props, actorRef2Scala} +import akka.actor._ +import com.typesafe.config.ConfigFactory -import org.apache.spark.{SparkConf, SecurityManager} +import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions -import org.apache.spark.util.AkkaUtils -import org.apache.spark.streaming.receiver.ActorHelper +import org.apache.spark.streaming.akka.{AkkaUtils, ActorHelper} case class SubscribeReceiver(receiverActor: ActorRef) case class UnsubscribeReceiver(receiverActor: ActorRef) @@ -110,9 +109,13 @@ object FeederActor { } val Seq(host, port) = args.toSeq - val conf = new SparkConf - val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf, - securityManager = new SecurityManager(conf))._1 + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" + |akka.remote.netty.tcp.hostname = "$host" + |akka.remote.netty.tcp.port = $port + """.stripMargin) + val actorSystem = ActorSystem("test", akkaConf) val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") println("Feeder started as:" + feeder) @@ -128,9 +131,9 @@ object FeederActor { * and describe the AkkaSystem that Spark Sample feeder is running on. * * To run this example locally, you may run Feeder Actor as - * `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.0.1 9999` * and then run the example - * `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.1.1 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.0.1 9999` */ object ActorWordCount { def main(args: Array[String]) { @@ -148,21 +151,30 @@ object ActorWordCount { val ssc = new StreamingContext(sparkConf, Seconds(2)) /* - * Following is the use of actorStream to plug in custom actor as receiver + * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver * * An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e type of data received and InputDstream + * to ensure the type safety, i.e type of data received and InputDStream * should be same. 
* - * For example: Both actorStream and SampleActorReceiver are parameterized + * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized * to same type to ensure type safety. */ - val lines = ssc.actorStream[String]( - Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format( - host, port.toInt))), "SampleReceiver") + val lines1 = AkkaUtils.createStream[String]( + ssc, + () => GlobalActorSystem.actorSystem, + Props(new SampleActorReceiver[String](s"akka.tcp://test@$host:$port/user/FeederActor")), + "SampleReceiver1") + val lines2 = AkkaUtils.createStream[String]( + ssc, + () => GlobalActorSystem.actorSystem, + Props(new SampleActorReceiver[String](s"akka.tcp://test@$host:$port/user/FeederActor")), + "SampleReceiver2") + + val lines = lines1.union(lines2) // compute wordcount lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() @@ -170,3 +182,18 @@ object ActorWordCount { ssc.awaitTermination() } } + +/** + * A global `ActorSystem` to avoid creating multiple `ActorSystem`s in an executor. + */ +object GlobalActorSystem { + + lazy val actorSystem = { + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" + """.stripMargin) + ActorSystem("test", akkaConf) + } + +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index e99d1baa72b9f..822e6cce7c0b4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -17,18 +17,19 @@ package org.apache.spark.examples.streaming +import scala.language.implicitConversions + import akka.actor.ActorSystem import akka.actor.actorRef2Scala import akka.zeromq._ import akka.zeromq.Subscribe import akka.util.ByteString +import com.typesafe.config.ConfigFactory +import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.zeromq._ -import scala.language.implicitConversions -import org.apache.spark.SparkConf - /** * A simple publisher for demonstration purposes, repeatedly publishes random Messages * every one second. @@ -89,7 +90,9 @@ object ZeroMQWordCount { def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator // For this stream, a zeroMQ publisher should be running. - val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) + val lines = ZeroMQUtils.createStream( + ssc, () => ZeroMQGlobalActorSystem.actorSystem, + url, Subscribe(topic), bytesToStringIterator _) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() @@ -97,3 +100,19 @@ object ZeroMQWordCount { ssc.awaitTermination() } } + + +/** + * A global `ActorSystem` to avoid creating multiple `ActorSystem`s in an executor. 
+ */ +object ZeroMQGlobalActorSystem { + + lazy val actorSystem = { + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" + """.stripMargin) + ActorSystem("test", akkaConf) + } + +} diff --git a/external/akka/pom.xml b/external/akka/pom.xml new file mode 100644 index 0000000000000..16dd2e686ec8a --- /dev/null +++ b/external/akka/pom.xml @@ -0,0 +1,75 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent_2.10 + 1.5.0-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-streaming-akka_2.10 + + streaming-akka + + jar + Spark Project External Akka + http://spark.apache.org/ + + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + + + ${akka.group} + akka-actor_${scala.binary.version} + ${akka.version} + + + ${akka.group} + akka-remote_${scala.binary.version} + ${akka.version} + + + + com.typesafe + config + 1.3.0 + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala new file mode 100644 index 0000000000000..c6564c6e85972 --- /dev/null +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConversions._ +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import akka.actor._ +import akka.actor.SupervisorStrategy.{Escalate, Restart} + +import org.apache.spark.Logging +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver + +/** + * :: DeveloperApi :: + * A helper with set of defaults for supervisor strategy + */ +@DeveloperApi +object ActorSupervisorStrategy { + + val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = + 15 millis) { + case _: RuntimeException => Restart + case _: Exception => Escalate + } +} + +/** + * :: DeveloperApi :: + * A receiver trait to be mixed in with your Actor to gain access to + * the API for pushing received data into Spark Streaming for being processed. 
+ * + * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * + * @example {{{ + * class MyActor extends Actor with ActorHelper{ + * def receive { + * case anything: String => store(anything) + * } + * } + * + * // Can be used with an actorStream as follows + * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") + * + * }}} + * + * @note Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of push block and InputDStream + * should be same. + */ +@DeveloperApi +trait ActorHelper extends Logging{ + + self: Actor => // to ensure that this can be added to Actor classes only + + /** Store an iterator of received data as a data block into Spark's memory. */ + def store[T](iter: Iterator[T]) { + logDebug("Storing iterator") + context.parent ! IteratorData(iter) + } + + /** + * Store the bytes of received data as a data block into Spark's memory. Note + * that the data in the ByteBuffer must be serialized using the same serializer + * that Spark is configured to use. + */ + def store(bytes: ByteBuffer) { + logDebug("Storing Bytes") + context.parent ! ByteBufferData(bytes) + } + + /** + * Store a single item of received data to Spark's memory. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. + */ + def store[T](item: T) { + logDebug("Storing item") + context.parent ! SingleItemData(item) + } +} + +/** + * :: DeveloperApi :: + * A helper class for your Actor to gain access to + * the API for pushing received data into Spark Streaming for being processed. + * + * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * + * @example {{{ + * class MyActor extends UntypedActor { + * + * private final JavaActorHelper helper = new JavaActorHelper(this); + * + * public void onReceive(Object msg) throws Exception { + * helper.store(msg); + * } + * + * } + * + * // Can be used with an actorStream as follows + * AkkaUtils.createStream( + * jssc, actorSystemFactory, Props.create(new MyActor()), "MyActorReceiver"); + * + * }}} + * + * @note Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of push block and InputDStream + * should be same. + */ +@DeveloperApi +class JavaActorHelper(actor: Actor) { + /** Store an iterator of received data as a data block into Spark's memory. */ + def store[T](iter: java.util.Iterator[T]) { + actor.context.parent ! IteratorData(iter) + } + + /** + * Store the bytes of received data as a data block into Spark's memory. Note + * that the data in the ByteBuffer must be serialized using the same serializer + * that Spark is configured to use. + */ + def store(bytes: ByteBuffer) { + actor.context.parent ! ByteBufferData(bytes) + } + + /** + * Store a single item of received data to Spark's memory. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. + */ + def store[T](item: T) { + actor.context.parent ! SingleItemData(item) + } +} + +/** + * :: DeveloperApi :: + * Statistics for querying the supervisor about state of workers. Used in + * conjunction with `StreamingContext.actorStream` and + * [[org.apache.spark.streaming.akka.ActorHelper]]. 
+ */ +@DeveloperApi +case class Statistics(numberOfMsgs: Int, + numberOfWorkers: Int, + numberOfHiccups: Int, + otherInfo: String) + +/** Case class to receive data sent by child actors */ +private[streaming] sealed trait ActorReceiverData +private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData +private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData +private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData + +/** + * Provides Actors as receivers for receiving stream. + * + * As Actors can also be used to receive data from almost any stream source. + * A nice set of abstraction(s) for actors as receivers is already provided for + * a few general cases. It is thus exposed as an API where user may come with + * their own Actor to run as receiver for Spark Streaming input source. + * + * This starts a supervisor actor which starts workers and also provides + * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance]. + * + * Here's a way to start more supervisor/workers as its children. + * + * @example {{{ + * context.parent ! Props(new Supervisor) + * }}} OR {{{ + * context.parent ! Props(new Worker, "Worker") + * }}} + */ +private[streaming] class ActorReceiver[T: ClassTag]( + actorSystemCreator: () => ActorSystem, + props: Props, + name: String, + storageLevel: StorageLevel, + receiverSupervisorStrategy: SupervisorStrategy + ) extends Receiver[T](storageLevel) with Logging { + + protected lazy val supervisor = actorSystemCreator().actorOf(Props(new Supervisor), + "Supervisor" + streamId) + + class Supervisor extends Actor { + + override val supervisorStrategy = receiverSupervisorStrategy + private val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + + private val n: AtomicInteger = new AtomicInteger(0) + private val hiccups: AtomicInteger = new AtomicInteger(0) + + override def receive: PartialFunction[Any, Unit] = { + + case IteratorData(iterator) => + logDebug("received iterator") + store(iterator.asInstanceOf[Iterator[T]]) + + case SingleItemData(msg) => + logDebug("received single") + store(msg.asInstanceOf[T]) + n.incrementAndGet + + case ByteBufferData(bytes) => + logDebug("received bytes") + store(bytes) + + case props: Props => + val worker = context.actorOf(props) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case (props: Props, name: String) => + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case _: PossiblyHarmful => hiccups.incrementAndGet() + + case _: Statistics => + val workers = context.children + sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n")) + + } + } + + def onStart(): Unit = { + supervisor + logInfo("Supervision tree for receivers initialized at:" + supervisor.path) + } + + def onStop(): Unit = { + supervisor ! PoisonPill + } +} diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala new file mode 100644 index 0000000000000..c8166adfc95e9 --- /dev/null +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka + +import scala.reflect.ClassTag + +import akka.actor.{ActorSystem, SupervisorStrategy, Props} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaReceiverInputDStream} +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +/** + * Factory interface for creating a new ActorSystem in executors. + */ +trait ActorSystemFactory extends Serializable { + def create(): ActorSystem +} + +object AkkaUtils { + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * + * @param ssc the StreamingContext instance + * @param actorSystemCreator a function to create ActorSystem in executors + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2) + * @param supervisorStrategy the supervisor strategy (default: + * ActorSupervisorStrategy.defaultStrategy) + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def createStream[T: ClassTag]( + ssc: StreamingContext, + actorSystemCreator: () => ActorSystem, + props: Props, + name: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy + ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") { + val cleanF = ssc.sc.clean(actorSystemCreator) + ssc.receiverStream(new ActorReceiver[T](cleanF, props, name, storageLevel, supervisorStrategy)) + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * + * @param jssc the StreamingContext instance + * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel Storage level to use for storing the received objects + * @param supervisorStrategy the supervisor strategy + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. 
+ */ + def createStream[T]( + jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, + props: Props, + name: String, + storageLevel: StorageLevel, + supervisorStrategy: SupervisorStrategy + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T]( + jssc.ssc, () => actorSystemFactory.create(), props, name, storageLevel, supervisorStrategy) + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * + * @param jssc the StreamingContext instance + * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel Storage level to use for storing the received objects + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def createStream[T]( + jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, + props: Props, + name: String, + storageLevel: StorageLevel + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T](jssc.ssc, () => actorSystemFactory.create(), props, name, storageLevel) + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. + * + * @param jssc the StreamingContext instance + * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors + * @param props Props object defining creation of the actor + * @param name Name of the actor + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def createStream[T]( + jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, + props: Props, + name: String + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T](jssc.ssc, () => actorSystemFactory.create(), props, name) + } + +} diff --git a/external/akka/src/test/resources/log4j.properties b/external/akka/src/test/resources/log4j.properties new file mode 100644 index 0000000000000..75e3b53a093f6 --- /dev/null +++ b/external/akka/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark-project.jetty=WARN + diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala new file mode 100644 index 0000000000000..930cd49249deb --- /dev/null +++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.concurrent.duration._ + +import akka.actor._ +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.streaming.{Milliseconds, StreamingContext} + +class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll { + + private var ssc: StreamingContext = _ + + private var actorSystem: ActorSystem = _ + + override def afterAll(): Unit = { + if (ssc != null) { + ssc.stop() + ssc = null + } + if (actorSystem != null) { + actorSystem.shutdown() + actorSystem.awaitTermination(30.seconds) + actorSystem = null + } + } + + test("actor input stream") { + val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) + ssc = new StreamingContext(sparkConf, Milliseconds(500)) + + val akkaConf = ConfigFactory.parseMap( + Map("akka.actor.provider" -> "akka.remote.RemoteActorRefProvider", + "akka.remote.netty.tcp.transport-class" -> "akka.remote.transport.netty.NettyTransport")) + actorSystem = ActorSystem("test", akkaConf) + CachedActorSystem.set(actorSystem) + actorSystem.actorOf(Props(classOf[FeederActor]), "FeederActor") + val feederUri = + actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + "/user/FeederActor" + val actorStream = AkkaUtils.createStream[String](ssc, () => CachedActorSystem.get, + Props(classOf[TestActorReceiver], feederUri), "TestActorReceiver") + + val result = new mutable.ArrayBuffer[String] with mutable.SynchronizedBuffer[String] + actorStream.foreachRDD { rdd => + result ++= rdd.collect() + } + ssc.start() + + eventually(timeout(10.seconds), interval(100.milliseconds)) { + assert((1 to 
10).map(_.toString) === result) + } + } +} + +/** + * Provide a global class to reuse the ActorSystem in the unit test + */ +object CachedActorSystem { + + private var actorSystem: ActorSystem = null + + def set(actorSystem: ActorSystem): Unit = synchronized { + this.actorSystem = actorSystem + } + + def get: ActorSystem = synchronized { + actorSystem + } +} + +case class SubscribeReceiver(receiverActor: ActorRef) + +class FeederActor extends Actor { + + def receive: Receive = { + case SubscribeReceiver(receiverActor: ActorRef) => + println(receiverActor) + (1 to 10).foreach(i => receiverActor ! i.toString()) + } +} + +class TestActorReceiver(uriOfPublisher: String) extends Actor with ActorHelper { + + lazy private val remotePublisher = context.actorSelection(uriOfPublisher) + + override def preStart(): Unit = { + remotePublisher ! SubscribeReceiver(self) + } + + def receive: PartialFunction[Any, Unit] = { + case msg: String => store(msg) + } + +} diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index 37bfd10d43663..ff3e435b2f35f 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -41,6 +41,11 @@ ${project.version} provided + + org.apache.spark + spark-streaming-akka_${scala.binary.version} + ${project.version} + org.apache.spark spark-core_${scala.binary.version} diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala index 588e6bac7b14a..c9b7a62bea88b 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala @@ -24,7 +24,7 @@ import akka.util.ByteString import akka.zeromq._ import org.apache.spark.Logging -import org.apache.spark.streaming.receiver.ActorHelper +import org.apache.spark.streaming.akka.ActorHelper /** * A receiver to subscribe to ZeroMQ stream. 
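The ZeroMQUtils diff that follows threads an explicit ActorSystem creator through each createStream overload instead of reusing Spark's internal actor system. A minimal sketch of a call against the new Scala signature, reusing the ZeroMQGlobalActorSystem object from the ZeroMQWordCount example earlier in this patch; ssc is an existing StreamingContext, and the publisher URL and topic below are placeholder values:

    import akka.util.ByteString
    import akka.zeromq.Subscribe
    import org.apache.spark.streaming.zeromq.ZeroMQUtils

    // Frame-to-string converter, mirroring bytesToStringIterator in ZeroMQWordCount.
    def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] =
      x.map(_.utf8String).iterator

    // The ActorSystem creator is now the second argument; it is invoked on executors.
    val lines = ZeroMQUtils.createStream[String](
      ssc,
      () => ZeroMQGlobalActorSystem.actorSystem,
      "tcp://127.0.0.1:1234",   // placeholder publisher URL
      Subscribe("foo"),         // placeholder topic
      bytesToStringIterator _)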
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 0469d0af8864a..5c2f418162704 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -19,15 +19,17 @@ package org.apache.spark.streaming.zeromq import scala.reflect.ClassTag import scala.collection.JavaConversions._ -import akka.actor.{Props, SupervisorStrategy} + +import akka.actor.{ActorSystem, Props, SupervisorStrategy} import akka.util.ByteString import akka.zeromq.Subscribe + import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.{ReceiverInputDStream} -import org.apache.spark.streaming.receiver.ActorSupervisorStrategy +import org.apache.spark.streaming.akka.{ActorSystemFactory, ActorSupervisorStrategy, AkkaUtils} object ZeroMQUtils { /** @@ -44,13 +46,16 @@ object ZeroMQUtils { */ def createStream[T: ClassTag]( ssc: StreamingContext, + actorSystemCreator: () => ActorSystem, publisherUrl: String, subscribe: Subscribe, bytesToObjects: Seq[ByteString] => Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy ): ReceiverInputDStream[T] = { - ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), + val cleanF = ssc.sc.clean(bytesToObjects) + AkkaUtils.createStream[T](ssc, actorSystemCreator, + Props(new ZeroMQReceiver(publisherUrl, subscribe, cleanF)), "ZeroMQReceiver", storageLevel, supervisorStrategy) } @@ -67,6 +72,7 @@ object ZeroMQUtils { */ def createStream[T]( jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -76,7 +82,8 @@ object ZeroMQUtils { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator - createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) + createStream[T](jssc.ssc, () => actorSystemFactory.create(), + publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) } /** @@ -92,6 +99,7 @@ object ZeroMQUtils { */ def createStream[T]( jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -100,7 +108,8 @@ object ZeroMQUtils { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator - createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel) + createStream[T]( + jssc.ssc, () => actorSystemFactory.create(), publisherUrl, subscribe, fn, storageLevel) } /** @@ -116,6 +125,7 @@ object ZeroMQUtils { */ def createStream[T]( jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] @@ -123,6 +133,6 @@ object ZeroMQUtils { implicit 
val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator - createStream[T](jssc.ssc, publisherUrl, subscribe, fn) + createStream[T](jssc.ssc, () => actorSystemFactory.create(), publisherUrl, subscribe, fn) } } diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java index 417b91eecb0ee..dc93536c14660 100644 --- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -17,14 +17,17 @@ package org.apache.spark.streaming.zeromq; -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; -import org.junit.Test; +import akka.actor.ActorSystem; import akka.actor.SupervisorStrategy; import akka.util.ByteString; import akka.zeromq.Subscribe; +import org.junit.Test; + import org.apache.spark.api.java.function.Function; -import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.akka.ActorSystemFactory; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.storage.StorageLevel; public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { @@ -32,19 +35,29 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { public void testZeroMQStream() { String publishUrl = "abc"; Subscribe subscribe = new Subscribe((ByteString)null); - Function> bytesToObjects = new Function>() { - @Override - public Iterable call(byte[][] bytes) throws Exception { - return null; - } - }; - + Function> bytesToObjects = new BytesToObjects(); + ActorSystemFactory actorSystemFactory = new ActorSystemFactoryForTest(); JavaReceiverInputDStream test1 = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects); + ssc, actorSystemFactory, publishUrl, subscribe, bytesToObjects); JavaReceiverInputDStream test2 = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); + ssc, actorSystemFactory, publishUrl, subscribe, bytesToObjects, + StorageLevel.MEMORY_AND_DISK_SER_2()); JavaReceiverInputDStream test3 = ZeroMQUtils.createStream( - ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), - SupervisorStrategy.defaultStrategy()); + ssc, actorSystemFactory, publishUrl, subscribe, bytesToObjects, + StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); + } +} + +class BytesToObjects implements Function> { + @Override + public Iterable call(byte[][] bytes) throws Exception { + return null; + } +} + +class ActorSystemFactoryForTest implements ActorSystemFactory { + @Override + public ActorSystem create() { + return null; } } diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index 35d2e62c68480..284d2a459c39e 100644 --- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -39,14 +39,13 @@ class ZeroMQStreamSuite extends SparkFunSuite { val publishUrl = "abc" val subscribe = new Subscribe(null.asInstanceOf[ByteString]) 
val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]] - // tests the API, does not actually test data receiving val test1: ReceiverInputDStream[String] = - ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) + ZeroMQUtils.createStream(ssc, () => null, publishUrl, subscribe, bytesToObjects) val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + ssc, () => null, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, + ssc, () => null, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) // TODO: Actually test data receiving diff --git a/pom.xml b/pom.xml index 6d4f717d4931b..f9e80d6deff96 100644 --- a/pom.xml +++ b/pom.xml @@ -99,6 +99,7 @@ sql/hive unsafe assembly + external/akka external/twitter external/flume external/flume-sink diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 41b7eba3a06c2..f4efe99c3c29f 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -32,11 +32,11 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, - sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka, + sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingAkka, streamingFlume, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe) = Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink", - "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter", + "streaming-akka", "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter", "streaming-zeromq", "launcher", "unsafe").map(ProjectRef(buildLocation, _)) val optionallyEnabledProjects@Seq(yarn, yarnStable, java8Tests, sparkGangliaLgpl, @@ -273,7 +273,7 @@ object OldDeps { retrieveManaged := true, retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq", - "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter", + "spark-streaming-akka", "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter", "spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx", "spark-core").map(versionArtifact(_).get intransitive()) ) @@ -469,7 +469,7 @@ object Unidoc { "-public", "-group", "Core Java API", packageList("api.java", "api.java.function"), "-group", "Spark Streaming", packageList( - "streaming.api.java", "streaming.flume", "streaming.kafka", + "streaming.api.java", "streaming.akka", "streaming.flume", "streaming.kafka", "streaming.mqtt", "streaming.twitter", "streaming.zeromq", "streaming.kinesis" ), "-group", "MLlib", packageList( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 9cd9684d36404..0d41908ae0847 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -304,6 
+304,7 @@ class StreamingContext private[streaming] ( * to ensure the type safety, i.e parametrized type of data received and actorStream * should be same. */ + @deprecated("Use org.apache.spark.streaming.akka.AkkaUtils.createStream instead", "1.5.0") def actorStream[T: ClassTag]( props: Props, name: String, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 989e3a729ebc2..dc3995fdd1a86 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -362,7 +362,9 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * Since Actor may exist outside the spark framework, It is thus user's responsibility * to ensure the type safety, i.e parametrized type of data received and actorStream * should be same. + * @deprecated As of 1.5.0, replaced by `org.apache.spark.streaming.akka.AkkaUtils.createStream)`. */ + @deprecated("Use org.apache.spark.streaming.akka.AkkaUtils.createStream instead", "1.5.0") def actorStream[T]( props: Props, name: String, @@ -384,7 +386,9 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * Since Actor may exist outside the spark framework, It is thus user's responsibility * to ensure the type safety, i.e parametrized type of data received and actorStream * should be same. + * @deprecated As of 1.5.0, replaced by `org.apache.spark.streaming.akka.AkkaUtils.createStream)`. */ + @deprecated("Use org.apache.spark.streaming.akka.AkkaUtils.createStream instead", "1.5.0") def actorStream[T]( props: Props, name: String, @@ -405,7 +409,9 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * Since Actor may exist outside the spark framework, It is thus user's responsibility * to ensure the type safety, i.e parametrized type of data received and actorStream * should be same. + * @deprecated As of 1.5.0, replaced by `org.apache.spark.streaming.akka.AkkaUtils.createStream)`. */ + @deprecated("Use org.apache.spark.streaming.akka.AkkaUtils.createStream instead", "1.5.0") def actorStream[T]( props: Props, name: String diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala index cd309788a7717..435386b9817a1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala @@ -36,6 +36,7 @@ import org.apache.spark.storage.StorageLevel * A helper with set of defaults for supervisor strategy */ @DeveloperApi +@deprecated("Use org.apache.spark.streaming.akka.ActorSupervisorStrategy instead", "1.5.0") object ActorSupervisorStrategy { val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = @@ -69,6 +70,7 @@ object ActorSupervisorStrategy { * should be same. */ @DeveloperApi +@deprecated("Use org.apache.spark.streaming.akka.ActorHelper instead", "1.5.0") trait ActorHelper extends Logging{ self: Actor => // to ensure that this can be added to Actor classes only @@ -107,6 +109,7 @@ trait ActorHelper extends Logging{ * [[org.apache.spark.streaming.receiver.ActorHelper]]. 
*/ @DeveloperApi +@deprecated("Use org.apache.spark.streaming.akka.Statistics instead", "1.5.0") case class Statistics(numberOfMsgs: Int, numberOfWorkers: Int, numberOfHiccups: Int, From 93bdee9b89a572aed0b01603f1ebe99dba732177 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 16 Jun 2015 22:01:09 +0800 Subject: [PATCH 02/12] Fix the MiMa failure --- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f4efe99c3c29f..6e181366c0e14 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -273,7 +273,7 @@ object OldDeps { retrieveManaged := true, retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq", - "spark-streaming-akka", "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter", + "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter", "spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx", "spark-core").map(versionArtifact(_).get intransitive()) ) From 0421a4a8f27242f485ee48c412b48d07bb33f7ab Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 17 Jun 2015 09:12:14 +0800 Subject: [PATCH 03/12] Fix the MiMa failure --- project/MimaExcludes.scala | 10 ++++++++++ project/SparkBuild.scala | 3 ++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 8a93ca2999510..8174887b7dbb4 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -52,6 +52,16 @@ object MimaExcludes { "org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"), // SQL execution is considered private. excludePackage("org.apache.spark.sql.execution") + ) ++ Seq( + // SPARK-7799 + ProblemFilters.exclude[IncompatibleMethTypeProblem]( + "org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$6"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$5") ) case v if v.startsWith("1.4") => Seq( diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 6e181366c0e14..becd5c5da5935 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -168,8 +168,9 @@ object SparkBuild extends PomBuild { (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) // TODO: remove launcher from this list after 1.4.0 + // TODO: remove streamingAkka from this list after 1.5.0 allProjects.filterNot(x => Seq(spark, hive, hiveThriftServer, catalyst, repl, - networkCommon, networkShuffle, networkYarn, launcher, unsafe).contains(x)).foreach { + networkCommon, networkShuffle, networkYarn, launcher, unsafe, streamingAkka).contains(x)).foreach { x => enable(MimaBuild.mimaSettings(sparkHome, x))(x) } From 2be2090703c7fe56457de8057b4b19483feacbce Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 17 Jun 2015 14:32:12 +0800 Subject: [PATCH 04/12] Ignore the unit test --- external/akka/pom.xml | 9 --------- .../apache/spark/streaming/akka/AkkaStreamSuite.scala | 8 +++++++- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/external/akka/pom.xml b/external/akka/pom.xml index 16dd2e686ec8a..0568a1bcda7db 100644 --- a/external/akka/pom.xml +++ 
b/external/akka/pom.xml @@ -58,15 +58,6 @@ akka-remote_${scala.binary.version} ${akka.version} - - - com.typesafe - config - 1.3.0 - test - target/scala-${scala.binary.version}/classes diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala index 930cd49249deb..b9dad3f95b76b 100644 --- a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala +++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala @@ -47,7 +47,13 @@ class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterA } } - test("actor input stream") { + ignore("actor input stream") { + // Because unit tests run with "-Dsun.io.serialization.extendedDebugInfo=true", this test will + // fail due to https://github.com/typesafehub/config/issues/176 + // Setting "sun.io.serialization.extendedDebugInfo" to false at runtime doesn't work because + // ObjectOutputStream.extendedDebugInfo is a static field and cannot be changed at runtime. + // And the next version of Config is 1.3.0 but it only supports Java 8. + // So now just ignore this test. val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) ssc = new StreamingContext(sparkConf, Milliseconds(500)) From 6880b5cb0538cf616667abd73312c9bc3f47f987 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 27 Jul 2015 19:46:53 +0800 Subject: [PATCH 05/12] Add old methods back to ZeroMQUtils and deprecate them --- .../examples/streaming/ZeroMQWordCount.scala | 2 - .../spark/streaming/zeromq/ZeroMQUtils.scala | 130 ++++++++++++++++-- .../streaming/zeromq/ZeroMQStreamSuite.scala | 4 +- 3 files changed, 123 insertions(+), 13 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index b97b075cec863..60971867cccbf 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -57,7 +57,6 @@ object SimpleZeroMQPublisher { } } -// scalastyle:off /** * A sample wordcount with ZeroMQStream stream * @@ -75,7 +74,6 @@ object SimpleZeroMQPublisher { * `$ bin/run-example \ * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo` */ -// scalastyle:on object ZeroMQWordCount { def main(args: Array[String]) { if (args.length < 2) { diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 5c2f418162704..731c428805115 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -24,6 +24,7 @@ import akka.actor.{ActorSystem, Props, SupervisorStrategy} import akka.util.ByteString import akka.zeromq.Subscribe +import org.apache.spark.SparkEnv import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext @@ -34,25 +35,55 @@ import org.apache.spark.streaming.akka.{ActorSystemFactory, ActorSupervisorStrat object ZeroMQUtils { /** * Create an input stream that receives messages pushed by a zeromq publisher. 
- * @param ssc StreamingContext object - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe Topic to subscribe to + * @param ssc StreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic * and each frame has sequence of byte thus it needs the converter * (which might be deserializer of bytes) to translate from sequence * of sequence of bytes, where sequence refer to a frame * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + * @param supervisorStrategy the supervisor strategy (default: + * ActorSupervisorStrategy.defaultStrategy) */ + @deprecated("Use createStream with actorSystemCreator instead", "1.5.0") def createStream[T: ClassTag]( ssc: StreamingContext, - actorSystemCreator: () => ActorSystem, publisherUrl: String, subscribe: Subscribe, bytesToObjects: Seq[ByteString] => Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy ): ReceiverInputDStream[T] = { + createStream(ssc, () => SparkEnv.get.actorSystem, publisherUrl, subscribe, bytesToObjects, + storageLevel, supervisorStrategy) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param ssc StreamingContext object + * @param actorSystemCreator a function to create ActorSystem in executors + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic + * and each frame has sequence of byte thus it needs the converter + * (which might be deserializer of bytes) to translate from sequence + * of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + * @param supervisorStrategy the supervisor strategy (default: + * ActorSupervisorStrategy.defaultStrategy) + */ + def createStream[T: ClassTag]( + ssc: StreamingContext, + actorSystemCreator: () => ActorSystem, + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: Seq[ByteString] => Iterator[T], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy + ): ReceiverInputDStream[T] = { val cleanF = ssc.sc.clean(bytesToObjects) AkkaUtils.createStream[T](ssc, actorSystemCreator, Props(new ZeroMQReceiver(publisherUrl, subscribe, cleanF)), @@ -61,14 +92,44 @@ object ZeroMQUtils { /** * Create an input stream that receives messages pushed by a zeromq publisher. - * @param jssc JavaStreamingContext object - * @param publisherUrl Url of remote ZeroMQ publisher - * @param subscribe Topic to subscribe to + * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote ZeroMQ publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each * frame has sequence of byte thus it needs the converter(which might be * deserializer of bytes) to translate from sequence of sequence of bytes, * where sequence refer to a frame and sub sequence refer to its payload. 
- * @param storageLevel Storage level to use for storing the received objects + * @param storageLevel Storage level to use for storing the received objects + * @param supervisorStrategy the supervisor strategy + */ + @deprecated("Use createStream with actorSystemFactory instead", "1.5.0") + def createStream[T]( + jssc: JavaStreamingContext, + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel, + supervisorStrategy: SupervisorStrategy + ): JavaReceiverInputDStream[T] = { + val actorSystemFactory = new ActorSystemFactory { + override def create(): ActorSystem = SparkEnv.get.actorSystem + } + createStream(jssc, actorSystemFactory, publisherUrl, subscribe, bytesToObjects, storageLevel, + supervisorStrategy) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param jssc JavaStreamingContext object + * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors + * @param publisherUrl Url of remote ZeroMQ publisher + * @param subscribe Topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each + * frame has sequence of byte thus it needs the converter(which might be + * deserializer of bytes) to translate from sequence of sequence of bytes, + * where sequence refer to a frame and sub sequence refer to its payload. + * @param storageLevel Storage level to use for storing the received objects + * @param supervisorStrategy the supervisor strategy */ def createStream[T]( jssc: JavaStreamingContext, @@ -97,6 +158,32 @@ object ZeroMQUtils { * where sequence refer to a frame and sub sequence refer to its payload. * @param storageLevel RDD storage level. */ + @deprecated("Use createStream with actorSystemFactory instead", "1.5.0") + def createStream[T]( + jssc: JavaStreamingContext, + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel + ): JavaReceiverInputDStream[T] = { + val actorSystemFactory = new ActorSystemFactory { + override def create(): ActorSystem = SparkEnv.get.actorSystem + } + createStream(jssc, actorSystemFactory, publisherUrl, subscribe, bytesToObjects, storageLevel) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param jssc JavaStreamingContext object + * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each + * frame has sequence of byte thus it needs the converter(which might be + * deserializer of bytes) to translate from sequence of sequence of bytes, + * where sequence refer to a frame and sub sequence refer to its payload. + * @param storageLevel RDD storage level. + */ def createStream[T]( jssc: JavaStreamingContext, actorSystemFactory: ActorSystemFactory, @@ -123,6 +210,31 @@ object ZeroMQUtils { * bytes, where sequence refer to a frame and sub sequence refer to its * payload. 
*/ + @deprecated("Use createStream with actorSystemFactory instead", "1.5.0") + def createStream[T]( + jssc: JavaStreamingContext, + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] + ): JavaReceiverInputDStream[T] = { + val actorSystemFactory = new ActorSystemFactory { + override def create(): ActorSystem = SparkEnv.get.actorSystem + } + createStream(jssc, actorSystemFactory, publisherUrl, subscribe, bytesToObjects) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param jssc JavaStreamingContext object + * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each + * frame has sequence of byte thus it needs the converter(which might + * be deserializer of bytes) to translate from sequence of sequence of + * bytes, where sequence refer to a frame and sub sequence refer to its + * payload. + */ def createStream[T]( jssc: JavaStreamingContext, actorSystemFactory: ActorSystemFactory, diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index 284d2a459c39e..46baa03df7fbd 100644 --- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -38,10 +38,10 @@ class ZeroMQStreamSuite extends SparkFunSuite { val ssc = new StreamingContext(master, framework, batchDuration) val publishUrl = "abc" val subscribe = new Subscribe(null.asInstanceOf[ByteString]) - val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]] + val bytesToObjects = (bytes: Seq[ByteString]) => null: Iterator[String] // tests the API, does not actually test data receiving val test1: ReceiverInputDStream[String] = - ZeroMQUtils.createStream(ssc, () => null, publishUrl, subscribe, bytesToObjects) + ZeroMQUtils.createStream[String](ssc, () => null, publishUrl, subscribe, bytesToObjects) val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream( ssc, () => null, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream( From f7186293e2b4fd69b30b69f4c92c5d2bbe60d30d Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 27 Jul 2015 19:47:08 +0800 Subject: [PATCH 06/12] Fix tests for streaming-akka --- .../streaming/akka/AkkaStreamSuite.scala | 21 ++++++++----------- project/SparkBuild.scala | 17 ++++++++++++++- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala index b9dad3f95b76b..81037ada6bc89 100644 --- a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala +++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala @@ -23,19 +23,19 @@ import scala.concurrent.duration._ import akka.actor._ import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfterAll +import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.Eventually import org.apache.spark.{SparkConf, 
SparkFunSuite} import org.apache.spark.streaming.{Milliseconds, StreamingContext} -class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll { +class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter { private var ssc: StreamingContext = _ private var actorSystem: ActorSystem = _ - override def afterAll(): Unit = { + after { if (ssc != null) { ssc.stop() ssc = null @@ -47,13 +47,11 @@ class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterA } } - ignore("actor input stream") { - // Because unit tests run with "-Dsun.io.serialization.extendedDebugInfo=true", this test will - // fail due to https://github.com/typesafehub/config/issues/176 - // Setting "sun.io.serialization.extendedDebugInfo" to false at runtime doesn't work because - // ObjectOutputStream.extendedDebugInfo is a static field and cannot be changed at runtime. - // And the next version of Config is 1.3.0 but it only supports Java 8. - // So now just ignore this test. + test("actor input stream") { + // `Props` contains a reference to com.typesafe.config.Config and will be serialized. However, + // because of https://github.com/typesafehub/config/issues/176, this unit test cannot run with + // "-Dsun.io.serialization.extendedDebugInfo=true". Therefore, + // "sun.io.serialization.extendedDebugInfo" is disabled in SparkBuild.scala for streaming-akka val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) ssc = new StreamingContext(sparkConf, Milliseconds(500)) @@ -74,7 +72,7 @@ class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterA } ssc.start() - eventually(timeout(10.seconds), interval(100.milliseconds)) { + eventually(timeout(10.seconds), interval(10.milliseconds)) { assert((1 to 10).map(_.toString) === result) } } @@ -102,7 +100,6 @@ class FeederActor extends Actor { def receive: Receive = { case SubscribeReceiver(receiverActor: ActorRef) => - println(receiverActor) (1 to 10).foreach(i => receiverActor ! i.toString()) } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index ad3a394399ac5..96fab669a5659 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -201,6 +201,14 @@ object SparkBuild extends PomBuild { /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) + (allProjects ++ optionallyEnabledProjects).filterNot { x => + // The unit tests in streamingAkka and streamingZeromq need to serialize + // com.typesafe.config.Config. However, we cannot enable sun.io.serialization.extendedDebugInfo + // because of https://github.com/typesafehub/config/issues/176 + // Note: although this issue is resolved in Config 1.3.0, but it only supports Java 8. 
+ Seq(streamingAkka, streamingZeromq).contains(x) + }.foreach(enable(SerializationDebugSettings.settings)) + // TODO: remove streamingAkka from this list after 1.5.0 allProjects.filterNot(x => Seq(spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn, unsafe, streamingAkka).contains(x)).foreach { @@ -545,7 +553,6 @@ object TestSettings { javaOptions in Test += "-Dspark.ui.showConsoleProgress=false", javaOptions in Test += "-Dspark.driver.allowMultipleContexts=true", javaOptions in Test += "-Dspark.unsafe.exceptionOnMemoryLeak=true", - javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true", javaOptions in Test += "-Dderby.system.durability=test", javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark") .map { case (k,v) => s"-D$k=$v" }.toSeq, @@ -583,3 +590,11 @@ object TestSettings { ) } + +object SerializationDebugSettings { + import BuildCommons._ + + lazy val settings = Seq ( + javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true" + ) +} From 2d80315bf884b49d8845c32a93df1f9fdb130b97 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 27 Jul 2015 22:16:59 +0800 Subject: [PATCH 07/12] Fix the compilation error --- .../org/apache/spark/examples/streaming/ZeroMQWordCount.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index 60971867cccbf..51f22e8a03e65 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -89,7 +89,7 @@ object ZeroMQWordCount { def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator // For this stream, a zeroMQ publisher should be running. 
- val lines = ZeroMQUtils.createStream( + val lines = ZeroMQUtils.createStream[String]( ssc, () => ZeroMQGlobalActorSystem.actorSystem, url, Subscribe(topic), bytesToStringIterator _) val words = lines.flatMap(_.split(" ")) From 32bbffd304359e09e67e826b1eb9896dce7ac3a1 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 27 Jul 2015 22:55:26 +0800 Subject: [PATCH 08/12] Minor fixes --- .../spark/streaming/akka/AkkaStreamSuite.scala | 1 + .../spark/streaming/zeromq/ZeroMQStreamSuite.scala | 2 +- project/SparkBuild.scala | 13 +++++++------ 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala index 81037ada6bc89..c92b4323491fc 100644 --- a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala +++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaStreamSuite.scala @@ -45,6 +45,7 @@ class AkkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter actorSystem.awaitTermination(30.seconds) actorSystem = null } + CachedActorSystem.set(null) } test("actor input stream") { diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index 46baa03df7fbd..211fecd562fcb 100644 --- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -48,7 +48,7 @@ class ZeroMQStreamSuite extends SparkFunSuite { ssc, () => null, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) - // TODO: Actually test data receiving + // TODO: Actually test data receiving. A real test needs the native ZeroMQ library ssc.stop() } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 96fab669a5659..a9aecf7a6fc91 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -201,12 +201,13 @@ object SparkBuild extends PomBuild { /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) - (allProjects ++ optionallyEnabledProjects).filterNot { x => - // The unit tests in streamingAkka and streamingZeromq need to serialize - // com.typesafe.config.Config. However, we cannot enable sun.io.serialization.extendedDebugInfo - // because of https://github.com/typesafehub/config/issues/176 - // Note: although this issue is resolved in Config 1.3.0, but it only supports Java 8. - Seq(streamingAkka, streamingZeromq).contains(x) + /* Enable sun.io.serialization.extendedDebugInfo for all projects except streamingAkka */ + (allProjects ++ optionallyEnabledProjects).filter { x => + // The unit tests in streamingAkka need to serialize com.typesafe.config.Config. However, we + // cannot enable sun.io.serialization.extendedDebugInfo because of + // https://github.com/typesafehub/config/issues/176 + // Note: although this issue is already resolved in Config 1.3.0, but it only supports Java 8. 
+ x != streamingAkka }.foreach(enable(SerializationDebugSettings.settings)) // TODO: remove streamingAkka from this list after 1.5.0 From 8b2ff5cea8d95c3183b0d2b97eb9d1cdbc0bf45d Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 27 Jul 2015 23:08:00 +0800 Subject: [PATCH 09/12] Fix the import order --- .../scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 731c428805115..98bf91c20d629 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -28,9 +28,9 @@ import org.apache.spark.SparkEnv import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.akka.{ActorSupervisorStrategy, ActorSystemFactory, AkkaUtils} import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.{ReceiverInputDStream} -import org.apache.spark.streaming.akka.{ActorSystemFactory, ActorSupervisorStrategy, AkkaUtils} object ZeroMQUtils { /** From 49f8b2c86e45a69d0bf9179ac1283dd74d163de9 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 28 Jul 2015 17:11:28 +0800 Subject: [PATCH 10/12] Remove JavaActorHelper and the JavaActorWordCount example --- .../streaming/JavaActorWordCount.java | 182 ------------------ .../spark/streaming/akka/ActorReceiver.scala | 54 ------ 2 files changed, 236 deletions(-) delete mode 100644 examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java deleted file mode 100644 index c241abc30cb68..0000000000000 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.examples.streaming; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import scala.Tuple2; - -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.UntypedActor; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.akka.ActorSystemFactory; -import org.apache.spark.streaming.akka.AkkaUtils; -import org.apache.spark.streaming.akka.JavaActorHelper; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; - -/** - * A sample actor as receiver, is also simplest. This receiver actor - * goes and subscribe to a typical publisher/feeder actor and receives - * data. - * - * @see [[org.apache.spark.examples.streaming.FeederActor]] - */ -class JavaSampleActorReceiver extends UntypedActor { - - private final String urlOfPublisher; - - private final JavaActorHelper helper = new JavaActorHelper(this); - - public JavaSampleActorReceiver(String urlOfPublisher) { - this.urlOfPublisher = urlOfPublisher; - } - - private ActorSelection remotePublisher; - - @Override - public void preStart() { - remotePublisher = getContext().actorSelection(urlOfPublisher); - remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf()); - } - - public void onReceive(Object msg) throws Exception { - helper.store((T) msg); - } - - @Override - public void postStop() { - remotePublisher.tell(new UnsubscribeReceiver(getSelf()), getSelf()); - } -} - -/** - * A sample word count program demonstrating the use of plugging in - * Actor as Receiver - * Usage: JavaActorWordCount - * and describe the AkkaSystem that Spark Sample feeder is running on. - * - * To run this example locally, you may run Feeder Actor as - *
- *     $ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.0.1 9999
- * 
- * and then run the example - *
- *     $ bin/run-example org.apache.spark.examples.streaming.JavaActorWordCount 127.0.0.1 9999
- * 
- */ -public class JavaActorWordCount { - - public static void main(String[] args) { - if (args.length < 2) { - System.err.println("Usage: JavaActorWordCount "); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - - final String host = args[0]; - final String port = args[1]; - SparkConf sparkConf = new SparkConf().setAppName("JavaActorWordCount"); - // Create the context and set the batch size - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); - - /* - * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver - * - * An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e type of data received and InputDStream - * should be same. - * - * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized - * to same type to ensure type safety. - */ - - ActorSystemFactory actorSystemFactory = new ActorSystemFactory() { - @Override - public ActorSystem create() { - return JavaGlobalActorSystem.getActorSystem(); - } - }; - - String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor"; - - JavaDStream lines1 = AkkaUtils.createStream( - jssc, - actorSystemFactory, - Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver1"); - JavaDStream lines2 = AkkaUtils.createStream( - jssc, - actorSystemFactory, - Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver2"); - - JavaDStream lines = lines1.union(lines2); - - // compute wordcount - lines.flatMap(new FlatMapFunction() { - @Override - public Iterable call(String s) { - return Arrays.asList(s.split("\\s+")); - } - }).mapToPair(new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2(s, 1); - } - }).reduceByKey(new Function2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }).print(); - - jssc.start(); - jssc.awaitTermination(); - } -} - -/** - * A global `ActorSystem` to avoid creating multiple `ActorSystem`s in an executor. - */ -class JavaGlobalActorSystem { - - private static ActorSystem actorSystem = null; - - public synchronized static ActorSystem getActorSystem() { - if (actorSystem == null) { - Map akkaConf = new HashMap(); - akkaConf.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider"); - akkaConf.put( - "akka.remote.netty.tcp.transport-class", "akka.remote.transport.netty.NettyTransport"); - actorSystem = ActorSystem.create("test", ConfigFactory.parseMap(akkaConf)); - } - return actorSystem; - } -} diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index c6564c6e85972..f1e3002b59b63 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -102,60 +102,6 @@ trait ActorHelper extends Logging{ } } -/** - * :: DeveloperApi :: - * A helper class for your Actor to gain access to - * the API for pushing received data into Spark Streaming for being processed. 
- * - * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * - * @example {{{ - * class MyActor extends UntypedActor { - * - * private final JavaActorHelper helper = new JavaActorHelper(this); - * - * public void onReceive(Object msg) throws Exception { - * helper.store(msg); - * } - * - * } - * - * // Can be used with an actorStream as follows - * AkkaUtils.createStream( - * jssc, actorSystemFactory, Props.create(new MyActor()), "MyActorReceiver"); - * - * }}} - * - * @note Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of push block and InputDStream - * should be same. - */ -@DeveloperApi -class JavaActorHelper(actor: Actor) { - /** Store an iterator of received data as a data block into Spark's memory. */ - def store[T](iter: java.util.Iterator[T]) { - actor.context.parent ! IteratorData(iter) - } - - /** - * Store the bytes of received data as a data block into Spark's memory. Note - * that the data in the ByteBuffer must be serialized using the same serializer - * that Spark is configured to use. - */ - def store(bytes: ByteBuffer) { - actor.context.parent ! ByteBufferData(bytes) - } - - /** - * Store a single item of received data to Spark's memory. - * These single items will be aggregated together into data blocks before - * being pushed into Spark's memory. - */ - def store[T](item: T) { - actor.context.parent ! SingleItemData(item) - } -} - /** * :: DeveloperApi :: * Statistics for querying the supervisor about state of workers. Used in From 73d883d3703806b50544e3f8fdd938e7f6f57c85 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 15 Oct 2015 18:45:11 +0800 Subject: [PATCH 11/12] Split allProjects to two Seqs to resolve the Tuple22 limitation --- project/SparkBuild.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index bdd240d48cd42..920baa13b34a4 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -33,13 +33,17 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile - val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, - sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingAkka, streamingFlume, streamingKafka, - streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe, testTags) = + val nonStreamingProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, + sql, networkCommon, networkShuffle, launcher, unsafe, testTags) = Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", - "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink", - "streaming-akka", "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter", - "streaming-zeromq", "launcher", "unsafe", "test-tags").map(ProjectRef(buildLocation, _)) + "sql", "network-common", "network-shuffle", "launcher", "unsafe", "test-tags").map(ProjectRef(buildLocation, _)) + + val streamingProjects@Seq(streaming, streamingFlumeSink, streamingAkka, streamingFlume, streamingKafka, + streamingMqtt, streamingTwitter, streamingZeromq) = + Seq("streaming", "streaming-flume-sink", "streaming-akka", "streaming-flume", "streaming-kafka", + "streaming-mqtt", "streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _)) + + val allProjects = nonStreamingProjects ++ streamingProjects val 
optionallyEnabledProjects@Seq(yarn, yarnStable, java8Tests, sparkGangliaLgpl, streamingKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl", From 8bd3e3dc1c859dd3b02348be1cfc716d7b7ce3c7 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 15 Oct 2015 19:09:37 +0800 Subject: [PATCH 12/12] Fix the Spark version --- external/akka/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/akka/pom.xml b/external/akka/pom.xml index 0568a1bcda7db..bb09bbe922a3f 100644 --- a/external/akka/pom.xml +++ b/external/akka/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.5.0-SNAPSHOT + 1.6.0-SNAPSHOT ../../pom.xml
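
The net effect of the ZeroMQUtils changes in patches 05-09 is that callers now supply how the ActorSystem is obtained on the executors instead of relying on the deprecated SparkEnv.get.actorSystem path. A minimal migration sketch under stated assumptions — the MyActorSystem helper, the publisher URL and the topic are illustrative only and not part of this patch series:

    import akka.actor.ActorSystem
    import akka.util.ByteString
    import akka.zeromq.Subscribe
    import com.typesafe.config.ConfigFactory

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.zeromq.ZeroMQUtils

    // Hypothetical helper: one lazily created ActorSystem per executor JVM.
    // Any equivalent factory works, as long as a creator function (not a captured
    // ActorSystem instance) is shipped to the executors.
    object MyActorSystem {
      lazy val actorSystem: ActorSystem = ActorSystem("example", ConfigFactory.empty())
    }

    object ZeroMQMigrationSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("ZeroMQMigrationSketch")
        val ssc = new StreamingContext(conf, Seconds(2))

        // Converts each ZeroMQ frame sequence into strings, as in the example programs.
        def bytesToStrings(frames: Seq[ByteString]): Iterator[String] =
          frames.map(_.utf8String).iterator

        // Deprecated form from before this series: implicitly uses SparkEnv.get.actorSystem.
        // val lines = ZeroMQUtils.createStream[String](
        //   ssc, "tcp://127.0.0.1:1234", Subscribe("foo"), bytesToStrings _)

        // New form added here: the caller decides how the ActorSystem is created on executors.
        val lines = ZeroMQUtils.createStream[String](
          ssc, () => MyActorSystem.actorSystem, "tcp://127.0.0.1:1234",
          Subscribe("foo"), bytesToStrings _)

        lines.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }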
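
For context on the allProjects split in patch 11: a pattern binding such as `val allProjects@Seq(p1, ..., pN)` packs its binders into a tuple, and Scala tuples are capped at 22 elements, so the single binding stops compiling once the project list grows past 22 entries. A small sketch of the limitation and the workaround, with illustrative names rather than the real project list:

    // Fails to compile once there are more than 22 binders in one pattern
    // (roughly: "too many elements for tuple: 23, allowed: 22"):
    // val allProjects@Seq(p1, p2, /* ... */ p23) = someProjectList

    // Workaround applied in SparkBuild.scala: two bindings of at most 22 binders each,
    // concatenated afterwards.
    val nonStreamingProjects@Seq(core, sql, mllib) = Seq("core", "sql", "mllib")
    val streamingProjects@Seq(streaming, streamingAkka) = Seq("streaming", "streaming-akka")
    val allProjects = nonStreamingProjects ++ streamingProjects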