# Output Operations on DStreams

Output operations allow a DStream's data to be pushed out to external systems such as a database or a file system. Because output operations are what make the transformed data available to external systems, they trigger the actual execution of all the DStream transformations (similar to actions for RDDs). Currently, the following output operations are defined:

| Output Operation        | Meaning           |
|-------------:|:------------- |
| **pprint**()      | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. |
| **saveAsTextFiles**(prefix, [suffix])     | Saves this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
| **foreachRDD**(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs. |



The Java and Scala APIs for Spark also provide `saveAsObjectFiles` and `saveAsHadoopFiles`. These are not available in the Python API.
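
The naming rule that the table gives for `saveAsTextFiles`, "prefix-TIME_IN_MS[.suffix]", can be sketched in plain Python. The helper `batch_file_name` below is hypothetical and written only for illustration; it is not part of the Spark API, and the timestamps are made-up values.

```python
def batch_file_name(prefix, time_ms, suffix=None):
    """Hypothetical helper that mirrors the 'prefix-TIME_IN_MS[.suffix]' rule."""
    name = "{}-{}".format(prefix, time_ms)
    # The suffix is optional; when present it is appended after a dot
    if suffix:
        name += "." + suffix
    return name


# Example batch times in milliseconds (illustrative values only)
with_suffix = batch_file_name("Tweets", 1500000000000, "txt")
without_suffix = batch_file_name("Tweets", 1500000001000)
print(with_suffix)
print(without_suffix)
```

Under this rule every batch interval produces a distinct name, so successive batches do not overwrite each other.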


## Demo
The following demonstrates how to output RDDs. One could simply call `statuses.saveAsTextFiles("Tweets", "txt")` to achieve this, but here we use `foreachRDD` so that we can stop after collecting 1000 tweets.

In [None]:
import findspark
# TODO: your path will likely not have 'matthew' in it. Change it to reflect your path.
findspark.init('/home/matthew/spark-2.1.0-bin-hadoop2.7')

In [None]:
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import utils  # local helper module; assumed to define setupTwitter() and setupLogging()


# Configure Twitter credentials using twitter.txt
utils.setupTwitter()

# Set up a Spark streaming context named "SaveTweets" that runs locally using
# all CPU cores and one-second batches of data
sc = SparkContext("local[*]", "SaveTweets")
ssc = StreamingContext(sc, 1)  # batch interval in seconds

# Get rid of log spam (should be called after the context is set up)
utils.setupLogging()

# Create a DStream from Twitter using our streaming context.
# NOTE: PySpark does not ship a TwitterUtils class; this line assumes a
# wrapper with the same interface is available in the session.
tweets = TwitterUtils.createStream(ssc, None)

# Now extract the text of each status update into RDDs using map()
statuses = tweets.map(lambda status: status.getText())

totalTweets = 0

def twitterStatus(time, rdd):
    # totalTweets lives at module level, so declare it global before updating it
    global totalTweets

    # Don't bother with empty batches
    if rdd.count() > 0:

        # Combine each partition's results into a single partition
        repartitionedRDD = rdd.repartition(1).cache()
        # And write out a directory with the results. PySpark passes the batch
        # time as a datetime, so convert it to milliseconds for the name.
        repartitionedRDD.saveAsTextFile("Tweets_" + str(int(time.timestamp() * 1000)))

        # Stop once we've collected 1000 tweets.
        totalTweets += repartitionedRDD.count()
        print("Tweet count: " + str(totalTweets))
        if totalTweets > 1000:
            sys.exit(0)

# Pass the function itself; a two-argument function is called with (time, rdd)
statuses.foreachRDD(twitterStatus)

ssc.checkpoint("..checkpoint/")
ssc.start()
ssc.awaitTermination()
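
The demo keeps a running tweet count across batches and stops once the count passes a limit. A minimal plain-Python sketch of that control flow, with no Spark involved, is shown here. The class `BatchLimiter` and the sample batches are hypothetical, and plain lists stand in for the RDDs that `foreachRDD` would supply.

```python
class BatchLimiter:
    """Hypothetical helper: counts records across batches against a limit."""

    def __init__(self, limit):
        self.limit = limit
        self.total = 0

    def __call__(self, batch):
        # Skip empty batches, as the demo does
        if not batch:
            return False
        self.total += len(batch)
        # Report True once the running total exceeds the limit
        return self.total > self.limit


limiter = BatchLimiter(limit=1000)
# Four stand-in batches; the second one is empty
batches = [["t"] * 400, [], ["t"] * 400, ["t"] * 300]

stopped_after = None
for index, batch in enumerate(batches):
    if limiter(batch):
        stopped_after = index
        break

print(limiter.total, stopped_after)
```

Keeping the counter on an object is one alternative to a module-level variable: the callback can update its own state without a `global` declaration.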
      

## References
1. https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
