# Apache spark - starter

In [3]:
# Import
from pyspark.sql import SparkSession

def getStateMnMColorCount(file, state="getAll"):
    
    spark = (SparkSession
             .builder
             .appName("MnMColorCounter")
             .getOrCreate()
            )
    # Load the content
    mnm_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(file)
    
    # Structure of the data {state, color, count}
    # groupby state and aggregate the mnm_color count
    # query: SUM(count) FROM data GROUP BY state, mnm_color ORDER DESC
    counts_df = mnm_df.select("State", "Color", "Count").groupBy("State", "Color").sum("Count").orderBy("sum(Count)", ascending=False)
    if (state != "getAll"):
        counts_df = counts_df.where(counts_df.State == state)
    
    print(counts_df.show(n=10, truncate=False))
    
    spark.stop()

In [4]:
getStateMnMColorCount("data/dummy-mnm.csv")

+-----+------+----------+
|State|Color |sum(Count)|
+-----+------+----------+
|CA   |Yellow|100956    |
|WA   |Green |96486     |
|CA   |Brown |95762     |
|TX   |Green |95753     |
|TX   |Red   |95404     |
|CO   |Yellow|95038     |
|NM   |Red   |94699     |
|OR   |Orange|94514     |
|WY   |Green |94339     |
|NV   |Orange|93929     |
+-----+------+----------+
only showing top 10 rows

None


Dataset: https://catalog.data.gov/dataset/airline-on-time-performance-and-causes-of-flight-delays

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, desc, when

spark = (SparkSession
         .builder
         .appName("SQLSample")
         .getOrCreate()
        )
file = "data/flight-delays.csv"

schema = StructType([
   StructField("date", StringType()),
   StructField("delay", IntegerType()),
   StructField("distance", IntegerType()),
   StructField("origin", StringType()),
   StructField("destination", StringType()),
])
fd_df = spark.read.format('csv').option('header', True).schema(schema).load(file)
fd_df.createOrReplaceTempView("flight_delay_tbl")

# Describe the delays
spark.sql("""SELECT MIN(delay) as min_delay, MAX(delay) as max_delay, AVG(delay) as avg_delay FROM flight_delay_tbl""").show()

spark.sql("""SELECT delay, date, origin, destination,
                CASE
                    WHEN delay > 30 THEN 'Long delay'
                    WHEN delay BETWEEN 1 AND 29 THEN 'Acceptable'
                    WHEN delay = 0 THEN 'Right on'
                    ELSE 'Early bird'
                END AS delay_status
                FROM flight_delay_tbl 
                DESC LIMIT 10""").show()

fd_df.select("delay", "date", "origin", "destination").where("delay > 30").orderBy("delay", ascending=False).show(10)
fd_df.select("delay", "date", "origin", "destination").withColumn("delay_status", when(fd_df["delay"] > 30, "Long delay")
.when((fd_df["delay"] > 0) & (fd_df["delay"] < 30), "Acceptable").when(fd_df["delay"] == 0, "Right on",).otherwise("Early bird")).orderBy("delay", ascending=False).show(10)

+---------+---------+------------------+
|min_delay|max_delay|         avg_delay|
+---------+---------+------------------+
|     -112|     1642|12.079802928761449|
+---------+---------+------------------+

+-----+--------+------+-----------+------------+
|delay|    date|origin|destination|delay_status|
+-----+--------+------+-----------+------------+
|    6|01011245|   ABE|        ATL|  Acceptable|
|   -8|01020600|   ABE|        DTW|  Early bird|
|   -2|01021245|   ABE|        ATL|  Early bird|
|   -4|01020605|   ABE|        ATL|  Early bird|
|   -4|01031245|   ABE|        ATL|  Early bird|
|    0|01030605|   ABE|        ATL|    Right on|
|   10|01041243|   ABE|        ATL|  Acceptable|
|   28|01040605|   ABE|        ATL|  Acceptable|
|   88|01051245|   ABE|        ATL|  Long delay|
|    9|01050605|   ABE|        ATL|  Acceptable|
+-----+--------+------+-----------+------------+

+-----+--------+------+-----------+
|delay|    date|origin|destination|
+-----+--------+------+-----------+

In [6]:
# Creating a separate database (spark managed)

# managed db metadata spark.sql.warehouse.dir
spark.sql("CREATE DATABASE IF NOT EXISTS flight_delay_db")

# fd_df.write.saveAsTable("flight_delay_tbl")

# unmanaged
# fd_df.write.option("path", "/tmp/data/flight_delay").mode("overwrite").saveAsTable("flight_delay_tbl_")

# creating global and local views

