In [1]:
import os
import sys
os.environ["PYSPARK_SUBMIT_ARGS"]='--conf spark.sql.catalogImplementation=in-memory pyspark-shell'
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [4]:
spark

![big_picture](https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png)

![programming_model](https://spark.apache.org/docs/latest/img/structured-streaming-model.png)

The “Output” is defined as what gets written out to the external storage. The output can be written in one of three modes:

**Complete Mode** - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

**Append Mode** - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

**Update Mode** - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
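
To make the difference between Complete and Update modes concrete, here is a toy model in plain Python (no Spark involved). The function `run_micro_batches` and its input are hypothetical names invented for this sketch. It keeps a running count per key and records what each mode would hand to the sink after every trigger. Append mode is left out because the rows of a running aggregate keep changing.

```python
from collections import Counter

def run_micro_batches(batches):
    """Toy model of a running count: for each trigger, record what
    Complete mode and Update mode would write to the sink."""
    result_table = Counter()
    outputs = []
    for batch in batches:
        changed = Counter(batch)        # keys touched by this trigger
        result_table.update(changed)    # incremental update of the Result Table
        outputs.append({
            # Complete: the entire Result Table
            "complete": dict(result_table),
            # Update: only the rows that changed in this trigger
            "update": {key: result_table[key] for key in changed},
        })
    return outputs

# Hypothetical input: the actions that arrive in two consecutive triggers
outputs = run_micro_batches([["open", "open", "close"], ["open"]])
for trigger, out in enumerate(outputs):
    print(trigger, out)
```

In the second trigger only `open` changes, so Update mode emits a single row while Complete mode emits the whole table again.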

![quick_example](https://spark.apache.org/docs/latest/img/structured-streaming-example-model.png)

In [None]:
%pylab inline

In [None]:
plt.rcParams["figure.figsize"] = 15, 10

In [None]:
import seaborn as sns
sns.set_style("whitegrid")

In [None]:
!hdfs dfs -ls /user/pavel.klemenkov/lectures/lecture05/events|head

There are about 50 JSON files in the directory. Let's see what each JSON file contains.

In [None]:
!hdfs dfs -tail /user/pavel.klemenkov/lectures/lecture05/events/file-0.json

Each line in the file contains a JSON record with two fields: `time` and `action`. Let's try to analyze these files interactively.
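
As a rough illustration of this one-record-per-line layout, the sketch below parses two made-up lines with the standard library. The sample values and the `parse_event` helper are assumptions for illustration only. In particular, the encoding of `time` is not shown in this notebook, so the sketch assumes Unix epoch seconds.

```python
import json
from datetime import datetime, timezone

# Made-up sample lines (assumption: `time` holds Unix epoch seconds)
sample_lines = [
    '{"time": 1469501107, "action": "Open"}',
    '{"time": 1469501147, "action": "Close"}',
]

def parse_event(line):
    """Turn one JSON line into an (event_time, action) pair."""
    record = json.loads(line)
    event_time = datetime.fromtimestamp(record["time"], tz=timezone.utc)
    return event_time, record["action"]

events = [parse_event(line) for line in sample_lines]
for event_time, action in events:
    print(event_time.isoformat(), action)
```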

## Batch/Interactive Processing
The usual first step in processing the data is to query it interactively. Let's define a static DataFrame on the files, and give it a table name.

In [None]:
from pyspark.sql.types import *

inputPath = "/user/pavel.klemenkov/lectures/lecture05/events/"

# Since we know the data format already, let's define the schema to speed up processing (no need for Spark to infer schema)
jsonSchema = StructType(fields = [ 
    StructField("time", TimestampType(), True), 
    StructField("action", StringType(), True) 
])

# Static DataFrame representing data in the JSON files
staticInputDF = spark.read\
                     .schema(jsonSchema)\
                     .json(inputPath)

In [None]:
staticInputDF.show(5)

Now we can compute the number of "open" and "close" actions with one hour windows. To do this, we will group by the `action` column and 1 hour windows over the `time` column.

In [None]:
import pyspark.sql.functions as f

In [None]:
staticCountsDF = staticInputDF\
                  .groupBy(staticInputDF.action, f.window(staticInputDF.time, "1 hour"))\
                  .count()
        
staticCountsDF.cache()

# Register the DataFrame as table 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")
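
To make the `f.window(..., "1 hour")` grouping concrete, here is a plain-Python sketch of how an event time can be mapped to its one-hour tumbling window. The helper `tumbling_window` and the sample timestamp are hypothetical. The sketch assumes windows aligned to the Unix epoch with an inclusive start and an exclusive end, which matches Spark's default behaviour as far as I know; the session time zone may change how Spark displays the bounds.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumbling_window(ts, size=timedelta(hours=1)):
    """Return the [start, end) window of length `size` that contains `ts`."""
    offset = (ts - EPOCH) % size    # time elapsed since the window started
    start = ts - offset
    return start, start + size

# Made-up event time for illustration
ts = datetime(2016, 7, 26, 2, 45, 7, tzinfo=timezone.utc)
start, end = tumbling_window(ts)
print(start.isoformat(), end.isoformat())
```

Every event whose time falls in the same window gets the same `(start, end)` pair, so grouping by that pair and by `action` yields the hourly counts.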

Now we can directly use SQL to query the table. For example, here are the total counts across all the hours.

In [None]:
spark.sql("select action, sum(count) as total_count from static_counts group by action").show()

In [None]:
pdf = spark.sql("select action, sum(count) as total_count from static_counts group by action").toPandas()

In [None]:
sns.barplot(x="action", y="total_count", data=pdf)

How about a timeline of windowed counts?

In [None]:
query = """select action, date_format(window.end, "MMM-dd HH:mm") as time, count 
           from static_counts order by time, action
        """

spark.sql(query).show()

In [None]:
pdf = spark.sql(query).toPandas()

In [None]:
# Named `ax` so it does not shadow the `f` alias for pyspark.sql.functions
ax = sns.barplot(x="time", y="count", hue="action", data=pdf)
for item in ax.get_xticklabels():
    item.set_rotation(45)

Note the two ends of the graph. The close actions are generated so that they come after the corresponding open actions, so there are more "opens" at the beginning and more "closes" at the end.

## Stream Processing 
Now that we have analyzed the data interactively, let's convert this to a streaming query that continuously updates as data arrives. Since we just have a static set of files, we are going to emulate a stream from them by reading one file at a time, in the chronological order they were created. The query we have to write is pretty much the same as the interactive query above.
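
The idea of emulating a stream from static files can be sketched without Spark. The toy below feeds hypothetical "files" to a running counter one at a time, oldest first, and checks that the final streaming state matches a single batch pass. The names `files` and `micro_batches` are invented for this sketch; `max_files_per_trigger` only mimics the spirit of a one-file-per-trigger setting.

```python
from collections import Counter

# Hypothetical stand-ins for the event files: (creation_time, actions)
files = [(3, ["close"]), (1, ["open", "open"]), (2, ["close", "open"])]

def micro_batches(files, max_files_per_trigger=1):
    """Yield files in chronological order, a few per trigger."""
    ordered = sorted(files, key=lambda item: item[0])
    for i in range(0, len(ordered), max_files_per_trigger):
        yield ordered[i:i + max_files_per_trigger]

# Batch view: aggregate everything in one pass
batch_counts = Counter(a for _, actions in files for a in actions)

# Streaming view: carry the counts from one trigger to the next
streaming_counts = Counter()
snapshots = []
for batch in micro_batches(files):
    for _, actions in batch:
        streaming_counts.update(actions)
    snapshots.append(dict(streaming_counts))  # result table after this trigger

print(snapshots)
print(streaming_counts == batch_counts)
```

Each snapshot is what an interactive query would see if it ran between triggers; once all files are consumed, the streaming result matches the batch result.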

In [None]:
import pyspark.sql.functions as f

In [None]:
# Similar to definition of staticInputDF above, just using `readStream` instead of `read`
streamingInputDF = spark.readStream\
                        .schema(jsonSchema)\
                        .option("maxFilesPerTrigger", 1)\
                        .json(inputPath)

In [None]:
# Same query as staticCountsDF, applied to the streaming DataFrame
streamingCountsDF = streamingInputDF\
                     .groupBy(streamingInputDF.action, f.window(streamingInputDF.time, "1 hour"))\
                     .count()

In [None]:
# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

As you can see, `streamingCountsDF` is a streaming DataFrame (`streamingCountsDF.isStreaming` returned `True`). You can start the streaming computation by defining the sink and starting the query.
In our case, we want to interactively query the counts (same queries as above), so we will write the complete set of 1 hour counts to an in-memory table (note that the in-memory sink is intended for testing purposes only).

In [None]:
streaming_query = streamingCountsDF.writeStream\
                                   .format("memory")\
                                   .queryName("counts")\
                                   .outputMode("complete")\
                                   .start()

`streaming_query` is a handle to the streaming query that is running in the background. This query is continuously picking up files and updating the windowed counts.

You can inspect the query through this handle: `streaming_query.status` describes what the query is currently doing, and `streaming_query.lastProgress` summarizes the most recent trigger, including the number of input rows it processed.

Let's wait a bit for a few files to be processed and then interactively query the in-memory `counts` table.

In [None]:
query = """
    select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action
"""

In [None]:
import time

In [None]:
from IPython.display import clear_output

In [None]:
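try:
    while True:
        clear_output(wait=True)
        spark.sql(query).show()
        time.sleep(3)
except KeyboardInterrupt:
    pass  # Interrupting the kernel ends the polling; the streaming query keeps running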

In [None]:
streaming_query.isActive

In [None]:
streaming_query.stop()

### Unsupported Operations

+ Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.

+ Limit and take first N rows are not supported on streaming Datasets.

+ Distinct operations on streaming Datasets are not supported.

+ Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

+ A few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section of the Structured Streaming Programming Guide for more details.

In [None]:
spark.stop()