# 02 Transformations and Actions

- Read a CSV file into a DataFrame using schema inference
- Transform the DataFrame's columns (change their data types)
- Execute SQL queries using spark.sql()
- Use the DataFrame API to get the same results as the SQL queries
  - Perform transformations and actions
- Identify the role of the Catalyst optimizer in query optimization
  - SQL/DataFrame -> unresolved logical plan -> resolved logical plan -> optimized logical plan -> physical plan

In [1]:
# 1. Obtain the SparkSession for this notebook (getOrCreate() returns the
#    already-active session if one exists) and display the Spark version.

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .getOrCreate()
)
spark.version

'4.0.1'

In [2]:
# 2. Load "../../datasets/sf-fire-calls.csv" into a DataFrame.
#    The first line holds the column names (header), and the column types
#    are inferred by scanning the values (inferSchema).

fire_calls_csv = "../../datasets/sf-fire-calls.csv"
raw_fire_df = spark.read.csv(fire_calls_csv, header=True, inferSchema=True)

raw_fire_df.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

In [3]:
# 3. Count the rows. count() is an action, so this runs a job over the data.

row_count = raw_fire_df.count()
row_count

175296

In [4]:
# 4. Print the schema produced by inferSchema.
#    CallDate and AvailableDtTm are inferred as strings and Zipcode as an
#    integer; step 6 converts these three columns to more suitable types.

raw_fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [5]:
# 5. Show the raw AvailableDtTm, CallDate and ZipCode values untruncated,
#    to see the formats that step 6 has to parse.

preview_columns = ["AvailableDtTm", "CallDate", "ZipCode"]
raw_fire_df.select(*preview_columns).show(truncate=False)

+----------------------+----------+-------+
|AvailableDtTm         |CallDate  |ZipCode|
+----------------------+----------+-------+
|01/11/2002 01:51:44 AM|01/11/2002|94109  |
|01/11/2002 03:01:18 AM|01/11/2002|94124  |
|01/11/2002 02:39:50 AM|01/11/2002|94102  |
|01/11/2002 04:16:46 AM|01/11/2002|94110  |
|01/11/2002 06:01:58 AM|01/11/2002|94109  |
|01/11/2002 08:03:26 AM|01/11/2002|94105  |
|01/11/2002 09:46:44 AM|01/11/2002|94112  |
|01/11/2002 09:58:53 AM|01/11/2002|94102  |
|01/11/2002 12:06:57 PM|01/11/2002|94115  |
|01/11/2002 01:08:40 PM|01/11/2002|94114  |
|01/11/2002 03:31:02 PM|01/11/2002|94110  |
|01/11/2002 02:59:04 PM|01/11/2002|94112  |
|01/11/2002 04:22:49 PM|01/11/2002|94109  |
|01/11/2002 04:18:33 PM|01/11/2002|94121  |
|01/11/2002 04:09:08 PM|01/11/2002|94110  |
|01/11/2002 04:09:08 PM|01/11/2002|94110  |
|01/11/2002 04:09:08 PM|01/11/2002|94110  |
|01/11/2002 04:34:23 PM|01/11/2002|94116  |
|01/11/2002 04:51:31 PM|01/11/2002|94118  |
|01/11/2002 04:51:12 PM|01/11/20

In [6]:
# 6. Convert column types:
#    - AvailableDtTm: string -> timestamp, with pyspark.sql.functions.to_timestamp()
#    - CallDate:      string -> date,      with pyspark.sql.functions.to_date()
#    - Zipcode:       integer -> string,   with pyspark.sql.functions.expr()
#    Pattern letters: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html

from pyspark.sql.functions import to_timestamp, to_date, expr

# Spark matches column names case-insensitively by default, so the "ZipCode"
# key replaces the existing "Zipcode" column instead of adding a new one.
type_casts = {
    "AvailableDtTm": to_timestamp("AvailableDtTm", "MM/dd/yyyy hh:mm:ss a"),
    "ZipCode": expr("cast(Zipcode as string)"),
    "CallDate": to_date("CallDate", "MM/dd/yyyy"),
}
fire_df = raw_fire_df.withColumns(type_casts)

fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: date (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: timestamp (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipCode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [8]:
# 7. Persist the typed DataFrame as the managed table "sf_fire_calls",
#    replacing the table if it already exists.

fire_df.write.saveAsTable("sf_fire_calls", mode="overwrite")

In [9]:
# 8. Read "sf_fire_calls" back from the catalog and count its rows.

fire_df2 = spark.read.table("sf_fire_calls")
fire_df2.count()

175296

In [None]:
# 9. With spark.sql(), find the 5 ZipCodes with the highest number of calls.
# NOTE(review): COUNT(*) counts rows of sf_fire_calls. The sample output of
# step 5 shows several rows sharing the same timestamp and zip code (they look
# like one row per dispatched unit). If "calls" means distinct calls,
# COUNT(DISTINCT CallNumber) may be what is intended — confirm.

top_zipcodes_sql = """
    SELECT ZipCode, COUNT(*) AS cnt
    FROM sf_fire_calls
    GROUP BY ZipCode
    ORDER BY cnt DESC
    LIMIT 5
"""
spark.sql(top_zipcodes_sql).show()

## Transformations

In [None]:
# 10. With spark.sql(): among records whose CallType is not NULL, find the
#     three (CallType, Zipcode) combinations that occur most often and
#     report how many times each one occurs.

common_pairs_sql = """
select CallType, Zipcode, count(*) as count
from sf_fire_calls
where CallType is not null
group by CallType, Zipcode
order by count desc
limit 3
"""
spark.sql(common_pairs_sql).show()

In [None]:
# 11. Reproduce the previous SQL result with the DataFrame API, building one
#     intermediate DataFrame per step (one or two method calls each).

df1 = fire_df.select(["CallType", "Zipcode"])       # SELECT the two columns
df2 = df1.filter("CallType is not null")            # WHERE
df3 = df2.groupBy(["CallType", "Zipcode"]).count()  # GROUP BY + count(*) -> "count"
df4 = df3.sort("count", ascending=False)            # ORDER BY count DESC
df5 = df4.limit(3)                                   # LIMIT 3
df5.show()

In [None]:
# 12. Same result as step 11, written as a single fluent chain.

result_df = (
    fire_df
    .select("CallType", "Zipcode")
    .filter("CallType is not null")
    .groupBy("CallType", "Zipcode")
    .count()
    .sort("count", ascending=False)
    .limit(3)
)

result_df.show()

In [None]:
# 13. Print every Catalyst stage for result_df: the parsed (unresolved),
#     analyzed, and optimized logical plans, then the physical plan.

result_df.explain(extended=True)

![](https://www.databricks.com/wp-content/uploads/2018/05/Catalyst-Optimizer-diagram.png)

In [None]:
# 14. With spark.sql(): list every distinct CallType recorded in the
#     sf_fire_calls table, skipping rows where the call type is NULL.

distinct_call_types_sql = """
select distinct CallType as distinct_call_types
from sf_fire_calls
where CallType is not null
"""
spark.sql(distinct_call_types_sql).show(truncate=False)

In [None]:
# 15. Same result as step 14, using the DataFrame API.
#     The output column is named distinct_call_types and shown untruncated,
#     exactly as in the SQL version, so both cells print the same table.

(
    fire_df.where("CallType is not null")
    .selectExpr("CallType as distinct_call_types")
    .distinct()
    .show(truncate=False)
)

In [None]:
# 16. With SQL: which neighborhoods had the worst average response time
#     (Delay) in 2018? Neighborhoods are listed from worst to best.

worst_response_sql = """
    SELECT Neighborhood, AVG(Delay) AS avg_delay
    FROM sf_fire_calls
    WHERE year(CallDate) = 2018
    GROUP BY Neighborhood
    ORDER BY avg_delay desc;
    """
spark.sql(worst_response_sql).show(truncate=False)

In [None]:
# 17. Same question as step 16, answered with the DataFrame API.

from pyspark.sql.functions import year, avg, col

calls_2018 = spark.table("sf_fire_calls").where(year(col("CallDate")) == 2018)
avg_delay_by_neighborhood = calls_2018.groupBy("Neighborhood").agg(
    avg("Delay").alias("avg_delay")
)
# ascending=False sorts descending, like col("avg_delay").desc().
avg_delay_by_neighborhood.orderBy("avg_delay", ascending=False).show(truncate=False)


In [None]:
# 17b. Alternative spelling of step 17: DataFrame.where() is an alias of
#      DataFrame.filter(), so this cell produces the same result.

from pyspark.sql.functions import year, avg, col

neighborhood_delays = (
    spark.table("sf_fire_calls")
    .filter(year("CallDate") == 2018)
    .groupBy("Neighborhood")
    .agg(avg(col("Delay")).alias("avg_delay"))
)
neighborhood_delays.orderBy(col("avg_delay").desc()).show(truncate=False)