In [1]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# Create the SparkSession used by every later cell, or reuse the active one
# if a session already exists.
spark = SparkSession.builder.appName("Example-3_8").getOrCreate()

22/02/27 12:43:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Schema for the SF fire-calls data, declared programmatically.
# Fields are appended one by one in file order; every column is nullable.
fire_schema = (StructType()
    .add('CallNumber', IntegerType(), True)
    .add('UnitID', StringType(), True)
    .add('IncidentNumber', IntegerType(), True)
    .add('CallType', StringType(), True)
    .add('CallDate', StringType(), True)
    .add('WatchDate', StringType(), True)
    .add('CallFinalDisposition', StringType(), True)
    .add('AvailableDtTm', StringType(), True)
    .add('Address', StringType(), True)
    .add('City', StringType(), True)
    .add('Zipcode', IntegerType(), True)
    .add('Battalion', StringType(), True)
    .add('StationArea', StringType(), True)
    .add('Box', StringType(), True)
    .add('OriginalPriority', StringType(), True)
    .add('Priority', StringType(), True)
    .add('FinalPriority', IntegerType(), True)
    .add('ALSUnit', BooleanType(), True)
    .add('CallTypeGroup', StringType(), True)
    .add('NumAlarms', IntegerType(), True)
    .add('UnitType', StringType(), True)
    .add('UnitSequenceInCallDispatch', IntegerType(), True)
    .add('FirePreventionDistrict', StringType(), True)
    .add('SupervisorDistrict', StringType(), True)
    .add('Neighborhood', StringType(), True)
    .add('Location', StringType(), True)
    .add('RowID', StringType(), True)
    .add('Delay', FloatType(), True))

In [4]:
# Load the raw CSV: the first line is a header row, and fire_schema is applied
# explicitly instead of letting Spark infer the column types.
fire_df = (spark.read
    .schema(fire_schema)
    .option("header", True)
    .csv("../../data/sf-fire-calls.csv"))

In [5]:
# Save the DataFrame as Parquet files under parquet_path.
# mode("overwrite") replaces any output left by a previous run; with the
# default save mode the write raises an error when the target directory
# already exists, so this cell could not be re-run on a fresh kernel.
parquet_path = "../../data/parquet/"
fire_df.write.format("parquet").mode("overwrite").save(parquet_path)

22/02/27 12:43:58 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [6]:
# (Disabled) Alternative: save as a table registered in the Hive metastore,
# with the Parquet files stored at the path given by parquet_table.
# parquet_table = "../../data/parquet_table/"
# fire_df.write \
#     .option("path", parquet_table) \
#     .format("parquet") \
#     .saveAsTable("parquet_table")

In [7]:
# Read the Parquet files back into a DataFrame.
# DataFrameReader.parquet() takes paths plus reader options only, so a
# `schema=` keyword passed to it is not applied as a schema. Set the schema
# through .schema() so that fire_schema is actually used for the read.
fire_parquet = spark.read.schema(fire_schema).parquet(parquet_path)

**Projections and filters.** In relational parlance, a *projection* returns only a chosen subset of columns, while a *filter* returns only the rows matching a certain condition. In Spark, projections are done with the `select()` method, while filters can be expressed using the `filter()` or `where()` method. We can use this technique to examine specific aspects of our SF Fire Department data set:

In [8]:
# Project three columns, keep only calls that are not medical incidents,
# and show the five lowest incident numbers without truncating cell values.
(fire_parquet
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .filter(col("CallType") != "Medical Incident")
    .sort("IncidentNumber")
    .show(5, truncate=False))

+--------------+----------------------+-----------------------------+
|IncidentNumber|AvailableDtTm         |CallType                     |
+--------------+----------------------+-----------------------------+
|30636         |04/12/2000 10:18:53 PM|Alarms                       |
|30773         |04/13/2000 10:34:32 AM|Citizen Assist / Service Call|
|30781         |04/13/2000 10:53:48 AM|Alarms                       |
|30840         |04/13/2000 01:39:00 PM|Structure Fire               |
|30942         |04/13/2000 07:42:53 PM|Outside Fire                 |
+--------------+----------------------+-----------------------------+
only showing top 5 rows



What if we want to know how many distinct `CallType` values were recorded as the causes of the fire calls? This simple and expressive query does the job:

In [9]:
# Count how many different non-null CallType values occur, using countDistinct()
(fire_parquet
    .select("CallType")
    .filter("CallType IS NOT NULL")
    .agg(countDistinct(col("CallType")).alias("DistinctCallTypes"))
    .show())

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



