In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=bc06891a1a3c98c0fec22ff88b051d2c601f7604b0bff9f7643f8e1af7077040
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Streaming").config('spark.ui.port', '4050').getOrCreate()

In [None]:
!wget https://raw.githubusercontent.com/lawlesst/vivo-sample-data/master/data/csv/people.csv

--2023-08-10 17:50:21--  https://raw.githubusercontent.com/lawlesst/vivo-sample-data/master/data/csv/people.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4095 (4.0K) [text/plain]
Saving to: ‘people.csv’


2023-08-10 17:50:21 (64.4 MB/s) - ‘people.csv’ saved [4095/4095]



Copy a CSV file into a folder, convert it to JSON, and write it to the output folder.

In [None]:
from pyspark.sql import SparkSession

# Building a spark session
spark = SparkSession.builder.master("local").appName("Streaming").config('spark.ui.port', '4050').getOrCreate()

# Copy the CSV file to the input folder
!mkdir input/
!cp people.csv input/

# Read the CSV file from the input folder
people_df = spark.read.format("csv").option("header", True).load("input/people.csv")

# Write the DataFrame as JSON to the output folder
people_df.write.format("json").mode("overwrite").save("output/people_json")

In [None]:
people_df.show()

+---------+--------------------+---------+--------+--------+------------------+-----------------+------------+-------------------+
|person_ID|                name|    first|    last|  middle|             email|            phone|         fax|              title|
+---------+--------------------+---------+--------+--------+------------------+-----------------+------------+-------------------+
|     3130|     Burks, Rosella |  Rosella|   Burks|    null|   BurksR@univ.edu|     963.555.1253|963.777.4065|         Professor |
|     3297|      Avila, Damien |   Damien|   Avila|    null|   AvilaD@univ.edu|     963.555.1352|963.777.7914|         Professor |
|     3547|       Olsen, Robin |    Robin|   Olsen|    null|   OlsenR@univ.edu|     963.555.1378|963.777.9262|Assistant Professor|
|     1538| Moises, Edgar Estes|    Edgar|  Moises|   Estes|  MoisesE@univ.edu|963.555.2731x3565|963.777.8264|          Professor|
|     2941| Brian, Heath Pruitt|    Heath|   Brian|  Pruitt|   BrianH@univ.edu|    



```
# This is formatted as code
```

Run a streaming job to compute an aggregate for every minute window.

In [None]:
#Code 1
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import input_file_name

# Create a SparkSession
spark = SparkSession.builder.master("local").appName("Streaming").config('spark.ui.port', '4050').getOrCreate()

