# Chapter 3 Underneath an RDD and The DataFrame API

# Creating structures from list

In [23]:
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.sql.types import *
spark = (
    SparkSession
    .builder
    .appName("structuring_Spark")
    .getOrCreate()
    )
sc = spark.sparkContext

## Creating an RDD

In [24]:
# creating RDD
dataRDD = sc.parallelize(
    [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]
    )
# aggregating to get mean age
agesRDD = (
    dataRDD
    .map(lambda x: (x[0], (x[1], 1)))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) 
    .map(lambda x: (x[0], x[1][0]/x[1][1]))
    )
print(agesRDD.take(10))

[('Brooke', 22.5), ('TD', 35.0), ('Denny', 31.0), ('Jules', 30.0)]


## Now with DataFrame

In [25]:
# creating df from list
data_df = spark.createDataFrame(
    [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)],
    ["name", "age"]
    )
# aggregating
agg_df = data_df.groupBy("name").agg(F.avg("age"))
agg_df.show()

+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Denny|    31.0|
| Jules|    30.0|
|    TD|    35.0|
+------+--------+



# The DataFrame API

In [26]:
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, " + \
    "`Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
    [2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
    [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
    ]
blogs_df = spark.createDataFrame(data, schema)
blogs_df.show()
print(blogs_df.printSchema())

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (

In [27]:
# Or with classes
schema = StructType([
   StructField("Id", IntegerType(), False),
   StructField("First", StringType(), False),
   StructField("Last", StringType(), False),
   StructField("Url", StringType(), False),
   StructField("Published", StringType(), False),
   StructField("Hits", IntegerType(), False),
   StructField("Campaigns", ArrayType(StringType()), False)]
   )
blogs_df = spark.createDataFrame(data, schema)
blogs_df.show()
print(blogs_df.printSchema())

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+

root
 |-- Id: integer (nullable = false)
 |-- First: string (nullable = false)
 |-- Last: string (nullable = false)
 |-- Url: string (nullable = false)
 |-- Published: string (nullable = false)
 |-- Hits: inte

# Columns manipulation

In [28]:

blogs_df.select(F.expr("Hits") * 2).show(2)
blogs_df.select(F.col("Hits") * 2).show(2)
blogs_df.select(F.expr("Hits * 2")).show(2)
# show heavy hitters
blogs_df.withColumn("Big Hitters", (F.expr("Hits > 10000"))).show()
print(blogs_df.schema)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows

+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|Big Hitters|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|      false|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|      false|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|      false|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|       true|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|405

In [29]:
# concatenating columns
(
    blogs_df
    .withColumn("AuthorsId", (F.concat(F.expr("First"), F.expr("Last"), F.expr("Id")))) 
    .select(F.col("AuthorsId"))
    .show(4)
)

+-------------+
|    AuthorsId|
+-------------+
|  JulesDamji1|
| BrookeWenig2|
|    DennyLee3|
|TathagataDas4|
+-------------+
only showing top 4 rows



In [30]:
# sorting
blogs_df.sort(F.col("Id").desc()).show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



# Rows

In [31]:
blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",
["twitter", "LinkedIn"])
blog_row

<Row(6, 'Reynold', 'Xin', 'https://tinyurl.6', 255568, '3/2/2015', ['twitter', 'LinkedIn'])>

In [32]:
# creating dataframe from rows
rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
authors_df = spark.createDataFrame(rows, ["Authors", "State"])
authors_df.show()

+-------------+-----+
|      Authors|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



# Data Readers and Data Writers

In [33]:
fire_schema = StructType([
    StructField('CallNumber', IntegerType(), True),
    StructField('UnitID', StringType(), True),
    StructField('IncidentNumber', IntegerType(), True),
    StructField('CallType', StringType(), True),
    StructField('CallDate', StringType(), True),
    StructField('WatchDate', StringType(), True),
    StructField('CallFinalDisposition', StringType(), True),
    StructField('AvailableDtTm', StringType(), True),
    StructField('Address', StringType(), True),
    StructField('City', StringType(), True),
    StructField('Zipcode', IntegerType(), True),
    StructField('Battalion', StringType(), True),
    StructField('StationArea', StringType(), True),
    StructField('Box', StringType(), True),
    StructField('OriginalPriority', StringType(), True),
    StructField('Priority', StringType(), True),
    StructField('FinalPriority', IntegerType(), True),
    StructField('ALSUnit', BooleanType(), True),
    StructField('CallTypeGroup', StringType(), True),
    StructField('NumAlarms', IntegerType(), True),
    StructField('UnitType', StringType(), True),
    StructField('UnitSequenceInCallDispatch', IntegerType(), True),
    StructField('FirePreventionDistrict', StringType(), True),
    StructField('SupervisorDistrict', StringType(), True),
    StructField('Neighborhood', StringType(), True),
    StructField('Location', StringType(), True),
    StructField('RowID', StringType(), True),
    StructField('Delay', FloatType(), True)
    ])
sf_fire_file = "./data/sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

In [34]:
fire_df.show(10)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

In [35]:
# saving as parquet
parquet_path = "./data/saved_parquet"
fire_df.write.format("parquet").save(parquet_path, mode="overwrite")

25/05/05 22:29:20 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 92,31% for 8 writers
25/05/05 22:29:20 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 82,05% for 9 writers
25/05/05 22:29:20 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 73,85% for 10 writers
25/05/05 22:29:20 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 67,13% for 11 writers
25/05/05 22:29:21 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 73,85% for 10 writers
25/05/05 22:29:21 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 82,05% for 9 writers
25/05/05 22:29:21 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 

In [36]:
# saving as table
parquet_table = "test_table"
_ = spark.sql(f"DROP TABLE IF EXISTS {parquet_table}")
fire_df.write.format("parquet").saveAsTable(parquet_table)

25/05/05 22:29:22 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 92,31% for 8 writers
25/05/05 22:29:22 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 82,05% for 9 writers
25/05/05 22:29:22 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 73,85% for 10 writers
25/05/05 22:29:22 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 67,13% for 11 writers
25/05/05 22:29:22 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 73,85% for 10 writers
25/05/05 22:29:22 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 452 bytes) of heap memory
Scaling row group sizes to 82,05% for 9 writers
25/05/05 22:29:22 WARN MemoryManager: Total allocation exceeds 95,00% (991 166 

In [37]:
spark.read.table(parquet_table).show(10)
_ = spark.sql(f"DROP TABLE IF EXISTS {parquet_table}")

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

# Some transformations

In [38]:
# where
few_fire_df = (fire_df
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .where(F.col("CallType") != "Medical Incident"))
few_fire_df.show(5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [39]:
# filter
few_fire_df = (fire_df
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .filter(F.col("CallType") != "Medical Incident"))
few_fire_df.show(5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [40]:
# count distinct
(
    fire_df
    .select("CallType")
    .where(F.col("CallType").isNotNull())
    .agg(F.countDistinct("CallType").alias("DistinctCallTypes"))
    .show()
)

[Stage 103:>                                                      (0 + 11) / 11]

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



                                                                                

In [41]:
# output distinct ones themselves
(
    fire_df
    .select("CallType")
    .where(F.col("CallType").isNotNull())
    .distinct()
    .show(10, False)
)

+-----------------------------+
|CallType                     |
+-----------------------------+
|Elevator / Escalator Rescue  |
|Aircraft Emergency           |
|Alarms                       |
|Odor (Strange / Unknown)     |
|Citizen Assist / Service Call|
|HazMat                       |
|Explosion                    |
|Oil Spill                    |
|Vehicle Fire                 |
|Suspicious Package           |
+-----------------------------+
only showing top 10 rows



In [42]:
# renaming F.columns
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
(
    new_fire_df
    .select("ResponseDelayedinMins")
    .where(F.col("ResponseDelayedinMins") > 5)
    .show(5, False)
)

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



In [43]:
# formating
fire_ts_df = (
    new_fire_df
    .withColumn("IncidentDate", F.to_timestamp(F.col("CallDate"), "dd/MM/yyyy"))
    .drop("CallDate")
    .withColumn("OnWatchDate", F.to_timestamp(F.col("WatchDate"), "dd/MM/yyyy"))
    .drop("WatchDate")
    .withColumn("AvailableDtTS", F.to_timestamp(F.col("AvailableDtTm"), "dd/MM/yyyy hh:mm:ss a"))
    .drop("AvailableDtTm")
    )
# Select the converted F.columns
(
    fire_ts_df
    .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
    .show(5, False)
)

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-11-01 00:00:00|2002-10-01 00:00:00|2002-11-01 01:51:44|
|2002-11-01 00:00:00|2002-10-01 00:00:00|2002-11-01 03:01:18|
|2002-11-01 00:00:00|2002-10-01 00:00:00|2002-11-01 02:39:50|
|2002-11-01 00:00:00|2002-10-01 00:00:00|2002-11-01 04:16:46|
|2002-11-01 00:00:00|2002-10-01 00:00:00|2002-11-01 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [44]:
# date_trunc (kinda)
(
    fire_ts_df
    .select(F.year('IncidentDate'))
    .distinct()
    .orderBy(F.year('IncidentDate'))
    .show()
)

[Stage 114:>                                                      (0 + 11) / 11]

+------------------+
|year(IncidentDate)|
+------------------+
|              NULL|
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



                                                                                

In [45]:
# groupBy
(   
    fire_ts_df
    .select("CallType")
    .where(F.col("CallType").isNotNull())
    .groupBy("CallType")
    .count()
    .orderBy("count", ascending=False)
    .show(n=10, truncate=False)
)

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [46]:
# AKA describe()
(
    fire_ts_df.select(
        F.sum("NumAlarms"),
        F.avg("ResponseDelayedinMins"),
        F.min("ResponseDelayedinMins"),
        F.max("ResponseDelayedinMins"),
    ).show()
)

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+

