# Learning Spark - Chapter 3 (Python)
## Apache Spark’s Structured APIs

In [55]:
# import sys
from pyspark.sql import SparkSession

from pyspark.sql.functions import avg
from pyspark.sql.functions import col

In [56]:
from pyspark.sql import *

In [57]:
# Create a DataFrame using SparkSession
spark = (SparkSession
.builder
.appName("AuthorsAges")
.getOrCreate())

In [58]:
# Create a DataFrame
data_df = (spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30),("TD", 35), ("Brooke", 25)], ["name", "age"]))

In [59]:
data_df.explain(True)

== Parsed Logical Plan ==
LogicalRDD [name#637, age#638L], false

== Analyzed Logical Plan ==
name: string, age: bigint
LogicalRDD [name#637, age#638L], false

== Optimized Logical Plan ==
LogicalRDD [name#637, age#638L], false

== Physical Plan ==
*(1) Scan ExistingRDD[name#637,age#638L]



In [60]:
data_df.show()

+------+---+
|  name|age|
+------+---+
|Brooke| 20|
| Denny| 31|
| Jules| 30|
|    TD| 35|
|Brooke| 25|
+------+---+



In [61]:
# Group the same names together, aggregate their ages, and compute an average
avg_df = data_df.groupBy("name").agg(avg("age"))

In [62]:
avg_df.explain(True)

== Parsed Logical Plan ==
'Aggregate ['name], [unresolvedalias('name, None), avg('age) AS avg(age)#653]
+- LogicalRDD [name#637, age#638L], false

== Analyzed Logical Plan ==
name: string, avg(age): double
Aggregate [name#637], [name#637, avg(age#638L) AS avg(age)#653]
+- LogicalRDD [name#637, age#638L], false

== Optimized Logical Plan ==
Aggregate [name#637], [name#637, avg(age#638L) AS avg(age)#653]
+- LogicalRDD [name#637, age#638L], false

== Physical Plan ==
*(2) HashAggregate(keys=[name#637], functions=[avg(age#638L)], output=[name#637, avg(age)#653])
+- Exchange hashpartitioning(name#637, 200), ENSURE_REQUIREMENTS, [id=#487]
   +- *(1) HashAggregate(keys=[name#637], functions=[partial_avg(age#638L)], output=[name#637, sum#658, count#659L])
      +- *(1) Scan ExistingRDD[name#637,age#638L]



In [63]:
# Show the results of the final execution
data_df.select("name").where(col("name").isNotNull()).show()

+------+
|  name|
+------+
|Brooke|
| Denny|
| Jules|
|    TD|
|Brooke|
+------+



### The DataFrame API
#### Schemas and Creating DataFrames

In [64]:
from pyspark.sql.types import *

In [65]:
schema = StructType([StructField("author", StringType(), False),
StructField("title", StringType(), False),
StructField("pages", IntegerType(), False)])

In [66]:
schema = "author STRING, title STRING, pages INT"

In [67]:
# Define schema for our data using DDL
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING,`Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

In [68]:
# Create our static data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter","LinkedIn"]],
        [2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
        [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web","twitter", "FB", "LinkedIn"]],
        [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568,["twitter", "FB"]],
        [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web","twitter", "FB", "LinkedIn"]],
        [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
       ]

In [69]:
# Create a DataFrame using the schema defined above
blogs_df = spark.createDataFrame(data, schema)

In [70]:
blogs_df.explain(True)

== Parsed Logical Plan ==
LogicalRDD [Id#666, First#667, Last#668, Url#669, Published#670, Hits#671, Campaigns#672], false

== Analyzed Logical Plan ==
Id: int, First: string, Last: string, Url: string, Published: string, Hits: int, Campaigns: array<string>
LogicalRDD [Id#666, First#667, Last#668, Url#669, Published#670, Hits#671, Campaigns#672], false

== Optimized Logical Plan ==
LogicalRDD [Id#666, First#667, Last#668, Url#669, Published#670, Hits#671, Campaigns#672], false

== Physical Plan ==
*(1) Scan ExistingRDD[Id#666,First#667,Last#668,Url#669,Published#670,Hits#671,Campaigns#672]



In [71]:
# Show the DataFrame; it should reflect our table above
blogs_df.show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



In [72]:
blogs_df.schema

StructType(List(StructField(Id,IntegerType,true),StructField(First,StringType,true),StructField(Last,StringType,true),StructField(Url,StringType,true),StructField(Published,StringType,true),StructField(Hits,IntegerType,true),StructField(Campaigns,ArrayType(StringType,true),true)))

#### Rows

In [73]:
from pyspark.sql import Row
blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",["twitter", "LinkedIn"])

In [74]:
# access using index for individual items
blog_row[6]

['twitter', 'LinkedIn']

In [75]:
rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
authors_df = spark.createDataFrame(rows, ["Authors", "State"])

In [76]:
authors_df.show()

+-------------+-----+
|      Authors|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



## Fire deparment

In [77]:
# In Python, define a schema
from pyspark.sql.types import *

In [78]:
# Programmatic way to define a schema
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
StructField('UnitID', StringType(), True),
StructField('IncidentNumber', IntegerType(), True),
StructField('CallType', StringType(), True),
StructField('CallDate', StringType(), True),
StructField('WatchDate', StringType(), True),
StructField('CallFinalDisposition', StringType(), True),
StructField('AvailableDtTm', StringType(), True),
StructField('Address', StringType(), True),
StructField('City', StringType(), True),
StructField('Zipcode', IntegerType(), True),
StructField('Battalion', StringType(), True),
StructField('StationArea', StringType(), True),
StructField('Box', StringType(), True),
StructField('OriginalPriority', StringType(), True),
StructField('Priority', StringType(), True),
StructField('FinalPriority', IntegerType(), True),
StructField('ALSUnit', BooleanType(), True),
StructField('CallTypeGroup', StringType(), True),
StructField('NumAlarms', IntegerType(), True),
StructField('UnitType', StringType(), True),
StructField('UnitSequenceInCallDispatch', IntegerType(), True),
StructField('FirePreventionDistrict', StringType(), True),
StructField('SupervisorDistrict', StringType(), True),
StructField('Neighborhood', StringType(), True),
StructField('Location', StringType(), True),
StructField('RowID', StringType(), True),
StructField('Delay', FloatType(), True)])

In [79]:
sf_fire_file = "C:/Users/jorgedario.mendez/OneDrive - Bosonit/Documentos/3. Libro/LearningSparkV2-master/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"

In [80]:
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

In [81]:
fire_df.explain(True)

== Parsed Logical Plan ==
Relation[CallNumber#722,UnitID#723,IncidentNumber#724,CallType#725,CallDate#726,WatchDate#727,CallFinalDisposition#728,AvailableDtTm#729,Address#730,City#731,Zipcode#732,Battalion#733,StationArea#734,Box#735,OriginalPriority#736,Priority#737,FinalPriority#738,ALSUnit#739,CallTypeGroup#740,NumAlarms#741,UnitType#742,UnitSequenceInCallDispatch#743,FirePreventionDistrict#744,SupervisorDistrict#745,... 4 more fields] csv

== Analyzed Logical Plan ==
CallNumber: int, UnitID: string, IncidentNumber: int, CallType: string, CallDate: string, WatchDate: string, CallFinalDisposition: string, AvailableDtTm: string, Address: string, City: string, Zipcode: int, Battalion: string, StationArea: string, Box: string, OriginalPriority: string, Priority: string, FinalPriority: int, ALSUnit: boolean, CallTypeGroup: string, NumAlarms: int, UnitType: string, UnitSequenceInCallDispatch: int, FirePreventionDistrict: string, SupervisorDistrict: string, ... 4 more fields
Relation[CallN

#### Saving a DataFrame as a Parquet file or SQL table.

In [109]:
#parquetPath = "C:/Users/jorgedario.mendez/OneDrive - Bosonit/Documentos/3. Libro/LearningSparkV2-master/JM Jupyter/parquepython1/"
#fire_df.write.format("parquet").save(parquetPath)

In [110]:
#parquetTable = "parquetablepython6"
#fire_df.write.format("parquet").saveAsTable(parquetTable)

#### Transformations and actions

In [84]:
from pyspark.sql.functions import col

In [85]:
few_fire_df = (fire_df.select("IncidentNumber", "AvailableDtTm", "CallType").where(col("CallType") != "Medical Incident"))

few_fire_df.show(5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [86]:
from pyspark.sql.functions import *

In [87]:
# In Python, return number of distinct types of calls using countDistinct()
fire_df.select("CallType").where(col("CallType").isNotNull()).agg(countDistinct("CallType").alias("DistictCallTypes")).show()

+----------------+
|DistictCallTypes|
+----------------+
|              30|
+----------------+



In [111]:
#distinct call types in the data
(fire_df
 .select("CallType")
 .where(col("CallType").isNotNull())
 .distinct()
 .show(10, truncate = False))

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows



#### Renaming, adding, and dropping columns.

In [89]:
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
(new_fire_df
.select("ResponseDelayedinMins")
.where(col("ResponseDelayedinMins") > 5)
.show(5, False))

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



In [90]:
#Change date format
fire_ts_df = (new_fire_df
.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
.drop("CallDate")
.withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
.drop("WatchDate")
.withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"),"MM/dd/yyyy hh:mm:ss a"))
.drop("AvailableDtTm"))

In [117]:
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, False)

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [92]:
(fire_ts_df
.select(year('IncidentDate'))
.distinct()
.orderBy(year('IncidentDate'))
.show())

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



#### Aggregations

what were the most common types of fire calls?

In [93]:
(fire_ts_df
.select("CallType")
.where(col("CallType").isNotNull())
.groupBy("CallType")
.count()
.orderBy("count", ascending=False)
.show(n=10, truncate=False))

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



### Other common DataFrame operations

the Data‐
Frame API provides descriptive statistical methods like min(), max(), sum(), and avg()

In [94]:
import pyspark.sql.functions as F

In [95]:
(fire_ts_df
.select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"), 
        F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins"))
.show())

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



### End-to-End DataFrame Example

#####  What were all the different types of fire calls in 2018?

In [96]:
(fire_ts_df.select("CallType").where(col("CallType").isNotNull()).distinct().show(truncate = False))

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

##### What months within the year 2018 saw the highest number of fire calls?

In [97]:
(fire_ts_df.select("IncidentDate")
 .where(year(col("IncidentDate")) == "2018")
 .groupBy(month(col("IncidentDate")))
 .count()
 .orderBy("count", ascending=False)
 .show(n=12, truncate=False))

+-------------------+-----+
|month(IncidentDate)|count|
+-------------------+-----+
|10                 |1068 |
|5                  |1047 |
|3                  |1029 |
|8                  |1021 |
|1                  |1007 |
|7                  |974  |
|6                  |974  |
|9                  |951  |
|4                  |947  |
|2                  |919  |
|11                 |199  |
+-------------------+-----+



##### Which neighborhood in San Francisco generated the most fire calls in 2018?

In [98]:
(fire_ts_df
 .where(col("City") == 'San Francisco')
 .groupBy(col("Neighborhood"))
 .count()
 .orderBy("count", ascending = False)
 .show(n = 5, truncate = False))

+------------------------------+-----+
|Neighborhood                  |count|
+------------------------------+-----+
|Tenderloin                    |7067 |
|South of Market               |5323 |
|Mission                       |4666 |
|Financial District/South Beach|3907 |
|Bayview Hunters Point         |2643 |
+------------------------------+-----+
only showing top 5 rows



##### Which neighborhoods had the worst response times to fire calls in 2018?

In [99]:
fire_ts_df.select("Neighborhood","ResponseDelayedinMins").show()

+--------------------+---------------------+
|        Neighborhood|ResponseDelayedinMins|
+--------------------+---------------------+
|     Pacific Heights|                 2.95|
|Bayview Hunters P...|                  4.7|
|          Tenderloin|            2.4333334|
|      Bernal Heights|                  1.5|
|    Western Addition|            3.4833333|
|Financial Distric...|                 1.75|
|Oceanview/Merced/...|            2.7166667|
|          Tenderloin|            1.7833333|
|           Japantown|            1.5166667|
| Castro/Upper Market|            2.7666667|
|             Mission|            2.1833334|
|           Excelsior|                  2.5|
|            Nob Hill|            2.4166667|
|      Outer Richmond|                 4.95|
|             Mission|            1.4166666|
|             Mission|            2.5333333|
|             Mission|            1.8833333|
|  West of Twin Peaks|                 5.35|
|      Inner Richmond|                  2.0|
|      Inn

In [100]:
(fire_ts_df.select("Neighborhood","ResponseDelayedinMins")
 .where(col("ResponseDelayedinMins").isNotNull())
 .groupBy(col("Neighborhood"))
 .agg(avg(col("ResponseDelayedinMins")).alias("avgTimeDelay"))
 .orderBy("avgTimeDelay", ascending = False)
 .show(n = 10, truncate = False))

+---------------------+------------------+
|Neighborhood         |avgTimeDelay      |
+---------------------+------------------+
|Treasure Island      |5.47149999499321  |
|Presidio             |4.9653753549934505|
|Mission Bay          |4.530760579728938 |
|McLaren Park         |4.309822855580256 |
|None                 |4.307180858534226 |
|Twin Peaks           |4.294008398269729 |
|Golden Gate Park     |4.249903662329421 |
|Lakeshore            |4.201812146300208 |
|Bayview Hunters Point|4.150424644461803 |
|Seacliff             |4.137820510795483 |
+---------------------+------------------+
only showing top 10 rows



##### Which week in the year in 2018 had the most fire calls?

In [101]:
(fire_ts_df.select("IncidentDate")
 .where(year(col("IncidentDate")) == "2018")
 .groupBy(weekofyear(col("IncidentDate")))
 .count()
 .orderBy("count", ascending=False)
 .show(n=52, truncate=False))

+------------------------+-----+
|weekofyear(IncidentDate)|count|
+------------------------+-----+
|22                      |259  |
|40                      |255  |
|43                      |250  |
|25                      |249  |
|1                       |246  |
|44                      |244  |
|32                      |243  |
|13                      |243  |
|11                      |240  |
|5                       |236  |
|18                      |236  |
|23                      |235  |
|2                       |234  |
|31                      |234  |
|42                      |234  |
|19                      |233  |
|34                      |232  |
|10                      |232  |
|8                       |232  |
|28                      |231  |
|21                      |231  |
|16                      |228  |
|7                       |228  |
|9                       |228  |
|38                      |226  |
|6                       |225  |
|33                      |225  |
|20       

#####  Is there a correlation between neighborhood, zip code, and number of fire calls?

In [102]:
(fire_ts_df
 .where(col("Zipcode").isNotNull())
 .groupBy(col("Zipcode"))
 .count()
 .orderBy("count", ascending = False)
 .show(n = 20, truncate = False))

+-------+-----+
|Zipcode|count|
+-------+-----+
|94102  |21840|
|94103  |20897|
|94110  |14801|
|94109  |14686|
|94124  |9236 |
|94112  |8421 |
|94115  |7812 |
|94107  |6941 |
|94122  |6355 |
|94133  |6246 |
|94117  |5804 |
|94114  |5175 |
|94118  |5157 |
|94134  |5009 |
|94121  |4555 |
|94132  |4321 |
|94105  |4236 |
|94108  |4084 |
|94116  |3933 |
|94123  |3719 |
+-------+-----+
only showing top 20 rows



##### How can we use Parquet files or SQL tables to store this data and read it back?

In [103]:
# parquetPath = "/fire_ts_df/"
# fire_ts_df.write.format("parquet").save(parquetPath)

In [104]:
# parquetTable = "fire_ts_df_table1"
# fire_ts_df.write.format("parquet").saveAsTable(parquetTable)

### Typed Objects, Untyped Objects, and Generic Rows

In [105]:
from pyspark.sql import Row
row = Row(350, True, "Learning Spark 2E", None)

In [106]:
row[0]

350

In [107]:
row[1]

True

In [108]:
row[2]

'Learning Spark 2E'