# Define the schema for the streaming DataFrame
schema = StructType([
    StructField("person_ID", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("first", StringType(), True),
    StructField("last", StringType(), True),
    StructField("middle", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("fax", StringType(), True),
    StructField("title", StringType(), True)
])

# Read the streaming DataFrame from CSV
people_df = spark.readStream.format("csv").schema(schema).option("header", True).load("input")
people_df = people_df.withColumn("filename",input_file_name())

# # Partition the data by the key column for parallel processing
# partitioned_people_df = people_df.repartition("title")

# Perform the aggregation on the title column
title_count_df = people_df.groupBy(["filename","title"]).count()

# Start the streaming query to compute aggregates every minute
query = title_count_df.writeStream \
    .outputMode("update") \
    .queryName("aggregates") \
    .trigger(processingTime='5 seconds') \
    .format("memory") \
    .start()

# Wait for the streaming query to process data
# query.awaitTermination()

# Display the results of the aggregates
spark.sql("select * from aggregates order by 1,2").show(100,truncate=False)

#Code 2
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("SplitCSVByColumns") \
    .getOrCreate()

# Read the CSV file
csv_df = spark.read.csv("input/people.csv", header=True, inferSchema=True)

# Select specific columns
column1_df = csv_df.select("person_ID", "name")
column2_df = csv_df.select("person_ID", "title")

# Save each column as a separate CSV file
column1_df.write.csv("name.csv", header=True, mode="overwrite")
column2_df.write.csv("title.csv", header=True, mode="overwrite")

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create a SparkSession
spark = SparkSession.builder \
    .appName("StreamingJoin") \
    .getOrCreate()

# Define the schema for the streaming DataFrames
schema1 = StructType([
    StructField("person_ID", IntegerType(), True),
    StructField("name", StringType(), True)
])

schema2 = StructType([
    StructField("person_ID", IntegerType(), True),
    StructField("title", StringType(), True)
])

# Read the streaming DataFrames
df1 = spark.readStream.format("csv").schema(schema1).option("header", True).load("name.csv")
df2 = spark.readStream.format("csv").schema(schema2).option("header", True).load("title.csv")

# Perform the join operation
joined_df = df1.join(df2, df1["person_ID"] == df2["person_ID"])

# Select the columns you want to include in the results
results_df = joined_df.select("*")

# Start the streaming query to output the results
query = results_df.writeStream \
    .format("memory") \
    .outputMode("append") \
    .queryName("streamingjointitledf") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.sql("select * from streamingjointitledf").show(truncate=False)

#Code 4
from pyspark.sql.functions import *
tumblingWindows = windowing_df.withWatermark("timeReceived", "10 minutes").groupBy("eventId", window("timeReceived", "10 minutes")).count()
tumblingWindows.show(truncate = False)

#Code 5
from pyspark.sql.functions import *
slidingWindows = windowing_df.withWatermark("timeReceived", "10 minutes").groupBy("eventId", window("timeReceived", "10 minutes", "5 minutes")).count()
slidingWindows.show(truncate = False)

#Code 6
from pyspark.sql.functions import *
sessionWindows = windowing_df.withWatermark("timeReceived", "10 minutes").groupBy("eventId", session_window("timeReceived", "5 minutes")).count()
sessionWindows.show(truncate = False)

#Code 7
from pyspark.sql.functions import *

windowedCountsDF = windowing_df.withWatermark("timeReceived", "10 minutes").groupBy(windowing_df.eventId, session_window(windowing_df.timeReceived, \
                                                                                                                         when(windowing_df.eventId == "20", "10 seconds").when(windowing_df.eventId == "12","30 seconds").otherwise("10 minutes"))).count()

windowedCountsDF.show(100, truncate = False)

+--------+-----+-----+
|filename|title|count|
+--------+-----+-----+
+--------+-----+-----+

+---------+----+---------+-----+
|person_ID|name|person_ID|title|
+---------+----+---------+-----+
+---------+----+---------+-----+



NameError: ignored

In [None]:
# Display the results of the aggregates
spark.sql("select * from aggregates order by 1,2").show(100,truncate=False)

+--------------------------------+-------------------+-----+
|filename                        |title              |count|
+--------------------------------+-------------------+-----+
|file:///content/input/people.csv|Assistant Professor|11   |
|file:///content/input/people.csv|Associate Curator  |5    |
|file:///content/input/people.csv|Curator            |5    |
|file:///content/input/people.csv|Professor          |10   |
|file:///content/input/people.csv|Professor          |4    |
|file:///content/input/people.csv|Research Professor |5    |
+--------------------------------+-------------------+-----+



Split the csvs

In [None]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("SplitCSVByColumns") \
    .getOrCreate()

# Read the CSV file
csv_df = spark.read.csv("input/people.csv", header=True, inferSchema=True)

# Select specific columns
column1_df = csv_df.select("person_ID", "name")
column2_df = csv_df.select("person_ID", "title")

# Save each column as a separate CSV file
column1_df.write.csv("name.csv", header=True, mode="overwrite")
column2_df.write.csv("title.csv", header=True, mode="overwrite")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create a SparkSession
spark = SparkSession.builder \
    .appName("StreamingJoin") \
    .getOrCreate()

# Define the schema for the streaming DataFrames
schema1 = StructType([
    StructField("person_ID", IntegerType(), True),
    StructField("name", StringType(), True)
])

schema2 = StructType([
    StructField("person_ID", IntegerType(), True),
    StructField("title", StringType(), True)
])

# Read the streaming DataFrames
df1 = spark.readStream.format("csv").schema(schema1).option("header", True).load("name.csv")
df2 = spark.readStream.format("csv").schema(schema2).option("header", True).load("title.csv")

# Perform the join operation
joined_df = df1.join(df2, df1["person_ID"] == df2["person_ID"])

# Select the columns you want to include in the results


In [None]:
results_df = joined_df.select("*")

# Start the streaming query to output the results
query = results_df.writeStream \
    .format("memory") \
    .outputMode("append") \
    .queryName("streamingjointitledf") \
    .trigger(processingTime='5 seconds') \
    .start()

# Wait for the query to terminate
# query.awaitTermination()

In [None]:
spark.sql("select * from streamingjointitledf").show(truncate=False)

+---------+-------------------------+---------+-------------------+
|person_ID|name                     |person_ID|title              |
+---------+-------------------------+---------+-------------------+
|2811     |Deirdre, Florence Barrera|2811     |Associate Curator  |
|1699     |Stanton, Kathie          |1699     |Professor          |
|2096     |Chuck, Lloyd Haney       |2096     |Assistant Professor|
|2342     |Austin, Liz              |2342     |Assistant Professor|
|2682     |Earline, Jaime Fitzgerald|2682     |Associate Curator  |
|1307     |Pate, Andrea             |1307     |Professor          |
|1824     |Head, Kurtis             |1824     |Professor          |
|3095     |Shields, Rich Pena       |3095     |Professor          |
|2941     |Brian, Heath Pruitt      |2941     |Associate Curator  |
|2383     |Page, Winnie             |2383     |Curator            |
|1089     |Payne, Ladonna           |1089     |Professor          |
|1538     |Moises, Edgar Estes      |1538     |P

In [None]:
windowingData = (("12", "2019-01-02 15:30:00"),
("12",  "2019-01-02 15:30:30"),
("12",  "2019-01-02 15:31:00"),
("12",  "2019-01-02 15:31:50"),
("12",  "2019-01-02 15:31:55"),
("16",  "2019-01-02 15:33:00"),
("16",  "2019-01-02 15:35:20"),
("16",  "2019-01-02 15:37:00"),
("20",  "2019-01-02 15:30:30"),
("20",  "2019-01-02 15:31:00"),
("20",  "2019-01-02 15:31:50"),
("20",  "2019-01-02 15:31:55"),
("20",  "2019-01-02 15:33:00"),
("20",  "2019-01-02 15:35:20"),
("20",  "2019-01-02 15:37:00"),
("20",  "2019-01-02 15:40:00"),
("20",  "2019-01-02 15:45:00"),
("20",  "2019-01-02 15:46:00"),
("20",  "2019-01-02 15:47:30"),
("20",  "2019-01-02 15:48:00"),
("20",  "2019-01-02 15:48:10"),
("20",  "2019-01-02 15:48:20"),
("20",  "2019-01-02 15:48:30"),
("20",  "2019-01-02 15:50:00"),
("20",  "2019-01-02 15:53:00"),
("20",  "2019-01-02 15:54:30"),
("20",  "2019-01-02 15:55:00"),
("22",  "2019-01-02 15:50:30"),
("22",  "2019-01-02 15:52:00"),
("22",  "2019-01-02 15:50:30"),
("22",  "2019-01-02 15:52:00"),
("22",  "2019-01-02 15:50:30"),
("22",  "2019-01-02 15:52:00"))
columns = ["eventId", "timeReceived"]
windowing_df = spark.createDataFrame(data = windowingData, schema = columns)
windowing_df.printSchema()
windowing_df.show(truncate=False)


root
 |-- eventId: string (nullable = true)
 |-- timeReceived: string (nullable = true)

+-------+-------------------+
|eventId|timeReceived       |
+-------+-------------------+
|12     |2019-01-02 15:30:00|
|12     |2019-01-02 15:30:30|
|12     |2019-01-02 15:31:00|
|12     |2019-01-02 15:31:50|
|12     |2019-01-02 15:31:55|
|16     |2019-01-02 15:33:00|
|16     |2019-01-02 15:35:20|
|16     |2019-01-02 15:37:00|
|20     |2019-01-02 15:30:30|
|20     |2019-01-02 15:31:00|
|20     |2019-01-02 15:31:50|
|20     |2019-01-02 15:31:55|
|20     |2019-01-02 15:33:00|
|20     |2019-01-02 15:35:20|
|20     |2019-01-02 15:37:00|
|20     |2019-01-02 15:40:00|
|20     |2019-01-02 15:45:00|
|20     |2019-01-02 15:46:00|
|20     |2019-01-02 15:47:30|
|20     |2019-01-02 15:48:00|
+-------+-------------------+
only showing top 20 rows



# Tumbling window

<img src='https://miro.medium.com/v2/resize:fit:1100/format:webp/1*13n52gO_dw2TTxpbNRE9bg.png' />

[API](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.window.html)

In [None]:
from pyspark.sql.functions import *
tumblingWindows = windowing_df.withWatermark("timeReceived", "10 minutes").groupBy("eventId", window("timeReceived", "10 minutes")).count()
tumblingWindows.show(truncate = False)

+-------+------------------------------------------+-----+
|eventId|window                                    |count|
+-------+------------------------------------------+-----+
|20     |{2019-01-02 15:30:00, 2019-01-02 15:40:00}|7    |
|20     |{2019-01-02 15:40:00, 2019-01-02 15:50:00}|8    |
|16     |{2019-01-02 15:30:00, 2019-01-02 15:40:00}|3    |
|20     |{2019-01-02 15:50:00, 2019-01-02 16:00:00}|4    |
|22     |{2019-01-02 15:50:00, 2019-01-02 16:00:00}|6    |
|12     |{2019-01-02 15:30:00, 2019-01-02 15:40:00}|5    |
+-------+------------------------------------------+-----+



# Sliding Window


<img src='https://miro.medium.com/v2/resize:fit:1100/format:webp/1*aCXwoamSeMSN1_Sv07M6hg.png' />

In [None]:
from pyspark.sql.functions import *
slidingWindows = windowing_df.withWatermark("timeReceived", "10 minutes").groupBy("eventId", window("timeReceived", "10 minutes", "5 minutes")).count()
slidingWindows.show(truncate = False)

+-------+------------------------------------------+-----+
|eventId|window                                    |count|
+-------+------------------------------------------+-----+
|16     |{2019-01-02 15:25:00, 2019-01-02 15:35:00}|1    |
|20     |{2019-01-02 15:30:00, 2019-01-02 15:40:00}|7    |
|20     |{2019-01-02 15:55:00, 2019-01-02 16:05:00}|1    |
|12     |{2019-01-02 15:25:00, 2019-01-02 15:35:00}|5    |
|20     |{2019-01-02 15:40:00, 2019-01-02 15:50:00}|8    |
|20     |{2019-01-02 15:25:00, 2019-01-02 15:35:00}|5    |
|16     |{2019-01-02 15:30:00, 2019-01-02 15:40:00}|3    |
|20     |{2019-01-02 15:50:00, 2019-01-02 16:00:00}|4    |
|22     |{2019-01-02 15:45:00, 2019-01-02 15:55:00}|6    |
|22     |{2019-01-02 15:50:00, 2019-01-02 16:00:00}|6    |
|20     |{2019-01-02 15:45:00, 2019-01-02 15:55:00}|10   |
|16     |{2019-01-02 15:35:00, 2019-01-02 15:45:00}|2    |
|12     |{2019-01-02 15:30:00, 2019-01-02 15:40:00}|5    |
|20     |{2019-01-02 15:35:00, 2019-01-02 15:45:00}|3   

# Session window



<img src='https://miro.medium.com/v2/resize:fit:1100/format:webp/1*PprkRQPbYPSkrZ66LIAzHA.png' />


[API](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.session_window.html#:~:text=session_window,-pyspark.sql.functions&text=Generates%20session%20window%20given%20a,according%20to%20the%20given%20inputs.)

In [None]:
from pyspark.sql.functions import *
sessionWindows = windowing_df.withWatermark("timeReceived", "1 minutes").groupBy("eventId", session_window("timeReceived", "5 minutes")).count()
sessionWindows.show(truncate = False)

+-------+------------------------------------------+-----+
|eventId|session_window                            |count|
+-------+------------------------------------------+-----+
|12     |{2019-01-02 15:30:00, 2019-01-02 15:36:55}|5    |
|16     |{2019-01-02 15:33:00, 2019-01-02 15:42:00}|3    |
|20     |{2019-01-02 15:30:30, 2019-01-02 16:00:00}|19   |
|22     |{2019-01-02 15:50:30, 2019-01-02 15:57:00}|6    |
+-------+------------------------------------------+-----+



# Dynamic Gapping Period for Session Window

In [None]:
from pyspark.sql.functions import *

windowedCountsDF = windowing_df.withWatermark("timeReceived", "10 minutes").groupBy(windowing_df.eventId, session_window(windowing_df.timeReceived, \
    when(windowing_df.eventId == "20", "10 seconds").when(windowing_df.eventId == "12","30 seconds").otherwise("10 minutes"))).count()

windowedCountsDF.show(100, truncate = False)

+-------+------------------------------------------+-----+
|eventId|session_window                            |count|
+-------+------------------------------------------+-----+
|12     |{2019-01-02 15:30:00, 2019-01-02 15:31:30}|3    |
|12     |{2019-01-02 15:31:50, 2019-01-02 15:32:25}|2    |
|16     |{2019-01-02 15:33:00, 2019-01-02 15:47:00}|3    |
|20     |{2019-01-02 15:30:30, 2019-01-02 15:30:40}|1    |
|20     |{2019-01-02 15:31:00, 2019-01-02 15:31:10}|1    |
|20     |{2019-01-02 15:31:50, 2019-01-02 15:32:05}|2    |
|20     |{2019-01-02 15:33:00, 2019-01-02 15:33:10}|1    |
|20     |{2019-01-02 15:35:20, 2019-01-02 15:35:30}|1    |
|20     |{2019-01-02 15:37:00, 2019-01-02 15:37:10}|1    |
|20     |{2019-01-02 15:40:00, 2019-01-02 15:40:10}|1    |
|20     |{2019-01-02 15:45:00, 2019-01-02 15:45:10}|1    |
|20     |{2019-01-02 15:46:00, 2019-01-02 15:46:10}|1    |
|20     |{2019-01-02 15:47:30, 2019-01-02 15:47:40}|1    |
|20     |{2019-01-02 15:48:00, 2019-01-02 15:48:40}|4   