# Advanced Certification Program in Computational Data Science
## A program by IISc and TalentSprint
### Assignment 8: Spark Streaming


## Learning Objectives
At the end of the experiment, you will be able to :

* understand Spark Structured Streaming pipeline and implement it using Pyspark.
* understand Structured Streaming Query along with steps involved.

## Information

### Streaming data :
Streaming data is data that is continuously generated by different sources such as applications, networking devices, server log files, website activity, banking transactions and location data. Such data should be processed incrementally using Stream Processing techniques without having access to all of the data. Besides, it should be considered that concept drift may happen in the data which means that the properties of the stream may change over time.
It is usually used in the context of big data in which it is generated by many different sources at high speed.

To know more about streaming data click [here](https://aws.amazon.com/streaming-data/).


### Examples of streaming data
* IoT/Sensor generated data at certain time intervals in vehicles, industrial equipments, and farm machineries can be monitored to check performance and detect defects in advance.
* Continuous changing stock market value can be tracked for the financial institution in real-time.
* A media publisher streams billions of clickstream records from its online properties, aggregates and enriches the data with demographic information about users, and optimizes content placement on its site, delivering relevancy and a better experience to its audience.

### Setup Steps:

#### Install Pyspark

In [4]:
!pip -qq install pyspark

[K     |████████████████████████████████| 281.4 MB 28 kB/s 
[K     |████████████████████████████████| 198 kB 48.3 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


#### Importing required packages

In [5]:
import os
import time
import random
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

#### Starting the Spark Session

In [6]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

#### Reading Data



In [7]:
df=spark.read.csv('TempHumi.csv',header=True,inferSchema=True)

In [8]:

df.columns

['Date', 'Temperatur (C)', 'Humidity (%)', 'Hour']

In [9]:

df.head(2)

[Row(Date='12/12/2017 12:00:00 AM IST', Temperatur (C)=16.1, Humidity (%)=72.092, Hour=0),
 Row(Date='12/12/2017 12:05:00 AM IST', Temperatur (C)=16.0, Humidity (%)=72.092, Hour=0)]

In [10]:
df.select('Date').dtypes # Checking data type of any column

[('Date', 'string')]

In [11]:
df.groupBy("Hour").count().show(24)

+----+-----+
|Hour|count|
+----+-----+
|  12|   12|
|  22|   12|
|   1|   12|
|  13|   12|
|   6|   12|
|  16|   12|
|   3|   11|
|  20|   12|
|   5|   12|
|  19|   12|
|  15|   12|
|   9|   12|
|  17|   12|
|   4|   13|
|   8|   12|
|  23|   12|
|   7|   12|
|  10|   12|
|  21|   12|
|  11|   12|
|  14|   12|
|   2|   12|
|   0|   12|
|  18|   12|
+----+-----+



We can now save the output of that job by filtering on each step and saving it to a separate file

In [12]:
!mkdir HourFolder  ## Making a folder on current directory

In [13]:
steps = df.select("Hour").distinct().collect()
# .distinct() function --> is used to drop/remove the duplicate rows (all columns) from dataframe.
# .Collect (Action) - Return all the elements of the dataset as a list at the driver program.
#  This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
#  Calling collect() on an RDD will return the entire dataset to the driver which can cause out of memory and thus we should avoid it for a large dataset.

In [17]:

print(steps)
print(type(steps))
print(steps[0])
print(steps[0][0])

[Row(Hour=12), Row(Hour=22), Row(Hour=1), Row(Hour=13), Row(Hour=6), Row(Hour=16), Row(Hour=3), Row(Hour=20), Row(Hour=5), Row(Hour=19), Row(Hour=15), Row(Hour=9), Row(Hour=17), Row(Hour=4), Row(Hour=8), Row(Hour=23), Row(Hour=7), Row(Hour=10), Row(Hour=21), Row(Hour=11), Row(Hour=14), Row(Hour=2), Row(Hour=0), Row(Hour=18)]
<class 'list'>
Row(Hour=12)
12


In [19]:
df_test = df.where(f"Hour={steps[0][0]}")
df_test.show()
type(df_test)

+--------------------+--------------+------------+----+
|                Date|Temperatur (C)|Humidity (%)|Hour|
+--------------------+--------------+------------+----+
|12/12/2017 12:00:...|     19.900002|   69.742004|  12|
|12/12/2017 12:05:...|         19.95|      68.892|  12|
|12/12/2017 12:10:...|         20.05|    69.04201|  12|
|12/12/2017 12:15:...|         20.05|      68.942|  12|
|12/12/2017 12:20:...|          20.3|      69.342|  12|
|12/12/2017 12:25:...|         20.35|      69.092|  12|
|12/12/2017 12:30:...|     20.400002|      68.842|  12|
|12/12/2017 12:35:...|         20.45|      68.642|  12|
|12/12/2017 12:40:...|          20.5|   68.742004|  12|
|12/12/2017 12:45:...|         20.55|      68.592|  12|
|12/12/2017 12:50:...|     20.599998|   68.242004|  12|
|12/12/2017 12:55:...|     20.599998|    68.29201|  12|
+--------------------+--------------+------------+----+



pyspark.sql.dataframe.DataFrame

###  Creating the streaming version of this input:
Implementing the above steps in the loop and making individual csv files for each hour step. These files are saved in a folder as given below. We will use this folder as a source of incoming stream data and, we will read each file one by one as if it was a stream.

In [20]:
for step in steps:
    df1 = df.where(f"Hour={step[0]}")
    df1.coalesce(1).write.csv(path='/content/HourFolder/Hourly_Record',header="true",mode="append")

In [21]:
Hourly_Record_lisy = [i for i in os.listdir("/content/HourFolder/Hourly_Record/") if i.endswith(".csv")]
Hourly_Record_lisy

['part-00000-62c4d289-8cdb-4f1a-acc5-f042181ada55-c000.csv',
 'part-00000-8639f2eb-ac27-4acf-bc46-a9b7e1f9b7d5-c000.csv',
 'part-00000-08b7f058-735d-4c74-aa57-c790fee52f31-c000.csv',
 'part-00000-142b0277-08d6-4af9-afc6-3c5a13285434-c000.csv',
 'part-00000-997a99f9-421c-48f7-8b84-cc04f1749e8c-c000.csv',
 'part-00000-cb534fa7-a14d-45a1-b7a2-7e8825a7d203-c000.csv',
 'part-00000-109e6e33-4c87-469b-88e6-2bae792da530-c000.csv',
 'part-00000-0f45a3fb-a8c4-4899-b34a-58362152f3c3-c000.csv',
 'part-00000-76158d90-aacf-4b6a-a6c0-e3d0ff4c49dd-c000.csv',
 'part-00000-b5d68ea9-8010-4994-96f2-8ab4caf8a322-c000.csv',
 'part-00000-8f1a11d7-cd25-4f60-8e84-34540f01ea67-c000.csv',
 'part-00000-74a23005-cb3c-4550-8f1f-abcba822349c-c000.csv',
 'part-00000-c899a497-e8e4-41e4-b936-b1078545df29-c000.csv',
 'part-00000-bec550d9-72a5-4f21-a290-6390afe09733-c000.csv',
 'part-00000-238bc993-abb7-4eb2-8c0f-e6381b543b19-c000.csv',
 'part-00000-2fd75edc-831d-4a63-b98c-da88f4108daa-c000.csv',
 'part-00000-60073358-27

#### Checking any one file from above :

In [22]:

Hourly_Record_lisy[4]

'part-00000-997a99f9-421c-48f7-8b84-cc04f1749e8c-c000.csv'

In [34]:

part = spark.read.csv("/content/HourFolder/Hourly_Record/part-00000-997a99f9-421c-48f7-8b84-cc04f1749e8c-c000.csv",header=True,inferSchema=True)
part.show()

+--------------------+--------------+------------+----+
|                Date|Temperatur (C)|Humidity (%)|Hour|
+--------------------+--------------+------------+----+
|12/12/2017 12:00:...|          16.1|      72.092|   0|
|12/12/2017 12:05:...|          16.0|      72.092|   0|
|12/12/2017 12:10:...|          15.9|      72.092|   0|
|12/12/2017 12:15:...|         15.85|    72.04201|   0|
|12/12/2017 12:20:...|         15.85|      72.342|   0|
|12/12/2017 12:25:...|         15.85|      72.192|   0|
|12/12/2017 12:30:...|     15.799999|      72.392|   0|
|12/12/2017 12:35:...|     15.799999|    72.54201|   0|
|12/12/2017 12:40:...|     15.799999|      72.642|   0|
|12/12/2017 12:45:...|     15.799999|      72.642|   0|
|12/12/2017 12:50:...|         15.75|      72.892|   0|
|12/12/2017 12:55:...|         15.75|      73.192|   0|
+--------------------+--------------+------------+----+



### The Programming Model of Structured Streaming
“Table” is a well-known concept that developers are familiar with when building
batch applications. Structured Streaming extends this concept to streaming applications by treating a stream as an unbounded, continuously appended table, as illustrated in Figure.

![data_stream_unbounded%20table.PNG](https://cdn.iisc.talentsprint.com/CDS/Images/data_stream_unbounded_table.PNG)

Every new record received in the data stream is like a new row being appended to the unbounded input table. This leads to a new stream processing model that is very similar to a batch processing model. We will express our streaming computation as a standard batch-like query as on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail. A query on the input will generate the “Result Table” as given in the figure below. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.

![spark%20stream.PNG](https://cdn.iisc.talentsprint.com/CDS/Images/spark_stream.PNG)

The “Output” is defined as what gets written out to the external storage. The output can be defined in a different mode:

Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle the writing of the entire table.

Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.

Note that each mode is applicable on certain types of queries.

### Five Steps to Define a Streaming Query
We have prepared our data as if it is coming from a continuous streaming source now in this section, we will explore the steps involved in defining a streaming query.
#### Step 1: Define input sources
*  The first step is to define a DataFrame from a streaming source. For streaming sources, we need spark .readStream to create a DataStreamReader.
* Apache Spark natively supports reading data streams from Apache Kafka and all the various file-based formats that DataFrameReader supports (Parquet, ORC, JSON, etc.).
* We can think of the folder containing 24 csv files as the Kafka source feeding to spark streaming.

In [35]:
part.schema # checking the part schema

StructType(List(StructField(Date,StringType,true),StructField(Temperatur (C),DoubleType,true),StructField(Humidity (%),DoubleType,true),StructField(Hour,IntegerType,true)))

In [36]:
streaming = (spark.readStream.schema(part.schema).option('maxFilesPerTrigger',1).csv('HourFolder/Hourly_Record/'))
# maxFilesPerTrigger: maximum number of new files to be considered in every trigger, here taken 1.
# File source - Reads files written in a directory as a stream of data, here directory: HourFolder/Hourly_Record.
# Supported file formats are text, CSV, JSON, ORC, Parquet, here is CSV.
# Files will be processed in the order of file modification time. 

In [37]:

type(streaming)

pyspark.sql.dataframe.DataFrame

#### Step 2: Setting  Up Transformation:


In [38]:
Hourly_Mean_Value = streaming.groupBy('Hour').mean("Temperatur (C)","Humidity (%)").orderBy(F.desc("Hour"))

* Hourly_Mean_Value is a streaming DataFrame (that is, a DataFrame on unbounded, streaming data) that represents the running mean that will be computed once the streaming query is started and the streaming input data is being continuously processed.
* Now that we have our transformation, we need to specify an output sink for the results. for this example, we are going to write to a memory sink that keeps the results in memory.
* We also need to define how spark will output that data. In this example, we will use the complete output mode (rewriting all of the keys along with their counts after every trigger).
* In this example, we will not include activity query.awaitTermination() because it is required only to prevent the driver process from terminating when the stream is active. So to be able to run this locally in a notebook we will not include it.

#### Step 3: Define output sink and output mode

In [39]:
writer = (Hourly_Mean_Value.writeStream.queryName("Temp_Humi_Mean").format("memory").outputMode("complete").start())
# Memory sink (for debugging) - The output is stored in memory as an in-memory table.
# Both, Append and Complete output modes, are supported. 
# This should be used for debugging purposes on low data volumes.
# as the entire output is collected and stored in the driver’s memory. Hence, use it with caution.
# Have all the aggregates in an in-memory table. The query name will be the table name, here: "Temp_Humi_Mean".
# Note that we have to call start() to start the execution of the query. 
# This returns a StreamingQuery object which is a handle to the continuously running execution. 

#### Step 4: Specifying processing details:
 * Triggering details: This indicates when to trigger the discovery and processing of newly available streaming data. 
 * Checkpoint location: This option is necessary for failure recovery in the real application.
 
 For sake of simplicity, these parameters are not set by us here. By default trigger --> The streaming query executes data in micro-batches where the next micro-batch is triggered as soon as the previous micro-batch has been completed. 


#### Step 5: Start the query :
* Once everything has been specified, the final step is to start the query, already done in the above cell by .start(). We have created a table with --> .queryName ("Temp_Humi_Mean") which is updated according to trigger.
* This is a StreamingQuery object which is a handle to the continuously running execution. We can use this object to manage the query.

In [40]:
import time
for x in range(45):
  print('Query Result at time step : ',x)
  df_q = spark.sql("select * from Temp_Humi_Mean")
  df_q.show(24)
  time.sleep(1)


Query Result at time step :  0
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----------------+
|  22|  16.21250016666667|71.51283500000001|
|  13| 20.308333499999996|        69.325339|
|  12| 20.308333333333334|68.86700266666666|
|   1| 15.662500000000001|74.48783816666666|
|null|               null|             null|
+----+-------------------+-----------------+

Query Result at time step :  1
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----------------+
|  22|  16.21250016666667|71.51283500000001|
|  13| 20.308333499999996|        69.325339|
|  12| 20.308333333333334|68.86700266666666|
|   6| 15.179166666666669|77.00033599999999|
|   1| 15.662500000000001|74.48783816666666|
|null|               null|             null|
+----+-------------------+-----------------+

Query Result at time step :  2
+----+-------------------+-----------------+
|Hour

#### Check if the stream is active

In [41]:
spark.streams.active  # get the list of currently active streaming queries.

[<pyspark.sql.streaming.StreamingQuery at 0x7f08ceb93c50>]

In [42]:
spark.streams.active[0].isActive

True

In [43]:
writer.status
# It gives information about what the query is immediately doing - is a trigger active, is data being processed, etc.
# Will print something like the following.

{'isDataAvailable': False,
 'isTriggerActive': True,
 'message': 'Getting offsets from FileStreamSource[file:/content/HourFolder/Hourly_Record]'}

If we want to turn off the stream we'll run writer.stop() to reset the query for testing purposes.

In [44]:
  
writer.stop()     # stop the query.