Skip to content

Commit

Permalink
address comments, add missing file
Browse files Browse the repository at this point in the history
add code tab for design patterns sections.
  • Loading branch information
davies committed Oct 17, 2014
1 parent 7e4bb8a commit 3821c4d
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 8 deletions.
95 changes: 87 additions & 8 deletions docs/streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = new StreamingContext(sc, 1)
ssc = StreamingContext(sc, 1)
{% endhighlight %}

Using this context, we can create a DStream that represents streaming data from a TCP
Expand Down Expand Up @@ -525,6 +525,7 @@ section for more details.
</div>

After a context is defined, you have to do the follow steps.

1. Define the input sources.
1. Setup the streaming computations.
1. Start the receiving and procesing of data using `streamingContext.start()`.
Expand Down Expand Up @@ -608,7 +609,7 @@ methods for creating DStreams from files and Akka actors as input sources.
streamingContext.fileStream<keyClass, valueClass, inputFormatClass>(dataDirectory);
</div>
<div data-lang="python" markdown="1">
streamingContext.textFileStream(dataDirectory);
streamingContext.textFileStream(dataDirectory)
</div>
</div>

Expand Down Expand Up @@ -1034,7 +1035,7 @@ Currently, the following output operations are defined:
<table class="table">
<tr><th style="width:30%">Output Operation</th><th>Meaning</th></tr>
<tr>
<td> <b>print</b>() </td>
<td> <b>print</b>() (<b>pprint<b>() in Python)</td>
<td> Prints first ten elements of every batch of data in a DStream on the driver.
This is useful for development and debugging. </td>
</tr>
Expand Down Expand Up @@ -1077,41 +1078,102 @@ For this purpose, a developer may inadvertantly try creating a connection object
the Spark driver, but try to use it in a Spark worker to save records in the RDDs.
For example (in Scala),

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
dstream.foreachRDD(rdd => {
val connection = createNewConnection() // executed at the driver
rdd.foreach(record => {
connection.send(record) // executed at the worker
})
})
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

{% highlight python %}
def sendRecord(rdd):
connection = createNewConnection() # executed at the driver
rdd.foreach(lambda record: connection.send(record))
connection.close()

dstream.foreachRDD(sendRecord)
{% endhighlight %}

This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.
</div>
</div>

This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.

- However, this can lead to another common mistake - creating a new connection for every record. For example,

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
dstream.foreachRDD(rdd => {
rdd.foreach(record => {
val connection = createNewConnection()
connection.send(record)
connection.close()
})
})
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

{% highlight python %}
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
{% endhighlight %}

</div>
</div>

Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use `rdd.foreachPartition` - create a single connection object and send all the records in a RDD partition using that connection.
Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use `rdd.foreachPartition` - create a single connection object and send all the records in a RDD partition using that connection.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
dstream.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
})
})
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
{% highlight python %}
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
{% endhighlight %}
</div>
</div>

This amortizes the connection creation overheads over many records.
This amortizes the connection creation overheads over many records.

- Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches.
One can maintain a static pool of connection objects than can be reused as
RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.


<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
dstream.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
// ConnectionPool is a static, lazily initialized pool of connections
Expand All @@ -1120,8 +1182,25 @@ For example (in Scala),
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
})
})
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
{% highlight python %}
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
{% endhighlight %}
</div>
</div>

Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.
Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.


##### Other points to remember:
Expand Down
10 changes: 10 additions & 0 deletions python/docs/pyspark.streaming.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
pyspark.streaming module
==================

Module contents
---------------

.. automodule:: pyspark.streaming
:members:
:undoc-members:
:show-inheritance:

0 comments on commit 3821c4d

Please sign in to comment.