jfk = fd_df.select("delay", "date", "destination").where("origin == 'JFK'")
jfk.createOrReplaceGlobalTempView("origin_jfk_global_tmp_view")
jfk.createOrReplaceTempView("origin_jfk_tmp_view")

# prefix global_temp to access global views
tmp_view = spark.sql("SELECT * FROM global_temp.origin_jfk_global_tmp_view")
tmp_view.show(10)

# catalog has the metadata information of the created view
dbs = spark.catalog.listDatabases()
tables = spark.catalog.listTables()
columns = spark.catalog.listColumns("flight_delay_tbl")

# drop 
spark.catalog.dropGlobalTempView("origin_jfk_global_tmp_view")
spark.catalog.dropTempView("origin_jfk_tmp_view")

# read tables into dataframes
data_from_table = spark.table("flight_delay_tbl")
print(data_from_table)

+-----+--------+-----------+
|delay|    date|destination|
+-----+--------+-----------+
|   14|01010900|        LAX|
|   -3|01011200|        LAX|
|    2|01011900|        LAX|
|   11|01011700|        LAS|
|   -1|01010800|        SFO|
|   -4|01011540|        DFW|
|    5|01011705|        SAN|
|   -3|01011530|        SFO|
|   -3|01011630|        SJU|
|    2|01011345|        LAX|
+-----+--------+-----------+
only showing top 10 rows

DataFrame[date: string, delay: int, distance: int, origin: string, destination: string]


In [7]:
# save dataframe as a parquet file

sample = [{
    "name": "test",
    "age": 24,
    "sex": "m"
},{
    "name": "test-one",
    "age": 26,
    "sex": "f"
}]
data_frame = spark.createDataFrame(sample)

(data_frame
     .write
     .option("path", "tmp/data/parquet")
     .option("compression", "snappy")
     .mode("overwrite")
     .save()
)

# load parquet files
revive = spark.read.load("tmp/data/parquet")
revive.show()
# Avro - is used by Kafka



+---+--------+---+
|age|    name|sex|
+---+--------+---+
| 26|test-one|  f|
| 24|    test|  m|
+---+--------+---+



                                                                                

In [8]:
# read images

image_dir = "data/images/cctv_train_images/"
image_df = spark.read.format("image").load(image_dir)
image_df.printSchema()

image_df.select("image.height", "image.width", "image.nChannels").show(5)


root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- label: integer (nullable = true)

+------+-----+---------+
|height|width|nChannels|
+------+-----+---------+
|   288|  384|        3|
|   288|  384|        3|
|   288|  384|        3|
|   288|  384|        3|
|   288|  384|        3|
+------+-----+---------+
only showing top 5 rows



In [9]:
# Spark SQL UDFs
# enable seamless integration of complex operations, 
# like machine learning models, in Spark SQL, simplifying data querying 
from pyspark.sql.types import LongType

# define and register the udf
def cube(n):
    return n * n * n

spark.udf.register("cube", cube, LongType())

schema = StructType([StructField("number", IntegerType(), True)])

# Create a DataFrame with the specified schema
numbers_df = spark.createDataFrame([(number,) for number in numbers], schema)
numbers_df.write.mode("overwrite").saveAsTable("numbers_tbl")

# extract data from table and pass it to the udf and get the output
spark.sql("SELECT number, cube(number) as predicted FROM numbers_tbl").show()

NameError: name 'numbers' is not defined

In [None]:
# Panda UDF

# Import pandas
import pandas as pd

# Import various pyspark SQL functions including pandas_udf
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

def cubed(a: pd.Series) -> pd.Series:
    return a * a * a

# create panda udf
cubed_udf = pandas_udf(cubed, returnType=LongType())

# panda series 
x = pd.Series([1, 2, 3])
cubed(x)

data_frame = spark.range(1, 5)
data_frame.select("id", cubed_udf(col("id"))).show()

# using higher order functions such as explode, collect_list might cause out of memory errors when dealing with larger datasets
# udf are safer in these situations

In [None]:
# Reading from external source
# sample
query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"
readConfig = {
  "Endpoint" : "https://[ACCOUNT].documents.azure.com:443/", 
  "Masterkey" : "[MASTER KEY]",
  "Database" : "[DATABASE]",
  "preferredRegions" : "Central US;East US2",
  "Collection" : "[COLLECTION]",
  "SamplingRatio" : "1.0",
  "schema_samplesize" : "1000",
  "query_pagesize" : "2147483647",
  "query_custom" : query
}

cosmo_df = (
spark
    .read
    .format("com.microsoft.azure.cosmosdb.spark")
    .option(**readConfig)
    .load()
)

