[SPARK-7799][Streaming][Document]Add the linking and deploying instructions for streaming-akka project #10856

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
81 changes: 44 additions & 37 deletions docs/streaming-custom-receivers.md
@@ -257,54 +257,61 @@ The following table summarizes the characteristics of both types of receivers

## Implementing and Using a Custom Actor-based Receiver

Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to
receive data. Here are the instructions.

1. **Linking:** You need to add the following dependency to your SBT or Maven project (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).

        groupId = org.apache.spark
        artifactId = spark-streaming-akka_{{site.SCALA_BINARY_VERSION}}
        version = {{site.SPARK_VERSION_SHORT}}

2. **Programming:**

    <div class="codetabs">
    <div data-lang="scala" markdown="1" >

    You need to extend [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver)
    so that received data can be stored in Spark using the `store(...)` methods. The supervisor strategy of
    this actor can be configured to handle failures, etc.

        class CustomActor extends ActorReceiver {
          def receive = {
            case data: String => store(data)
          }
        }

        // A new input stream can be created with this custom actor as
        val ssc: StreamingContext = ...
        val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")

    See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example.
    </div>
    <div data-lang="java" markdown="1">

    You need to extend [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver)
    so that received data can be stored in Spark using the `store(...)` methods. The supervisor strategy of
    this actor can be configured to handle failures, etc.

        class CustomActor extends JavaActorReceiver {
          @Override
          public void onReceive(Object msg) throws Exception {
            store((String) msg);
          }
        }

        // A new input stream can be created with this custom actor as
        JavaStreamingContext jssc = ...;
        JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");

    See [JavaActorWordCount.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java) for an end-to-end example.
    </div>
    </div>
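    The supervisor strategy mentioned above is plain Akka machinery. As a sketch only (the policy below and the failure types it matches are hypothetical, and how a custom strategy is wired into `AkkaUtils` depends on the API version), a restart-on-recoverable-error strategy looks like:

    ```scala
    import scala.concurrent.duration._
    import akka.actor.{OneForOneStrategy, SupervisorStrategy}
    import akka.actor.SupervisorStrategy.{Escalate, Restart}

    // Hypothetical policy: restart the failed actor on I/O errors,
    // at most 3 times within 10 seconds; escalate anything else.
    val restartOnIOError: SupervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10.seconds) {
        case _: java.io.IOException => Restart
        case _: Throwable           => Escalate
      }
    ```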

3. **Deploying:** As with any Spark application, `spark-submit` is used to launch your application.
> **Review comment (Member, Author):** To make the list number here display 3, I added some indents in the programming step. Please use https://github.com/apache/spark/pull/10856/files?w=1 to review the content.

    You need to package `spark-streaming-akka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into
    the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}`
    are marked as `provided` dependencies as those are already present in a Spark installation. Then
    use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
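    Concretely, a launch might look like the following (a sketch: the master URL, main class, and assembly JAR name are placeholder values, not from this guide):

    ```shell
    # Hypothetical values -- substitute your application's class, JAR, and master URL.
    ./bin/spark-submit \
      --master spark://your-master:7077 \
      --class com.example.StreamingAkkaApp \
      streaming-akka-app-assembly-0.1.jar
    ```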

<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala libraries, `AkkaUtils` is not available in the Python API.
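As a footnote to the linking step above: with SBT, the `groupId`/`artifactId`/`version` coordinates become a single `libraryDependencies` line. The versions below are illustrative stand-ins for the `{{site.SCALA_BINARY_VERSION}}` and `{{site.SPARK_VERSION_SHORT}}` placeholders, which resolve when the docs are built; use the versions matching your Spark installation.

```scala
// build.sbt -- versions are illustrative; `%%` appends the Scala binary version
// (e.g. _2.10) to the artifact name, matching spark-streaming-akka_{{site.SCALA_BINARY_VERSION}}.
libraryDependencies += "org.apache.spark" %% "spark-streaming-akka" % "1.6.0"

// spark-streaming is supplied by the Spark installation at runtime, hence "provided".
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided"
```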