We can list the distinct call types in the data set using these queries:

In [10]:
# List the distinct non-null CallType values; show the first 10 untruncated
(fire_parquet
    .select("CallType")
    .filter("CallType IS NOT NULL")
    .distinct()
    .show(10, truncate=False))

+-----------------------------+
|CallType                     |
+-----------------------------+
|Elevator / Escalator Rescue  |
|Aircraft Emergency           |
|Alarms                       |
|Odor (Strange / Unknown)     |
|Citizen Assist / Service Call|
|HazMat                       |
|Explosion                    |
|Oil Spill                    |
|Vehicle Fire                 |
|Suspicious Package           |
+-----------------------------+
only showing top 10 rows



In [11]:
# Derive new_fire_parquet from fire_parquet with the "Delay" column renamed
# to "ResponseDelayedinMins"; fire_parquet itself keeps the old column name.
new_fire_parquet = fire_parquet.withColumnRenamed(
    "Delay", "ResponseDelayedinMins")

# Show five response delays longer than 5 minutes
(new_fire_parquet
    .select("ResponseDelayedinMins")
    .filter(col("ResponseDelayedinMins") > 5)
    .show(5, truncate=False))

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.0833335            |
|7.2166667            |
|8.666667             |
|5.7166667            |
|16.016666            |
+---------------------+
only showing top 5 rows



Because DataFrames are immutable, renaming a column with `withColumnRenamed()` returns a new DataFrame, while the original DataFrame keeps the old column name.

In [12]:
# Convert the three string date columns into timestamp columns, then drop
# the string originals:
#   CallDate      ("MM/dd/yyyy")            -> IncidentDate
#   WatchDate     ("MM/dd/yyyy")            -> OnWatchDate
#   AvailableDtTm ("MM/dd/yyyy hh:mm:ss a") -> AvailableDtTS
fire_ts_parquet = (new_fire_parquet
    .withColumn("IncidentDate", to_timestamp("CallDate", "MM/dd/yyyy"))
    .withColumn("OnWatchDate", to_timestamp("WatchDate", "MM/dd/yyyy"))
    .withColumn("AvailableDtTS",
                to_timestamp("AvailableDtTm", "MM/dd/yyyy hh:mm:ss a"))
    .drop("CallDate", "WatchDate", "AvailableDtTm"))

# Display the first five rows of the converted columns
(fire_ts_parquet
    .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
    .show(5, truncate=False))

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2000-07-03 00:00:00|2000-07-02 00:00:00|2000-07-03 07:38:35|
|2000-07-03 00:00:00|2000-07-02 00:00:00|2000-07-03 07:05:25|
|2000-07-03 00:00:00|2000-07-02 00:00:00|2000-07-03 08:03:06|
|2000-07-03 00:00:00|2000-07-03 00:00:00|2000-07-03 09:01:03|
|2000-07-03 00:00:00|2000-07-03 00:00:00|2000-07-03 11:26:57|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [13]:
# SQL equivalent:
#   SELECT DISTINCT year(IncidentDate) FROM fire_ts_parquet ORDER BY 1
(fire_ts_parquet
    .select(year(col("IncidentDate")))
    .distinct()
    .sort(year(col("IncidentDate")))
    .show())

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



In [14]:
# SQL equivalent:
#   SELECT CallType, count(*) AS `count` FROM fire_ts_parquet
#   WHERE CallType IS NOT NULL GROUP BY CallType
# The result is additionally sorted by count, largest first; the top 10 are shown.
(fire_ts_parquet
    .select("CallType")
    .filter("CallType IS NOT NULL")
    .groupBy("CallType")
    .count()
    .sort(col("count").desc())
    .show(10, truncate=False))

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [15]:
import pyspark.sql.functions as F

# One-row summary: total number of alarms plus the average, shortest and
# longest response delay. The F. prefix makes it explicit that these are
# Spark aggregate functions rather than Python's built-in sum/min/max.
(fire_ts_parquet
    .agg(F.sum("NumAlarms"),
         F.avg("ResponseDelayedinMins"),
         F.min("ResponseDelayedinMins"),
         F.max("ResponseDelayedinMins"))
    .show())

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



22/02/27 15:13:17 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1046439 ms exceeds timeout 120000 ms
22/02/27 15:13:17 WARN SparkContext: Killing executors is not supported by current scheduler.
