# Lesson 38 - Structured Streaming

In [0]:
import time
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, expr

## Retail Data

We will demonstrate the use of Structured Streaming in Spark by exploring a dataset relating to purchases made with an online retailer. This dataset is stored in several CSV files located in the directory `/FileStore/tables/retail/`. Each file contains information about a single day of purchases. To get a sense of what our data looks like, and to determine the schema we should use, we will first read the contents of a single file into a static DataFrame.

In [0]:
static_df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('/FileStore/tables/retail/2010_12_01.csv')
)

print(static_df.count())

retail_schema = static_df.schema

static_df.show(5, truncate=False)

## Creating a Streaming DataFrame

Spark can read streaming data from a variety of sources. We will discuss some of the available streaming sources in a later lesson, but for now we will consider streams in which new data is periodically written to new files within a directory to which the cluster has access. 

The process of creating a streaming DataFrame from a file source is quite similar to the process used to create a static DataFrame from a data file. The primary difference is that where you would use `spark.read` to create a static DataFrame, you use `spark.readStream` to create a streaming DataFrame. As with a static DataFrame, we can use `option()` to set the delimiter, to indicate whether the files contain a header, and to set other options. We supply a schema using `schema()`. By default, file-based streaming sources require an explicit schema rather than inferring one, which is why we saved the schema of the static DataFrame as `retail_schema`. The path to the directory that will contain the incoming files is passed to the `csv()` method.

When working with a streaming DataFrame, we can also control how Spark schedules the processing of new data. A **trigger** defines when Spark should check for new data coming into the stream. By default, Spark triggers the processing of a new micro-batch as soon as it finishes processing the current micro-batch. This results in the lowest latency, but can incur significant overhead as a result of processing many small micro-batches. Later in this lesson, we discuss how to specify a minimum amount of time that Spark should wait between micro-batches. Increasing this interval will increase the latency of our streaming application, but could also increase the throughput by reducing the amount of overhead required.

In the examples in this course, we will also set `option('maxFilesPerTrigger', 1)`. This tells Spark to include at most one new file in each micro-batch, regardless of how many files have been written to the directory since the last trigger. In our examples, no external process is writing files to the directory; all of the files are present from the beginning. We set this option to simulate files being added to the directory periodically.
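
The effect of this option can be illustrated without Spark. The following is a minimal pure-Python sketch, not Spark's actual scheduling logic: the helper `plan_micro_batches` is hypothetical, the file names other than `2010_12_01.csv` are made up for illustration, and ordering the files by name is a simplifying assumption (the real file source decides the order itself).

```python
def plan_micro_batches(pending_files, max_files_per_trigger=1):
    """Group pending files into consecutive micro-batches.

    Each micro-batch holds at most max_files_per_trigger files.
    Sorting by name is an assumption made only for this illustration.
    """
    files = sorted(pending_files)
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]


# Hypothetical file names that mimic one CSV file per day of purchases.
files = ['2010_12_01.csv', '2010_12_02.csv', '2010_12_03.csv']

for batch_id, batch in enumerate(plan_micro_batches(files, max_files_per_trigger=1)):
    print(batch_id, batch)
```

With a limit of 1, three files that are already present are consumed over three micro-batches, which is how the examples here imitate data arriving over time.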

In [0]:
retail_stream = (
    spark.readStream
    .option('header', True)
    .option('maxFilesPerTrigger', 1)
    .schema(retail_schema)
    .csv('/FileStore/tables/retail/')
)

We will now print the type of `retail_stream` to see that it is simply a DataFrame. We will also print its `isStreaming` attribute to confirm that Spark understands that this DataFrame was defined using a streaming source.

In [0]:
print(type(retail_stream))
print(retail_stream.isStreaming)

## Writing Data to a Sink

There is essentially only one action that can be performed on a streaming DataFrame. That is to start the stream, instructing it to write the results to some destination, or **sink**. This is accomplished by creating a `DataStreamWriter` object. We can create such an object by accessing the `writeStream` attribute of a streaming DataFrame and then setting options to specify how and to where the data is to be written. 

When creating a `DataStreamWriter`, we need to specify a sink to which the results should be written. We will discuss the types of sinks supported by Structured Streaming in a later lesson. For now, we will simply write the results of our transformations to an in-memory table against which we can submit SQL queries. This is specified using `format('memory')`. We will also need to use `queryName()` to specify the name of the in-memory table to which the results are being written.

Next, we need to specify an output mode. Structured Streaming supports three output modes: Append, Complete, and Update.

- **Append Mode.** When using this mode, rows created when processing a new micro-batch are simply added to the table containing the results. Once a row is written to the output table, it will never be altered. This mode is most useful when we wish to write all of the data from the stream to a table without applying any aggregations.
- **Complete Mode.** In this mode, all of the records of the output are rewritten to the sink after every new micro-batch. This mode is useful when we are applying grouping and aggregation operations to the contents of the stream.
- **Update Mode.** When using update mode, only the records whose values are affected by the data from the most recent micro-batch are written to the sink.

You can specify the output mode using the `outputMode()` method, passing it one of the following strings: `'append'`, `'complete'`, or `'update'`. 
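
To make the difference between the three modes concrete, here is a small pure-Python sketch that imitates what each mode would write after every micro-batch. It is an illustration under stated assumptions, not Spark code: the helper `simulate_output_modes` and the sample country values are hypothetical, append mode is shown for the raw, un-aggregated rows, and complete and update modes are shown for a running count per key.

```python
from collections import Counter


def simulate_output_modes(batches):
    """Return, for each micro-batch, what each output mode would write."""
    totals = Counter()   # running count per key across all batches so far
    history = []
    for batch in batches:
        totals.update(batch)
        history.append({
            # Append: only the rows that arrived in this micro-batch.
            'append': list(batch),
            # Complete: the entire aggregated result, rewritten every time.
            'complete': dict(totals),
            # Update: only the aggregated rows touched by this micro-batch.
            'update': {key: totals[key] for key in set(batch)},
        })
    return history


# Hypothetical stream: each inner list is the Country column of one micro-batch.
batches = [['UK', 'UK', 'France'], ['UK', 'Germany']]

for batch_id, written in enumerate(simulate_output_modes(batches)):
    print(batch_id, written)
```

In the second micro-batch, the count for France is unchanged, so it appears in the complete output but not in the update output.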

We can also use the `trigger()` method to specify that Spark should wait some set amount of time between triggers. For example, if we want to force Structured Streaming to wait at least 10 seconds between micro-batches, we could specify `.trigger(processingTime='10 seconds')` when creating the `DataStreamWriter`.
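
The timing behavior of a processing-time trigger can be sketched with a simplified model. The code below is pure Python rather than Spark, the function `batch_start_times` is hypothetical, and the model assumes that a micro-batch that finishes early waits for the interval to elapse, while a micro-batch that overruns the interval is followed immediately by the next one.

```python
def batch_start_times(durations, interval):
    """Return the start time of each micro-batch under a simplified model.

    durations: processing time of each micro-batch, in seconds.
    interval:  the processing-time trigger interval, in seconds.
    """
    starts = []
    t = 0
    for duration in durations:
        starts.append(t)
        finish = t + duration
        # Wait for the interval to elapse, unless the batch already overran it.
        t = max(t + interval, finish)
    return starts


# Four micro-batches; the third takes longer than the 2-second interval.
print(batch_start_times([1, 1, 5, 1], interval=2))
```

In this model the first three micro-batches start 2 seconds apart, while the fourth starts as soon as the slow third micro-batch finishes.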

To start the stream, we call the `start()` method of the `DataStreamWriter`. This returns a `StreamingQuery` object that can be used to monitor the status of the stream and to stop it. We can stop the stream at any point by calling the `stop()` method of the `StreamingQuery` object.

In [0]:
writer = (
    retail_stream
    .writeStream
    .trigger(processingTime='2 seconds') 
    .format('memory')
    .queryName('retail_data')
    .outputMode('append')
)

query = writer.start()

In [0]:
print(type(writer))
print(type(query))
print(query.isActive)
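
Beyond `isActive`, a `StreamingQuery` exposes further monitoring attributes such as `name`, `status`, and `lastProgress`. The sketch below gathers a few of them into one dictionary. The helper `summarize_query` is hypothetical, and it assumes that `status` behaves like a dictionary with a `'message'` key and that `lastProgress` is either `None` (before any micro-batch has completed) or dictionary-like with a `'numInputRows'` key.

```python
def summarize_query(query):
    """Collect a few monitoring fields from a StreamingQuery-like object."""
    progress = query.lastProgress  # None until the first micro-batch completes
    return {
        'name': query.name,
        'active': query.isActive,
        'message': query.status['message'],
        'last_batch_rows': None if progress is None else progress['numInputRows'],
    }


# Usage in the notebook, once the stream has been started:
# print(summarize_query(query))
```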

Since we are writing the results of our queries to an in-memory table, we can use `spark.sql` to view the contents of our output at any time when the stream is running.

In [0]:
for i in range(16):
    df = spark.sql('SELECT * from retail_data')
    print(df.count())
    time.sleep(1)
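
The loop above polls a fixed number of times. As an alternative, a small helper can stop polling as soon as enough rows have arrived or a time limit is reached. This is only a sketch: `wait_for_rows` and its parameters are hypothetical, and the row count is supplied as a callable so that the helper itself does not depend on Spark.

```python
import time


def wait_for_rows(count_rows, min_rows, timeout=30.0, poll_interval=1.0):
    """Poll count_rows() until it reports at least min_rows or the timeout elapses.

    Returns the last row count that was observed.
    """
    deadline = time.monotonic() + timeout
    while True:
        n = count_rows()
        if n >= min_rows or time.monotonic() >= deadline:
            return n
        time.sleep(poll_interval)


# Usage in the notebook, while the stream is running:
# wait_for_rows(lambda: spark.sql('SELECT * FROM retail_data').count(), min_rows=1)
```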

In [0]:
spark.sql('SELECT * FROM retail_data ORDER BY InvoiceDate DESC').show(10)

In [0]:
query.stop()

## Transformations on Streaming DataFrame

In the previous example, we wrote all of the data received over our stream to a sink in its raw form. However, we will typically want to apply some sort of processing to the streaming data so that we can extract meaningful information from the data being received. One advantage of Spark Structured Streaming is that we can apply most of the standard DataFrame transformations that we are already familiar with to a streaming DataFrame with no modifications. When a transformation is applied to a streaming DataFrame, the resulting DataFrame will also be streaming.
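
One way to take advantage of this is to define a transformation once, as a function, and apply it to either kind of DataFrame. The sketch below is an illustration rather than part of the lesson's pipeline: the function `spend_per_customer` is hypothetical, and it relies only on the DataFrame methods `selectExpr()`, `groupBy()`, and `agg()`.

```python
def spend_per_customer(df):
    """Total amount spent per customer, for a static or a streaming DataFrame."""
    return (
        df
        # Keep the customer key and compute the cost of each purchase line.
        .selectExpr('customerID', 'UnitPrice * Quantity AS total_cost')
        .groupBy('customerID')
        # Dictionary form of agg(); the result column is named sum(total_cost).
        .agg({'total_cost': 'sum'})
    )


# Usage in the notebook:
# spend_per_customer(static_df).isStreaming      # False
# spend_per_customer(retail_stream).isStreaming  # True
```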

For the sake of completeness of this example, we will recreate our original streaming DataFrame in the cell below.

In [0]:
retail_stream = (
    spark.readStream
    .option('header', True)
    .option('maxFilesPerTrigger', 1)
    .schema(retail_schema)
    .csv('/FileStore/tables/retail/')
)

print(retail_stream.isStreaming)

In the cell below, we will apply several transformations to our streaming DataFrame. Notice that each of these transformations could have been applied to a static version of the data with no changes. Also note that Spark indicates that the DataFrame produced by these transformations is also streaming.

In [0]:
customer_summary = (
    retail_stream
    .withColumn('total_cost', expr('UnitPrice * Quantity'))
    .groupBy('customerID')
    .agg(
        expr('COUNT(*) AS num_purchases'),
        expr('ROUND(SUM(total_cost),2) AS total_spent'),
        expr('ROUND(MEAN(total_cost),2) AS avg_spent')
    )
    .sort('total_spent', ascending=False)
)

print(type(customer_summary))
print(customer_summary.isStreaming)

We will now create our `DataStreamWriter` and will start the stream. Again, we will write the results of the transformations to an in-memory sink.

In [0]:
writer = (
    customer_summary
    .writeStream
    .format('memory')
    .queryName('customer_summary')
    .outputMode('complete')
)

query = writer.start()

In [0]:
print(query.isActive)

We will now view the results that have been written to the in-memory sink.

In [0]:
print(spark.sql('SELECT * from customer_summary').count())
spark.sql('SELECT * from customer_summary').show(10, truncate=False)

We can use the data that has been written to our sink to create plots. The contents of these plots will change each time these cells are executed.

In [0]:
pdf = spark.sql('SELECT * from customer_summary WHERE customerID IS NOT NULL').toPandas()

# Keep the 20 largest spenders, then reverse the order so that
# the largest bar is drawn at the top of the horizontal bar chart.
top20 = pdf.sort_values('total_spent', ascending=False).head(20).iloc[::-1]

plt.figure(figsize=[4,6])
plt.barh(top20.customerID.astype(int).astype(str),
         top20.total_spent,
         color='cornflowerblue',
         edgecolor='k'
        )
plt.title('Amount Spent by Customer (Top 20)')
plt.show()

In [0]:
query.stop()

## Multiple Queries on Same Stream

It is possible to define multiple `DataStreamWriter` objects and multiple `StreamingQuery` objects on the same streaming DataFrame. This can be useful if we want to apply different types of grouping and aggregation transformations to the same stream.

In [0]:
retail_stream = (
    spark.readStream
    .option('header', True)
    .option('maxFilesPerTrigger', 1)
    .schema(retail_schema)
    .csv('/FileStore/tables/retail/')
)


In the cell below, we define two new DataFrames by applying different sets of transformations to our streaming DataFrame.

In [0]:
group_by_product = (
    retail_stream
    .groupBy('StockCode')
    .agg(
        expr('COUNT(*) AS num_orders'),
        expr('SUM(Quantity) AS units_sold')
    )
    .sort('num_orders', ascending=False)
)

group_by_country = (
    retail_stream
    .withColumn('total_cost', expr('UnitPrice * Quantity'))
    .groupBy('Country')
    .agg(
        expr('COUNT(*) AS num_orders'),
        expr('ROUND(SUM(total_cost),2) AS total_spent'),
        expr('ROUND(MEAN(total_cost),2) AS avg_spent')
    )
    .sort('total_spent', ascending=False)
)

Next, we will create a different `DataStreamWriter` and `StreamingQuery` object for each of the two transformed DataFrames.

In [0]:
product_writer = (
    group_by_product
    .writeStream
    .format('memory')
    .queryName('products')
    .outputMode('complete')
)

country_writer = (
    group_by_country
    .writeStream
    .format('memory')
    .queryName('countries')
    .outputMode('complete')
)

product_query = product_writer.start()
country_query = country_writer.start()

In [0]:
print(product_query.isActive)
print(country_query.isActive)

In [0]:
print(spark.sql('SELECT * from products').count())
spark.sql('SELECT * from products').show(10, truncate=False)

In [0]:
print(spark.sql('SELECT * from countries').count())
spark.sql('SELECT * from countries').show(10, truncate=False)

In [0]:
product_query.stop()
country_query.stop()
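
When several queries are running, it can be convenient to stop them all in one step instead of tracking each `StreamingQuery` variable. The sketch below uses `spark.streams.active`, which lists the queries currently running in the session. The helper name `stop_all_queries` is hypothetical.

```python
def stop_all_queries(spark_session):
    """Stop every active streaming query and return the names of those stopped."""
    stopped = []
    for q in spark_session.streams.active:
        stopped.append(q.name)
        q.stop()
    return stopped


# Usage in the notebook:
# print(stop_all_queries(spark))
```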