6 changes: 3 additions & 3 deletions docs/configuration.md
@@ -462,7 +462,7 @@ Apart from these, the following properties are also available, and may be useful
<td>(infinite)</td>
<td>
Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.).
-Periodic cleanups will ensure that metadata older than this duration will be forgetten. This is
+Periodic cleanups will ensure that metadata older than this duration will be forgotten. This is
useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming
applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
</td>
@@ -471,8 +471,8 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.streaming.blockInterval</td>
<td>200</td>
<td>
-Duration (milliseconds) of how long to batch new objects coming from network receivers used
-in Spark Streaming.
+Interval (milliseconds) at which data received by Spark Streaming receivers is coalesced
+into blocks of data before being stored in Spark.
</td>
</tr>
<tr>
6 changes: 3 additions & 3 deletions docs/index.md
@@ -112,10 +112,10 @@ Note that on Windows, you need to set the environment variables on separate line
* [Shark](http://shark.cs.berkeley.edu): Apache Hive over Spark
* [Mailing Lists](http://spark.apache.org/mailing-lists.html): ask questions about Spark here
* [AMP Camps](http://ampcamp.berkeley.edu/): a series of training camps at UC Berkeley that featured talks and
-exercises about Spark, Shark, Mesos, and more. [Videos](http://ampcamp.berkeley.edu/agenda-2012),
-[slides](http://ampcamp.berkeley.edu/agenda-2012) and [exercises](http://ampcamp.berkeley.edu/exercises-2012) are
+exercises about Spark, Shark, Spark Streaming, Mesos, and more. [Videos](http://ampcamp.berkeley.edu/3/),
+[slides](http://ampcamp.berkeley.edu/3/) and [exercises](http://ampcamp.berkeley.edu/3/exercises/) are
available online for free.
-* [Code Examples](http://spark.apache.org/examples.html): more are also available in the [examples subfolder](https://github.com/apache/spark/tree/master/examples/src/main/scala/) of Spark
+* [Code Examples](http://spark.apache.org/examples.html): more are also available in the [examples subfolder](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/) of Spark
* [Paper Describing Spark](http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf)
* [Paper Describing Spark Streaming](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf)

273 changes: 183 additions & 90 deletions docs/streaming-custom-receivers.md
@@ -3,126 +3,219 @@ layout: global
title: Spark Streaming Custom Receivers
---

A "Spark Streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need.
Spark Streaming can receive streaming data from any arbitrary data source beyond
the one's for which it has in-built support (that is, beyond Flume, Kafka, files, sockets, etc.).
This requires the developer to implement a *receiver* that is customized for receiving data from
the concerned data source. This guide walks through the process of implementing a custom receiver
and using it in a Spark Streaming application.

+### Implementing a Custom Receiver
+
+This starts with implementing a [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver).
+A custom receiver must extend this abstract class and implement two methods:
+- `onStart()`: Things to do to start receiving data.
+- `onStop()`: Things to do to stop receiving data.
+
+Note that `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` would start the threads
+that are responsible for receiving the data, and `onStop()` would ensure that receiving by those threads
+has stopped. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they
+should stop receiving data.

+Once the data is received, it can be stored inside Spark
+by calling `store(data)`, which is a method provided by the
+[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) class.
+There are a number of flavors of `store()` which allow you to store the received data
+record-at-a-time or as a whole collection of objects / serialized bytes.
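+
+For example, a receiver that reads several records at once can buffer them and store them as a
+whole collection. The following is an illustrative sketch only (the `readBatch()` helper is
+hypothetical; see the `Receiver` API docs for the exact set of `store()` overloads):
+
+{% highlight scala %}
+// Inside a Receiver[String] implementation (sketch):
+private def readBatch(): Seq[String] = Seq("record1", "record2")  // hypothetical source of records
+
+private def storeReceivedBatch() {
+  val buffer = new scala.collection.mutable.ArrayBuffer[String]
+  buffer ++= readBatch()  // buffer several received records...
+  store(buffer)           // ...and store them as one collection
+  store("record3")        // or store a single record at a time
+}
+{% endhighlight %}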

+Any exception in the receiving threads should be caught and handled properly to avoid silent
+failures of the receiver. `restart(<exception>)` will restart the receiver by
+asynchronously calling `onStop()` and then calling `onStart()` after a delay.
+`stop(<exception>)` will call `onStop()` and terminate the receiver. Also, `reportError(<error>)`
+reports an error message to the driver (visible in the logs and UI) without stopping / restarting
+the receiver.
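+
+As an illustration, a transient problem can be reported to the driver without stopping or
+restarting the receiver. The following sketch shows one possible policy inside a receiving
+thread (which errors to report rather than restart on is up to the receiver implementation):
+
+{% highlight scala %}
+// Inside a Receiver[String] implementation (sketch):
+private def receiveOneRecord(source: java.io.BufferedReader) {
+  try {
+    val line = source.readLine()
+    if (line != null) store(line)
+  } catch {
+    case e: java.io.IOException =>
+      // Surface the problem in the driver's logs / UI,
+      // but keep the receiver running
+      reportError("Temporary problem while receiving data", e)
+  }
+}
+{% endhighlight %}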

+The following is a custom receiver that receives a stream of text over a socket. It treats
+'\n'-delimited lines in the text stream as records and stores them with Spark. If the receiving
+thread has any error connecting or receiving, the receiver is restarted to make another attempt to connect.

<div class="codetabs">
<div data-lang="scala" markdown="1" >

-This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
+{% highlight scala %}

-### Writing a Simple Receiver
+import java.io.{BufferedReader, InputStreamReader}
+import java.net.Socket
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+
+class CustomReceiver(host: String, port: Int)
+  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
+
+  def onStart() {
+    // Start the thread that receives data over a connection
+    new Thread("Socket Receiver") {
+      override def run() { receive() }
+    }.start()
+  }
+
+  def onStop() {
+    // There is nothing much to do as the thread calling receive()
+    // is designed to stop by itself when isStopped() returns false
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private def receive() {
+    var socket: Socket = null
+    var userInput: String = null
+    try {
+      // Connect to host:port
+      socket = new Socket(host, port)
+
+      // Until stopped or connection broken continue reading
+      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+      userInput = reader.readLine()
+      while(!isStopped && userInput != null) {
+        store(userInput)
+        userInput = reader.readLine()
+      }
+      reader.close()
+      socket.close()
+
+      // Restart in an attempt to connect again when server is active again
+      restart("Trying to connect again")
+    } catch {
+      case e: java.net.ConnectException =>
+        // restart if could not connect to server
+        restart("Error connecting to " + host + ":" + port, e)
+      case t: Throwable =>
+        // restart if there is any other error
+        restart("Error receiving data", t)
+    }
+  }
+}

-This starts with implementing [NetworkReceiver](api/scala/index.html#org.apache.spark.streaming.dstream.NetworkReceiver).
+{% endhighlight %}

-The following is a simple socket text-stream receiver.
+</div>
+<div data-lang="java" markdown="1">

+{% highlight java %}
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.Socket;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+public class JavaCustomReceiver extends Receiver<String> {
+
+  String host = null;
+  int port = -1;
+
+  public JavaCustomReceiver(String host_ , int port_) {
+    super(StorageLevel.MEMORY_AND_DISK_2());
+    host = host_;
+    port = port_;
+  }
+
+  public void onStart() {
+    // Start the thread that receives data over a connection
+    new Thread() {
+      @Override public void run() {
+        receive();
+      }
+    }.start();
+  }
+
+  public void onStop() {
+    // There is nothing much to do as the thread calling receive()
+    // is designed to stop by itself when isStopped() returns false
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private void receive() {
+    Socket socket = null;
+    String userInput = null;
+
+    try {
+      // connect to the server
+      socket = new Socket(host, port);
+
+      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+
+      // Until stopped or connection broken continue reading
+      while (!isStopped() && (userInput = reader.readLine()) != null) {
+        System.out.println("Received data '" + userInput + "'");
+        store(userInput);
+      }
+      reader.close();
+      socket.close();
+
+      // Restart in an attempt to connect again when server is active again
+      restart("Trying to connect again");
+    } catch(ConnectException ce) {
+      // restart if could not connect to server
+      restart("Could not connect", ce);
+    } catch(Throwable t) {
+      // restart if there is any other error
+      restart("Error receiving data", t);
+    }
+  }
+}

-{% highlight scala %}
-class SocketTextStreamReceiver(host: String, port: Int)
-  extends NetworkReceiver[String]
-{
-  protected lazy val blocksGenerator: BlockGenerator =
-    new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
-
-  protected def onStart() = {
-    blocksGenerator.start()
-    val socket = new Socket(host, port)
-    val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
-    var data: String = dataInputStream.readLine()
-    while (data != null) {
-      blocksGenerator += data
-      data = dataInputStream.readLine()
-    }
-  }
-
-  protected def onStop() {
-    blocksGenerator.stop()
-  }
-}
{% endhighlight %}

+</div>
+</div>

-All we did here is extended NetworkReceiver and called blockGenerator's API method (i.e. +=) to push our blocks of data. Please refer to scala-docs of NetworkReceiver for more details.

+### Using the custom receiver in a Spark Streaming application

-### An Actor as Receiver
+The custom receiver can be used in a Spark Streaming application by using
+`streamingContext.receiverStream(<instance of custom receiver>)`. This will create
+an input DStream using data received by the instance of the custom receiver, as shown below:

-This starts with implementing [Actor](#References)

-Following is a simple socket text-stream receiver, which is appearently overly simplified using Akka's socket.io api.
+<div class="codetabs">
+<div data-lang="scala" markdown="1" >

{% highlight scala %}
-class SocketTextStreamReceiver (host:String,
-  port:Int,
-  bytesToString: ByteString => String) extends Actor with Receiver {
-
-  override def preStart = IOManager(context.system).connect(host, port)
-
-  def receive = {
-    case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
-  }
-
-}
+// Assuming ssc is the StreamingContext
+val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
+val words = customReceiverStream.flatMap(_.split(" "))
+...
{% endhighlight %}

-All we did here is mixed in trait Receiver and called pushBlock api method to push our blocks of data. Please refer to scala-docs of Receiver for more details.

-### A Sample Spark Application
+The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala).

-* First create a Spark streaming context with master url and batchduration.
+</div>
+<div data-lang="java" markdown="1">

-{% highlight scala %}
-val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
-  Seconds(batchDuration))
+{% highlight java %}
+// Assuming ssc is the JavaStreamingContext
+JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
+JavaDStream<String> words = customReceiverStream.flatMap(new FlatMapFunction<String, String>() { ... });
+...
{% endhighlight %}

-* Plug-in the custom receiver into the spark streaming context and create a DStream.
+The full source code is in the example [JavaCustomReceiver.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java).

-{% highlight scala %}
-val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
-  "localhost", 8445))
-{% endhighlight %}
+</div>
+</div>
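+
+For completeness, a minimal sketch of the surrounding Scala application is shown below (the
+application name, port, batch interval, and processing logic are illustrative only):
+
+{% highlight scala %}
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+val conf = new SparkConf().setAppName("CustomReceiverExample")
+val ssc = new StreamingContext(conf, Seconds(1))
+
+val lines = ssc.receiverStream(new CustomReceiver("localhost", 9999))
+lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+ssc.start()
+ssc.awaitTermination()
+{% endhighlight %}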

-* OR Plug-in the actor as receiver into the spark streaming context and create a DStream.
-
-{% highlight scala %}
-val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
-  "localhost",8445, z => z.utf8String)),"SocketReceiver")
-{% endhighlight %}

-* Process it.
+### Implementing and Using a Custom Actor-based Receiver

-{% highlight scala %}
-val words = lines.flatMap(_.split(" "))
-val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to
+receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
+trait can be mixed into any Akka actor, allowing received data to be stored in Spark using
+`store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc.

-wordCounts.print()
-ssc.start()
+{% highlight scala %}
+import akka.actor.Actor
+import org.apache.spark.streaming.receiver.ActorHelper
+
+class CustomActor extends Actor with ActorHelper {
+  def receive = {
+    case data: String => store(data)
+  }
+}
{% endhighlight %}

-* After processing it, stream can be tested using the netcat utility.
-
-    $ nc -l localhost 8445
-    hello world
-    hello hello
-
-
-## Multiple Homogeneous/Heterogeneous Receivers.
-
-A DStream union operation is provided for taking union on multiple input streams.
+And a new input stream can be created with this custom actor as:

{% highlight scala %}
-val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
-  "localhost",8445, z => z.utf8String)),"SocketReceiver")
-
-// Another socket stream receiver
-val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
-  "localhost",8446, z => z.utf8String)),"SocketReceiver")
-
-val union = lines.union(lines2)
+// Assuming ssc is the StreamingContext
+val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
{% endhighlight %}
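+
+Multiple such input streams can also be combined with `union` before further processing, as in
+this brief sketch (the receiver names are arbitrary):
+
+{% highlight scala %}
+val lines1 = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver1")
+val lines2 = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver2")
+val unionedLines = lines1.union(lines2)
+{% endhighlight %}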

-Above stream can be easily process as described earlier.

-_A more comprehensive example is provided in the spark streaming examples_
+See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala)
+for an end-to-end example.

-## References
-
-1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
-2.[NetworkReceiver](api/scala/index.html#org.apache.spark.streaming.dstream.NetworkReceiver)