# Initializing StreamingContext


To initialize a Spark Streaming program, create a StreamingContext object, which is the main entry point for all Spark Streaming functionality.
A StreamingContext object can be created from a SparkContext object.

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

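# Assumed values for illustration: a local master with two threads and a placeholder app name.
# Replace them with your cluster's master URL and your own application name.
master = "local[2]"
appName = "StreamingDemo"

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)  # batch interval of 1 second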

## After a context is defined, you have to do the following:
1. Define the input sources by creating input DStreams.
2. Define the streaming computations by applying transformation and output operations to DStreams.
3. Start receiving data and processing it using streamingContext.start().
4. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
5. The processing can be manually stopped using streamingContext.stop().

## Points to remember:
- Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- Calling stop() on a StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional stopSparkContext parameter of stop() to False.
- A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
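
The last two points can be sketched as follows. This is a minimal illustration, not part of the original notebook: the function name, the default master and app name, and the queue-backed input (queueStream) are assumptions chosen so that the example needs no external data source. The Spark imports sit inside the function, so defining it does not start anything.

```python
def reuse_spark_context(master="local[2]", app_name="ReuseSparkContextDemo"):
    """Run two StreamingContexts, one after the other, on a single SparkContext."""
    # Imported lazily so that defining this function does not require a running Spark.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(master, app_name)
    try:
        # First streaming job: a queue-backed DStream stands in for a real input source.
        first = StreamingContext(sc, 1)
        first.queueStream([sc.parallelize(["a", "b"])]).pprint()
        first.start()
        first.awaitTerminationOrTimeout(3)   # let it run for about 3 seconds
        first.stop(stopSparkContext=False)   # stop streaming only, keep sc alive

        # The first context is stopped, so a second one can be created on the same sc.
        second = StreamingContext(sc, 2)
        second.queueStream([sc.parallelize(["c", "d"])]).pprint()
        second.start()
        second.awaitTerminationOrTimeout(3)
        second.stop(stopSparkContext=False)
    finally:
        sc.stop()  # release the SparkContext once all streaming work is done
```

Calling reuse_spark_context() in an environment where no other SparkContext is active should print one small batch from each context. Creating the second StreamingContext before stopping the first would violate the "only one active context" rule above.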

# Define Input DStreams

#### How Directories are Monitored
Spark Streaming monitors the directory passed to textFileStream() and processes any files created in that directory.

- A simple directory can be monitored, such as "hdfs://namenode:8040/logs/". All files directly under such a path will be processed as they are discovered.
- A POSIX glob pattern can be supplied, such as "hdfs://namenode:8040/logs/2017/*". Here, the DStream will consist of all files in the directories matching the pattern. That is: it is a pattern of directories, not of files in directories.
- All files must be in the same data format.
- A file is considered part of a time period based on its modification time, not its creation time.
- Once processed, changes to a file within the current window will not cause the file to be reread. That is: updates are ignored.
- The more files under a directory, the longer it will take to scan for changes, even if no files have been modified.
- If a wildcard is used to identify directories, such as "hdfs://namenode:8040/logs/2016-*", renaming an entire directory to match the path will add the directory to the list of monitored directories. Only the files in the directory whose modification time is within the current window will be included in the stream.
- Calling FileSystem.setTimes() to fix the timestamp is a way to have the file picked up in a later window, even if its contents have not changed.
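
To make the modification-time rules concrete, here is a simplified plain-Python model of a directory scan. It is not Spark's implementation: files_in_window, the window bounds, and the local temporary directory are all hypothetical stand-ins, and os.utime plays the role that FileSystem.setTimes() has on HDFS.

```python
import os
import tempfile
import time


def files_in_window(directory, window_start, window_end, already_seen):
    """Return unseen files whose modification time lies in [window_start, window_end)."""
    selected = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if not os.path.isfile(path) or path in already_seen:
            continue  # skip subdirectories and files that were already processed
        if window_start <= os.path.getmtime(path) < window_end:
            selected.append(path)
            already_seen.add(path)
    return selected


with tempfile.TemporaryDirectory() as tmp:
    old_file = os.path.join(tmp, "old.log")
    new_file = os.path.join(tmp, "new.log")
    for path in (old_file, new_file):
        with open(path, "w") as handle:
            handle.write("hello\n")

    now = time.time()
    os.utime(old_file, (now - 3600, now - 3600))  # last modified an hour ago
    os.utime(new_file, (now, now))                # last modified just now

    seen = set()
    first_scan = files_in_window(tmp, now - 60, now + 60, seen)

    # Resetting the timestamp moves the old file into the window,
    # even though its contents are unchanged.
    os.utime(old_file, (now + 1, now + 1))
    second_scan = files_in_window(tmp, now - 60, now + 60, seen)

    first_names = [os.path.basename(p) for p in first_scan]
    second_names = [os.path.basename(p) for p in second_scan]
```

In this model the first scan selects only new.log, because old.log falls outside the window. The second scan selects only old.log: its timestamp now falls inside the window, while new.log has already been processed and is ignored.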

In [None]:
hdfs_dir = 'hdfs://msbx5420-m/user/peter/spark_streaming_demo/'
lines = ssc.textFileStream(hdfs_dir)

# DStream Transformations

In [None]:
counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda x: (x, 1))\
              .reduceByKey(lambda a, b: a+b)
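
As a rough illustration of what this chain computes for a single batch, the sketch below reproduces the three steps in plain Python on an in-memory list of lines. The function name word_count_batch and the sample input are made up for this example; the real DStream version applies the same logic to every batch interval and distributes the work across the cluster.

```python
from itertools import chain


def word_count_batch(lines):
    """Plain-Python equivalent of flatMap -> map -> reduceByKey for one batch."""
    # flatMap: split every line and flatten the pieces into one sequence of words
    words = chain.from_iterable(line.split(" ") for line in lines)
    # map: pair each word with an initial count of 1
    pairs = ((word, 1) for word in words)
    # reduceByKey: combine the values of identical keys with a + b
    counts = {}
    for word, one in pairs:
        counts[word] = counts[word] + one if word in counts else one
    return counts


batch = ["to be or", "not to be"]
result = word_count_batch(batch)
print(result)  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

Note that split(" ") yields empty strings when a line contains consecutive spaces, so those would be counted as a "word" too. Calling split() with no argument splits on runs of whitespace and avoids this.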

# Output Operations on DStreams

In [None]:
counts.pprint()

# Start receiving data and processing it 

In [None]:
ssc.start()
ssc.awaitTermination()