
Sumiran Rai

Regd. No. - 24040208007

Software Lab in Data Engineering

# Operations on Streaming Data using PySpark

Install PySpark

In [9]:
!pip -qq install pyspark

Importing required packages

In [10]:
import os
import time
import random
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

Starting the Spark Session

In [11]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

Reading the Data

In [12]:
# header=True → Uses the first row as column names.
# inferSchema=True → Automatically detects data types.

df = spark.read.csv('/content/TempHumi.csv', header=True, inferSchema=True)

In [13]:
df.columns

['Date', 'Temperatur (C)', 'Humidity (%)', 'Hour']

In [14]:
df.show(5)

+--------------------+--------------+------------+----+
|                Date|Temperatur (C)|Humidity (%)|Hour|
+--------------------+--------------+------------+----+
|12/12/2017 12:00:...|          16.1|      72.092|   0|
|12/12/2017 12:05:...|          16.0|      72.092|   0|
|12/12/2017 12:10:...|          15.9|      72.092|   0|
|12/12/2017 12:15:...|         15.85|    72.04201|   0|
|12/12/2017 12:20:...|         15.85|      72.342|   0|
+--------------------+--------------+------------+----+
only showing top 5 rows



In [15]:
df.select('Date').dtypes  # Check the data type of a single column

[('Date', 'string')]

In [16]:
df.groupBy("Hour").count().show(24)

+----+-----+
|Hour|count|
+----+-----+
|  12|   12|
|  22|   12|
|   1|   12|
|  13|   12|
|   6|   12|
|  16|   12|
|   3|   11|
|  20|   12|
|   5|   12|
|  19|   12|
|  15|   12|
|   9|   12|
|  17|   12|
|   4|   13|
|   8|   12|
|  23|   12|
|   7|   12|
|  10|   12|
|  21|   12|
|  11|   12|
|  14|   12|
|   2|   12|
|   0|   12|
|  18|   12|
+----+-----+
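The timestamps in `df.show(5)` are five minutes apart, so each hour should hold 60 / 5 = 12 readings. A quick plain-Python check on the counts from the output above flags the hours that deviate. The counts are copied by hand here; in the notebook the same dict could be built with `{r["Hour"]: r["count"] for r in df.groupBy("Hour").count().collect()}`. The 5-minute interval is an assumption read off the sample rows.

```python
# Readings per hour, copied from the groupBy("Hour").count() output above
counts = {12: 12, 22: 12, 1: 12, 13: 12, 6: 12, 16: 12, 3: 11, 20: 12,
          5: 12, 19: 12, 15: 12, 9: 12, 17: 12, 4: 13, 8: 12, 23: 12,
          7: 12, 10: 12, 21: 12, 11: 12, 14: 12, 2: 12, 0: 12, 18: 12}

READING_INTERVAL_MIN = 5                  # assumption: one reading every 5 minutes
expected = 60 // READING_INTERVAL_MIN     # 12 readings per hour

# Hours whose row count differs from the expected value
anomalies = {hour: n for hour, n in sorted(counts.items()) if n != expected}
print(anomalies)  # → {3: 11, 4: 13}
```

Hour 3 is one reading short and hour 4 has one extra, while the total is still 24 × 12 = 288 rows, so the hourly means computed later are based on slightly different sample sizes for these two hours.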



We can now split the data by filtering on each hour (step) and saving each subset to a separate file.

In [17]:
!mkdir -p HourFolder  # Create a folder in the current directory (-p: no error if it already exists)

In [18]:
steps = df.select("Hour").distinct().collect()

In [19]:
print(steps)
print(type(steps))
print(steps[0])
print(steps[0][0])

[Row(Hour=12), Row(Hour=22), Row(Hour=1), Row(Hour=13), Row(Hour=6), Row(Hour=16), Row(Hour=3), Row(Hour=20), Row(Hour=5), Row(Hour=19), Row(Hour=15), Row(Hour=9), Row(Hour=17), Row(Hour=4), Row(Hour=8), Row(Hour=23), Row(Hour=7), Row(Hour=10), Row(Hour=21), Row(Hour=11), Row(Hour=14), Row(Hour=2), Row(Hour=0), Row(Hour=18)]
<class 'list'>
Row(Hour=12)
12


In [20]:
df_test = df.where(f"Hour={steps[0][0]}")
df_test.show()
type(df_test)

+--------------------+--------------+------------+----+
|                Date|Temperatur (C)|Humidity (%)|Hour|
+--------------------+--------------+------------+----+
|12/12/2017 12:00:...|     19.900002|   69.742004|  12|
|12/12/2017 12:05:...|         19.95|      68.892|  12|
|12/12/2017 12:10:...|         20.05|    69.04201|  12|
|12/12/2017 12:15:...|         20.05|      68.942|  12|
|12/12/2017 12:20:...|          20.3|      69.342|  12|
|12/12/2017 12:25:...|         20.35|      69.092|  12|
|12/12/2017 12:30:...|     20.400002|      68.842|  12|
|12/12/2017 12:35:...|         20.45|      68.642|  12|
|12/12/2017 12:40:...|          20.5|   68.742004|  12|
|12/12/2017 12:45:...|         20.55|      68.592|  12|
|12/12/2017 12:50:...|     20.599998|   68.242004|  12|
|12/12/2017 12:55:...|     20.599998|    68.29201|  12|
+--------------------+--------------+------------+----+



Since we don't have a real-time data source, we simulate a streaming environment as follows:

Splitting Data into Hourly Chunks

Each row has an "Hour" column. The dataset is split into multiple small CSV files, where each file contains data from a single hour.

### Creating the streaming version of this input
We implement the steps above in a loop, writing an individual `csv` file for each hour. These files are saved in the folder given below, which we will use as the source of incoming stream data: Spark will read the files one by one as if they were arriving on a stream.

In [21]:
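for step in steps:
  # Keep only the rows for this hour
  df1 = df.where(f"Hour={step[0]}")
  # coalesce(1) -> one part file per hour; mode="append" adds it to the same directory
  df1.coalesce(1).write.csv(path='/content/HourFolder/Hourly_Record', header=True, mode="append")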
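The loop writes the hours in whatever order `collect()` returned them (12, 22, 1, ... in the run above). Because the file source later processes files by modification time, the simulated stream delivers the hours in that same order. If a chronological stream is preferred, one option is to iterate over `sorted(row["Hour"] for row in steps)` instead. The sketch below models the split with Python's `csv` module (no Spark), writing one file per hour in sorted order into a temporary directory. The `hour_XX.csv` naming scheme is a hypothetical choice, and the rows are a few values copied from the outputs above with the Date column left out.

```python
import csv
import os
import tempfile
from collections import defaultdict

# A few rows copied from the outputs above (Date omitted): (temperature, humidity, hour)
rows = [(19.900002, 69.742004, 12), (16.1, 72.092, 0),
        (15.5, 75.492004, 2), (16.0, 72.092, 0), (19.95, 68.892, 12)]

# Group rows by hour, like df.where(f"Hour={h}") for each distinct h
by_hour = defaultdict(list)
for temp, humi, hour in rows:
    by_hour[hour].append((temp, humi, hour))

out_dir = tempfile.mkdtemp()
written = []
# Write the hours in chronological order so the simulated stream is ordered too
for hour in sorted(by_hour):
    path = os.path.join(out_dir, f"hour_{hour:02d}.csv")  # hypothetical naming scheme
    with open(path, "w", newline="") as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow(["Temperatur (C)", "Humidity (%)", "Hour"])  # header row, like header=True
        csv_writer.writerows(by_hour[hour])
    written.append(os.path.basename(path))

print(written)  # → ['hour_00.csv', 'hour_02.csv', 'hour_12.csv']
```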

In [22]:
# Keep only the CSV part files (skip _SUCCESS and the hidden .crc checksum files)
Hourly_Record_list = [i for i in os.listdir("/content/HourFolder/Hourly_Record/") if i.endswith(".csv")]
Hourly_Record_list

['part-00000-fbb7cfd8-9b6d-483a-a975-b17d736c2ae9-c000.csv',
 'part-00000-29f0c0ae-a926-4b1e-9dc8-e93be30efa7c-c000.csv',
 'part-00000-6702ebaf-4f8a-40b3-87b0-2423d9ae5050-c000.csv',
 'part-00000-72bb344f-c03d-42ad-83b7-464ac959af64-c000.csv',
 'part-00000-ca6fac55-59e3-4646-8a2a-bcefd7d73676-c000.csv',
 'part-00000-aaf477e5-9b83-4247-bdca-9f5f334ff55c-c000.csv',
 'part-00000-696224d1-4d86-4377-a055-a3bbcd332938-c000.csv',
 'part-00000-e2c431c7-6224-46bf-981a-1a5eab430531-c000.csv',
 'part-00000-c5176318-523a-4de4-958e-2e0c7ddf145b-c000.csv',
 'part-00000-fff72b43-a3c5-40fc-b97c-424a03d5fc5a-c000.csv',
 'part-00000-962b488d-e8f2-4bc2-bcca-4df147fc144e-c000.csv',
 'part-00000-c07eb922-a833-4ab0-9984-498661f5a1bc-c000.csv',
 'part-00000-f61fde5a-593b-421c-8acc-d688278ad473-c000.csv',
 'part-00000-b5494da8-63a2-4927-b71a-ac26218d44b3-c000.csv',
 'part-00000-078ce559-4020-4967-a7ac-4e66a9db781a-c000.csv',
 'part-00000-94466a82-1dbb-47b3-864a-60adf2007211-c000.csv',
 'part-00000-aaf3a808-57

### Checking one of the files above

In [23]:
file_name = random.choice(Hourly_Record_list)  # pick one hourly file at random
file_name

'part-00000-c07eb922-a833-4ab0-9984-498661f5a1bc-c000.csv'

In [24]:
part = spark.read.csv("/content/HourFolder/Hourly_Record/" + file_name, header=True, inferSchema=True)
part.show()

+--------------------+--------------+------------+----+
|                Date|Temperatur (C)|Humidity (%)|Hour|
+--------------------+--------------+------------+----+
|12/12/2017 02:00:...|          15.5|   75.492004|   2|
|12/12/2017 02:05:...|          15.5|      75.642|   2|
|12/12/2017 02:10:...|     15.450001|    75.54201|   2|
|12/12/2017 02:15:...|     15.450001|      75.842|   2|
|12/12/2017 02:20:...|     15.450001|   75.992004|   2|
|12/12/2017 02:25:...|          15.5|      75.942|   2|
|12/12/2017 02:30:...|          15.5|      75.842|   2|
|12/12/2017 02:35:...|          15.4|      75.942|   2|
|12/12/2017 02:40:...|          15.4|      76.092|   2|
|12/12/2017 02:45:...|          15.4|      76.342|   2|
|12/12/2017 02:50:...|         15.45|      76.342|   2|
|12/12/2017 02:55:...|     15.450001|    76.04201|   2|
+--------------------+--------------+------------+----+



In [25]:
part.schema # checking the part schema

StructType([StructField('Date', StringType(), True), StructField('Temperatur (C)', DoubleType(), True), StructField('Humidity (%)', DoubleType(), True), StructField('Hour', IntegerType(), True)])

In [26]:
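# File source: reads files written to a directory as a stream of data (here: HourFolder/Hourly_Record).
# Supported file formats are text, CSV, JSON, ORC and Parquet; here we use CSV.
# Streaming file sources need an explicit schema, so we reuse the one inferred from a single part file.
# header=True: each part file starts with a header row, which must not be read as data.
# maxFilesPerTrigger: maximum number of new files considered in every trigger (here 1, i.e. one hour per micro-batch).
# Files are processed in the order of their modification time.
streaming = (spark.readStream
             .schema(part.schema)
             .option('header', True)
             .option('maxFilesPerTrigger', 1)
             .csv('/content/HourFolder/Hourly_Record/'))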
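How the file source feeds the query can be pictured with a small plain-Python model: list the files, sort them by modification time (oldest first), and hand out at most `maxFilesPerTrigger` new files per micro-batch. This only illustrates the behaviour described in the comments above; it is not Spark's implementation, and the helper `plan_micro_batches` and the demo file names are hypothetical.

```python
import os
import tempfile

def plan_micro_batches(directory, max_files_per_trigger=1):
    """Group the CSV files in `directory` into micro-batches, oldest modification time first."""
    files = [os.path.join(directory, name) for name in os.listdir(directory)
             if name.endswith(".csv")]
    files.sort(key=os.path.getmtime)  # oldest first
    return [files[i:i + max_files_per_trigger]
            for i in range(0, len(files), max_files_per_trigger)]

# Demo: three empty files whose modification times are set explicitly
demo_dir = tempfile.mkdtemp()
for name, mtime in [("b.csv", 200), ("a.csv", 300), ("c.csv", 100)]:
    path = os.path.join(demo_dir, name)
    with open(path, "w"):
        pass
    os.utime(path, (mtime, mtime))  # (access time, modification time)

batches = plan_micro_batches(demo_dir, max_files_per_trigger=1)
print([[os.path.basename(p) for p in batch] for batch in batches])
# → [['c.csv'], ['b.csv'], ['a.csv']]
```

With `maxFilesPerTrigger` set to 1, each hourly file becomes its own micro-batch, which is what makes the results in the memory table grow one hour at a time.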

In [27]:
type(streaming)

### Setting Up the Transformation

* `Hourly_Mean_Value` is a streaming DataFrame (that is, a DataFrame on unbounded, streaming data) that represents the running mean temperature and humidity per hour. The mean is computed once the streaming query is started and is updated continuously as the streaming input data is processed.

* Now that we have our transformation, we need to specify an output sink for the results. For this example, we write to a memory sink, which keeps the results in memory.

* We also need to define how Spark will output the data. In this example, we use the complete output mode, which rewrites all of the keys (hours) along with their mean values after every trigger.

* We do not call `query.awaitTermination()` here. It is only needed to prevent the driver process from exiting while the stream is active (for example, in a standalone script). In a notebook it would also block the cell until the query stops, so we leave it out in order to keep running cells while the stream is active.

In [28]:
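# Running mean of temperature and humidity per hour.
# Sorting a streaming DataFrame is only allowed after an aggregation and in complete output mode.
Hourly_Mean_Value = streaming.groupBy('Hour').mean("Temperatur (C)", "Humidity (%)").orderBy(F.desc("Hour"))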
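Conceptually, a streaming mean only needs a running sum and count per key, carried over from one micro-batch to the next, and in complete mode the whole result table is emitted after every micro-batch. The plain-Python sketch below models that for the temperature column only. The class name `RunningMean` is hypothetical, the values come from the outputs above, and Spark's actual state management is more involved than this.

```python
from collections import defaultdict

class RunningMean:
    """Per-key running mean kept as (sum, count), like a streaming groupBy().mean()."""

    def __init__(self):
        self.state = defaultdict(lambda: [0.0, 0])  # key -> [sum, count]

    def process_batch(self, rows):
        """Fold one micro-batch of (key, value) rows into the state."""
        for key, value in rows:
            self.state[key][0] += value
            self.state[key][1] += 1

    def complete_output(self):
        """Complete mode: every key seen so far, sorted descending like orderBy(F.desc("Hour"))."""
        return [(key, total / n) for key, (total, n) in
                sorted(self.state.items(), reverse=True)]

agg = RunningMean()
# Micro-batch 1: readings from hour 0 (values from the df.show(5) output)
agg.process_batch([(0, 16.1), (0, 16.0), (0, 15.9)])
print(agg.complete_output())
# Micro-batch 2: readings from hour 2 (values from the part.show() output)
agg.process_batch([(2, 15.5), (2, 15.5)])
print(agg.complete_output())
```

In this notebook each file holds a single hour, so every key arrives in exactly one micro-batch; if a later file contained more readings for an hour already seen, its sum and count would simply be merged into the existing state.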

### Define output sink and output mode

* Memory sink (for debugging): the output is stored in memory as an in-memory table.
* Both Append and Complete output modes are supported. Since our query aggregates and sorts without a watermark, only the Complete mode is valid for it.
* It should be used only for debugging on low data volumes, as the entire output is collected and stored in the driver's memory. Hence, use it with caution.
* All the aggregates are kept in an in-memory table whose name is the query name, here "Temp_Humi_Mean".
* Note that we have to call `start()` to begin executing the query.
* `start()` returns a StreamingQuery object, which is a handle to the continuously running execution.

In [29]:
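writer = (Hourly_Mean_Value.writeStream
          .queryName("Temp_Humi_Mean")   # name of the in-memory table
          .format("memory")              # memory sink
          .outputMode("complete")        # rewrite the full result table on every trigger
          .start())                      # start the query; returns a StreamingQuery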
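The difference between the two output modes for an in-memory table can be sketched in plain Python: in complete mode the table is replaced by the full result after every trigger, while in append mode only the rows newly produced by that trigger are added. The `MemoryTable` class below is a hypothetical model for illustration, not Spark's memory sink implementation, and the row values are illustrative.

```python
class MemoryTable:
    """Toy model of an in-memory result table fed by micro-batches."""

    def __init__(self, mode):
        if mode not in ("append", "complete"):
            raise ValueError(f"unsupported output mode: {mode}")
        self.mode = mode
        self.rows = []

    def write(self, batch_result):
        if self.mode == "complete":
            self.rows = list(batch_result)   # replace with the full result table
        else:
            self.rows.extend(batch_result)   # add only the new rows


# Complete mode receives the whole (sorted) aggregate on each trigger ...
complete = MemoryTable("complete")
complete.write([(0, 16.0)])
complete.write([(2, 15.5), (0, 16.0)])

# ... append mode receives only the rows produced by each trigger
append = MemoryTable("append")
append.write([(0, 16.0)])
append.write([(2, 15.5)])

print(complete.rows)  # → [(2, 15.5), (0, 16.0)]
print(append.rows)    # → [(0, 16.0), (2, 15.5)]
```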

Specifying processing details:
* Triggering details: when to trigger the discovery and processing of newly available streaming data.
* Checkpoint location: where the query records its progress; this is necessary for failure recovery in a real application.

For the sake of simplicity, we do not set these parameters here. With the default trigger, the streaming query runs in micro-batches, and the next micro-batch is triggered as soon as the previous one has completed.
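To see what the default trigger means in practice, the plain-Python sketch below computes when each micro-batch would start, given how long each one takes. With no interval it follows the default behaviour described above. With `interval` it models a processing-time trigger, which, as the Structured Streaming guide describes it, waits for the next interval boundary or starts immediately if the previous batch overran it. The function name and the batch durations are illustrative assumptions. In PySpark the two settings would go on the writer, e.g. `.trigger(processingTime="5 seconds")` and `.option("checkpointLocation", "<some path>")`.

```python
def batch_start_times(durations, interval=None):
    """Start time (in seconds) of each micro-batch, given each batch's duration.

    interval=None -> default trigger: start as soon as the previous batch finishes.
    interval=k    -> processing-time trigger: start at the next multiple of k after
                     the previous batch started, or right away if that batch overran it.
    """
    starts = []
    t = 0
    for duration in durations:
        starts.append(t)
        finished = t + duration
        if interval is None:
            t = finished
        else:
            next_boundary = (t // interval + 1) * interval
            t = max(finished, next_boundary)
    return starts

print(batch_start_times([2, 7, 3]))              # default trigger      → [0, 2, 9]
print(batch_start_times([2, 7, 3], interval=5))  # 5-second interval    → [0, 5, 12]
```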

### Start the query
* Once everything has been specified, the final step is to start the query, which was already done in the cell above by calling `.start()`. This creates an in-memory table named by `.queryName("Temp_Humi_Mean")`, which is updated on every trigger.

* `writer` is a StreamingQuery object, a handle to the continuously running execution. We can use this object to manage the query.

In [30]:
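# Poll the in-memory table once per second for 45 seconds.
# The first polls return an empty table until the first micro-batch has completed.
for x in range(45):
  print('Query Result at time step : ', x)
  df_q = spark.sql("SELECT * FROM Temp_Humi_Mean")
  df_q.show(24)
  time.sleep(1)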

Query Result at time step :  0
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----------------+
+----+-------------------+-----------------+

Query Result at time step :  1
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----------------+
+----+-------------------+-----------------+

Query Result at time step :  2
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----------------+
+----+-------------------+-----------------+

Query Result at time step :  3
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----------------+
+----+-------------------+-----------------+

Query Result at time step :  4
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----

### Check if the stream is active

In [31]:
spark.streams.active  # get the list of currently active streaming queries.

[<pyspark.sql.streaming.query.StreamingQuery at 0x7a0edeefbb90>]

In [32]:
spark.streams.active[0].isActive

True

In [33]:
writer.status
# Shows what the query is currently doing: whether a trigger is active, whether data is being processed, etc.
# It returns something like the following:

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}
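Polling a fixed number of times, as in the loop above, may stop too early or too late. One option, sketched below under the assumption that a query is idle once its `status` reports neither available data nor an active trigger, is to poll `writer.status` until that holds or a timeout expires. The helper `wait_until_idle` is hypothetical and the idle condition is a heuristic; for testing, `StreamingQuery.processAllAvailable()` is the built-in way to block until all available input has been processed.

```python
import time

def wait_until_idle(get_status, timeout=60.0, poll_interval=1.0, sleep=time.sleep):
    """Poll a status dict (e.g. lambda: writer.status) until the query looks idle.

    Heuristic assumption: idle means no data available and no trigger running.
    Returns True if the query became idle, False if `timeout` seconds elapsed first.
    """
    waited = 0.0
    while waited <= timeout:
        status = get_status()
        if not status["isDataAvailable"] and not status["isTriggerActive"]:
            return True
        sleep(poll_interval)
        waited += poll_interval
    return False

# Usage in the notebook (hypothetical): wait_until_idle(lambda: writer.status, timeout=120)

# Offline demo with a fake sequence of statuses instead of a live query
fake = iter([
    {"message": "Processing new data", "isDataAvailable": True, "isTriggerActive": True},
    {"message": "Processing new data", "isDataAvailable": True, "isTriggerActive": True},
    {"message": "Waiting for data to arrive", "isDataAvailable": False, "isTriggerActive": False},
])
print(wait_until_idle(lambda: next(fake), poll_interval=0.0))  # → True
```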

### Finally, stop the query

In [34]:
writer.stop()  # stop the query.