# Join strategies
Broadcast Hash Join (BHJ) 
    -> send the samller dataset as the broadcast variable to the larger dataset side, provided there is enough memory on the driver and executor
Shuffle Hash Join (SHJ)
Shuffle sort merge Join (SMJ)
    -> when we have two large datasets and would like to merge based on a common key
    equi joins, it is a efficient if the two datasets are bucketed by the merge key as this would avoid exchange 
Broadcast Nested Loop Hash Join (BLHJ)
Shuffle and replicate nested loop Hash Join (SRNHJ)

using the metrics to analyse the job, stages and task to make improvements

# Streaming structure
* state-less
* state-full

-> under statefull there are two types managed and unmanaged
### Managed:
* Streaming aggregations
* Stream - stream joins
* Streaming deduplicationn
### Unmanaged - need you to define cleanup
* MapGroupsWithState
* FlatMapGroupsWithState

say each sensor is expected to send at most one reading per minute and we want to detect if any sensor is reporting an unusually high number of times.
To find such anomalies, we can count the number of readings received from each sensor in five-minute intervals.

id, reading, created_at
1, 1, now()
1, 1, now()
2, 1, now()
3, 1, now()
1, 1, now() + 5min
2, 1, now() + 5min
3, 1, now() + 5min

SELECT * FROM df Group by id orderBy created_at ASC

get the last created_at and traves down by aggregating count for interval of 5

for streams create interval windows and update according to the created_at value and compute the value

* window(col, value) -> dynamically grouped computed value
* window(col, time_frame, sliding_value) -> resource extensive as the group does not know when to stop, adding withWatermark(col, time_frame) here the group will conclude after the watermark delay.

# Stream - Static joins
# Stream - Stream joins

adImpressionSchema = id, content, impression_time
clickSchema = id,  ad_id, click_time

adImpressionStream = spark.readStream...
clickStream = spark.readStream..

adImpressionWithWatermark = adImpressionStream.withWaterMark('impression_time', 10 minutes).selectExpr('id as impressionId', 'content')
clickStreamWithWatermark = adImpressionStream.withWaterMark('click_time', 10 minutes).selectExpr('id as ClickId', 'ad_is as clickedImpressionId')



# the following is resource heavy since it unbounded
adImpressionStream.join(clickStream).expr(""" ad_id = id """)
# optimise by providing watermark (say the margin is set from the ad created time)
adImpressionWithWatermark.join(clickStreamWithWatermark).expr(
"""
impressionId = clickedImpressionId
click_time BETWEEN impression_time AND impression_time + inteval 1 hour
"""
)

impression_time + inteval 1 hour will define the buffer time

# Modeling arbitary stateful operations with mapGroupsWithState

If we need to trigger some custom actions based on the stream input.
we can have a udf and pass it to the mapGroupsWithState which expects a key, input stream, previous state
the custom function can process based on the input data.
If the input is triggered based on events and there might be cases where there could be inactivity on the key and this might cause the state to consume more resources
* setting a timeout can mitigate this issue, after a timeout during the next micro batch call this key will be trigeered and the GroupState.hasTimedOut returns true and this can be used to remove the key from the state.
* use the system processing time timeouts, where the system time will be used to trigger/call timeout keys

# flatMapGroupsWithState
Overcomes the limitation where the result can be a empty as apposed to mapGroupsWithState, it returns a iterator

# Performance tuning for stream queries
* since the resources are kept running 24/7 managing resources is a crucial step
* cluster resource management(the number of cores)
* Unlike batch processing the executors deal with little amounts of data so, the shuffle partition, rate limiting if there is a sudden spike in the incoming data, this will move the the data to buffer and process accordingly

# Databases
* Online transaction processing workloads
* Online analytical processing workloads

Data lake is a distributed storage but does not guarantee ACID
Data lakehouse overcomes the problems posed by Datalake

# Building Lakehouses with Apache Spark and Delta Lake
* Dataset - https://www.kaggle.com/datasets/wordsforthewise/lending-club

In [None]:
# Initialise the spark application

import sys

from pyspark.sql import SparkSession

deltaSpark = (
    SparkSession
    .builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:3.0.0")
    .appName("deltaSpark")
    .getOrCreate()
)

# Load the dataset and write to table

filePath = "data/loan-risks.snappy.parquet"
deltaTmpDir = "tmp/risky-loans/"

# read from file and write to delta format
(deltaSpark
     .read
     .format("parquet") # default format
     .load(filePath)
     .write
     .format("delta")
     .mode("overwrite")
     .save(deltaTmpDir)
)
# creating a view on the data 

# (deltaSpark
     # .read
     # .format("delta")
     # .load(deltaTmpDir)
     # .createOrReplaceTempView("loans_delta")
# )
