[SPARK-7799][STREAMING][DOCUMENT] Add the linking and deploying instructions for streaming-akka project

Since `actorStream` is an external project, we should add the linking and deploying instructions for it.

A follow-up PR to #10744.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10856 from zsxwing/akka-link-instruction.
zsxwing authored and tdas committed Jan 26, 2016
1 parent 08c781c commit cbd507d
Showing 1 changed file with 44 additions and 37 deletions.
81 changes: 44 additions & 37 deletions docs/streaming-custom-receivers.md
@@ -257,54 +257,61 @@ The following table summarizes the characteristics of both types of receivers

## Implementing and Using a Custom Actor-based Receiver

<div class="codetabs">
<div data-lang="scala" markdown="1" >

Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to
receive data. Extending [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver)
allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of
this actor can be configured to handle failures, etc.
receive data. Here are the instructions.

{% highlight scala %}
1. **Linking:** You need to add the following dependency to your SBT or Maven project (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).

class CustomActor extends ActorReceiver {
  def receive = {
    case data: String => store(data)
  }
}
groupId = org.apache.spark
artifactId = spark-streaming-akka_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION_SHORT}}

// Create a new input stream with this custom actor:
val ssc: StreamingContext = ...
val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")
2. **Programming:**

{% endhighlight %}
<div class="codetabs">
<div data-lang="scala" markdown="1" >

See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example.
</div>
<div data-lang="java" markdown="1">
Extend [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver)
to store received data in Spark using its `store(...)` methods. The actor's supervisor strategy
can be configured to handle failures.

Custom [Akka UntypedActors](http://doc.akka.io/docs/akka/2.3.11/java/untyped-actors.html) can also be used to
receive data. Extending [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver)
allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of
this actor can be configured to handle failures, etc.
class CustomActor extends ActorReceiver {
  def receive = {
    case data: String => store(data)
  }
}

{% highlight java %}
// Create a new input stream with this custom actor:
val ssc: StreamingContext = ...
val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")

class CustomActor extends JavaActorReceiver {
  @Override
  public void onReceive(Object msg) throws Exception {
    store((String) msg);
  }
}
See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example.
</div>
<div data-lang="java" markdown="1">

// Create a new input stream with this custom actor:
JavaStreamingContext jssc = ...;
JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");
Extend [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver)
to store received data in Spark using its `store(...)` methods. The actor's supervisor strategy
can be configured to handle failures.

{% endhighlight %}
class CustomActor extends JavaActorReceiver {
  @Override
  public void onReceive(Object msg) throws Exception {
    store((String) msg);
  }
}

See [JavaActorWordCount.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java) for an end-to-end example.
</div>
</div>
// Create a new input stream with this custom actor:
JavaStreamingContext jssc = ...;
JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");

See [JavaActorWordCount.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java) for an end-to-end example.
</div>
</div>

3. **Deploying:** As with any Spark application, `spark-submit` is used to launch your application.
You need to package `spark-streaming-akka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into
the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}`
are marked as `provided` dependencies, since they are already present in a Spark installation
(see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
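
The linking and deploying steps above might translate into an SBT build roughly as follows. This is a minimal sketch, not part of the commit: the project name and the `sparkVersion` value are placeholders, and it assumes the `spark-streaming-akka` artifact is published for the Spark version you build against.

```scala
// build.sbt: a sketch only. The project name and version string are placeholders.
name := "custom-actor-receiver-app" // hypothetical project name

// Placeholder: replace with the Spark release you build against
// (the value the docs render as site.SPARK_VERSION_SHORT).
val sparkVersion = "x.y.z"

libraryDependencies ++= Seq(
  // Already present in a Spark installation, so keep them out of the application JAR.
  "org.apache.spark" %% "spark-core"      % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // Not part of a Spark installation: must be packaged with the application.
  "org.apache.spark" %% "spark-streaming-akka" % sparkVersion
)
```

The `%%` operator appends the Scala binary version to the artifact name, which corresponds to the `_{{site.SCALA_BINARY_VERSION}}` suffix used above. Producing a single JAR that bundles the non-`provided` dependencies typically requires an extra plugin such as sbt-assembly, which this sketch does not configure.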

<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala libraries, AkkaUtils is not available in the Python API.
