In [0]:
%fs ls /databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv

path,name,size,modificationTime
dbfs:/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv,sf-fire-calls.csv,1137925359,1576280979000


# Spark is an in-memory data processing framework, so why do we need to cache?

# 1. Create a DF (Without Cache)

In [0]:
# Read the SF fire-calls CSV without caching it.
#   header=true      -> the first line of the file supplies the column names.
#   inferSchema=true -> Spark derives the column types from the data itself,
#                       which costs an extra pass over the input.
fire_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")
)

# 2. Try an action on fire_df (loads data from disk)

In [0]:
# groupBy, agg and select are lazy transformations; the write at the end is the
# action that actually runs the job.
#
# The functions module is imported under an alias instead of `import *`: the
# wildcard form rebinds Python's built-in max/min (among others) to Spark
# column functions for every cell that runs afterwards.
from pyspark.sql import functions as F

(fire_df
 .groupBy("Zipcode of Incident")
 .agg(F.max("Delay").alias("MaxDelay"), F.min("Delay").alias("MinDelay"))
 .select("Zipcode of Incident", "MaxDelay", "MinDelay")
 .write
 .format("noop")  # noop sink: the query is executed but no output is persisted
 .mode("overwrite")
 .save("/FileStore/temp"))

In [0]:
# Print the first five rows of the DataFrame as a text table.
fire_df.show(n=5)

+-----------+-------+---------------+----------------+----------+----------+----------------------+--------------------+--------------------+----+-------------------+---------+------------+----+------------+--------+--------------+--------+---------------+---------+--------+------------------------------+------------------------+-------------------+---------------+--------------------+-------------+------------------+
|Call Number|Unit ID|Incident Number|        CallType| Call Date|Watch Date|Call Final Disposition|      Available DtTm|             Address|City|Zipcode of Incident|Battalion|Station Area| Box|OrigPriority|Priority|Final Priority|ALS Unit|Call Type Group|NumAlarms|UnitType|Unit sequence in call dispatch|Fire Prevention District|Supervisor District|   Neighborhood|            Location|        RowID|             Delay|
+-----------+-------+---------------+----------------+----------+----------+----------------------+--------------------+--------------------+----+----------

# 3. Try another action on fire_df (loads the data from disk again)

In [0]:
# Count the calls per CallType (rows without a CallType are dropped) and order
# the result from most to least frequent. Everything up to orderBy is a lazy
# transformation.
call_type_counts = (
    fire_df
    .select("CallType")
    .where("CallType is not null")
    .groupBy("CallType")
    .count()
    .orderBy("count", ascending=False)
)

# The save() call is the action that triggers execution of the query above.
call_type_counts.write.format("noop").mode("overwrite").save("/FileStore/temp")

# 4. DF Creation (With Cache)

In [0]:
# Same CSV read as in step 1; fire_df is rebound to a fresh DataFrame.
#   header=true      -> the first line of the file supplies the column names.
#   inferSchema=true -> Spark derives the column types from the data itself,
#                       which costs an extra pass over the input.
fire_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")
)

# Mark the DataFrame for caching. Kept as the cell's last expression so the
# notebook echoes the DataFrame (and its inferred schema) as the cell output.
fire_df.cache()

Out[5]: DataFrame[Call Number: int, Unit ID: string, Incident Number: int, CallType: string, Call Date: date, Watch Date: date, Call Final Disposition: string, Available DtTm: string, Address: string, City: string, Zipcode of Incident: int, Battalion: string, Station Area: string, Box: string, OrigPriority: string, Priority: string, Final Priority: int, ALS Unit: boolean, Call Type Group: string, NumAlarms: int, UnitType: string, Unit sequence in call dispatch: int, Fire Prevention District: string, Supervisor District: string, Neighborhood: string, Location: string, RowID: string, Delay: double]

# 5. Try an action on fire_df (loads data from disk because this is the first action after caching; the data is then kept in memory instead of being discarded as in the earlier, uncached runs)

In [0]:
# groupBy, agg and select are lazy transformations; the write at the end is the
# action that actually runs the job.
#
# The functions module is imported under an alias instead of `import *`: the
# wildcard form rebinds Python's built-in max/min (among others) to Spark
# column functions for every cell that runs afterwards.
from pyspark.sql import functions as F

(fire_df
 .groupBy("Zipcode of Incident")
 .agg(F.max("Delay").alias("MaxDelay"), F.min("Delay").alias("MinDelay"))
 .select("Zipcode of Incident", "MaxDelay", "MinDelay")
 .write
 .format("noop")  # noop sink: the query is executed but no output is persisted
 .mode("overwrite")
 .save("/FileStore/temp"))

# 6. Try another action on fire_df (uses cached data)

In [0]:
# Count the calls per CallType (rows without a CallType are dropped) and order
# the result from most to least frequent. Everything up to orderBy is a lazy
# transformation.
call_type_counts = (
    fire_df
    .select("CallType")
    .where("CallType is not null")
    .groupBy("CallType")
    .count()
    .orderBy("count", ascending=False)
)

# The save() call is the action that triggers execution of the query above.
call_type_counts.write.format("noop").mode("overwrite").save("/FileStore/temp")

# Unpersist DF

In [0]:
# Release the cached data for fire_df. blocking=False is PySpark's documented
# default, written out explicitly here. Kept as the last expression so the
# notebook echoes the returned DataFrame.
fire_df.unpersist(blocking=False)

Out[8]: DataFrame[Call Number: int, Unit ID: string, Incident Number: int, CallType: string, Call Date: date, Watch Date: date, Call Final Disposition: string, Available DtTm: string, Address: string, City: string, Zipcode of Incident: int, Battalion: string, Station Area: string, Box: string, OrigPriority: string, Priority: string, Final Priority: int, ALS Unit: boolean, Call Type Group: string, NumAlarms: int, UnitType: string, Unit sequence in call dispatch: int, Fire Prevention District: string, Supervisor District: string, Neighborhood: string, Location: string, RowID: string, Delay: double]