## Spark Download

First of all, you need to download spark and unzip it in a folder on your computer. You can download the zip file in here: https://spark.apache.org/downloads.html <br/>
After that you need to add 3 environment variables. For Windows users, you can follow those steps:
- Press Windows+R and run the command: sysdm.cpl
- Click on Advanced > Environment Variables
- Add those variables in system variables by clicking new:
    - SPARK_HOME: add the path where you unzipped the spark zip file
    - HADOOP_HOME: add the same path in here
    - JAVA_HOME: add the java directory path (if you don't have java installed, just search download java jdk on google. I am using the jdk1.8.0)
- You should also add the bin folder of spark in your path variable (YOURPATH/bin)
- Download the hadoop.dll and winutils.exe from here: https://github.com/cdarlint/winutils, and put them in the spark bin folder


To make sure that your spark is working you can just run the command "pyspark" in cmd.<br/>



In [2]:
# Import findspark to find spark and import it
import findspark
findspark.init()
# Import spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

In [2]:
# Initiate a session
spark = (SparkSession
        .builder
        .appName("PythonMnMCount")
        .getOrCreate())

In [3]:
# get the M&M data set file name
mnm_file = "data/mnm_dataset.csv"
# read the file into a Spark DataFrame
mnm_df = (spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(mnm_file))

In [4]:
# Show the DataFrame
mnm_df.show(5, truncate=False) #if truncate is true, truncate strings longer than 20

+-----+------+-----+
|State|Color |Count|
+-----+------+-----+
|TX   |Red   |20   |
|NV   |Blue  |66   |
|CO   |Blue  |79   |
|OR   |Blue  |71   |
|WA   |Yellow|93   |
+-----+------+-----+
only showing top 5 rows



In [5]:
# aggregate count of all colors and groupBy state and color
# orderBy descending order
count_mnm_df = (mnm_df.select("State", "Color", "Count")
                .groupBy("State", "Color")
                .sum("Count")
                .orderBy("sum(Count)", ascending=False))


In [6]:
# show all the resulting aggregation for all the dates and colors
count_mnm_df.show(n=60, truncate=False)
print("Total Rows = %d" % (count_mnm_df.count()))

+-----+------+----------+
|State|Color |sum(Count)|
+-----+------+----------+
|CA   |Yellow|100956    |
|WA   |Green |96486     |
|CA   |Brown |95762     |
|TX   |Green |95753     |
|TX   |Red   |95404     |
|CO   |Yellow|95038     |
|NM   |Red   |94699     |
|OR   |Orange|94514     |
|WY   |Green |94339     |
|NV   |Orange|93929     |
|TX   |Yellow|93819     |
|CO   |Green |93724     |
|CO   |Brown |93692     |
|CA   |Green |93505     |
|NM   |Brown |93447     |
|CO   |Blue  |93412     |
|WA   |Red   |93332     |
|WA   |Brown |93082     |
|WA   |Yellow|92920     |
|NM   |Yellow|92747     |
|NV   |Brown |92478     |
|TX   |Orange|92315     |
|AZ   |Brown |92287     |
|AZ   |Green |91882     |
|WY   |Red   |91768     |
|AZ   |Orange|91684     |
|CA   |Red   |91527     |
|WA   |Orange|91521     |
|NV   |Yellow|91390     |
|UT   |Orange|91341     |
|NV   |Green |91331     |
|NM   |Orange|91251     |
|NM   |Green |91160     |
|WY   |Blue  |91002     |
|UT   |Red   |90995     |
|CO   |Orang

In [7]:
# find the aggregate count for California by filtering
ca_count_mnm_df = (mnm_df.select("*")
                   .where(mnm_df.State == 'CA')
                   .groupBy("Color")
                   .sum("Count")
                   .orderBy("sum(Count)", ascending=False))

In [8]:
ca_count_mnm_df.show(truncate = False)

+------+----------+
|Color |sum(Count)|
+------+----------+
|Yellow|100956    |
|Brown |95762     |
|Green |93505     |
|Red   |91527     |
|Orange|90311     |
|Blue  |89123     |
+------+----------+



In [9]:
spark.stop()

# Operators and Schemas

In [10]:
import pyspark

In [11]:
conf = pyspark.SparkConf().setAppName('HelloWorld').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [12]:
# Create an RDD of tuples (name, age)
dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30),
 ("TD", 35), ("Brooke", 25)])
# Use map and reduceByKey transformations with their lambda
# expressions to aggregate and then compute average
agesRDD = (dataRDD
 .map(lambda x: (x[0], (x[1], 1)))
 .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
 .map(lambda x: (x[0], x[1][0]/x[1][1])))

In [13]:
# Same operation with operators
# Create a DataFrame using SparkSession
from pyspark.sql.functions import avg

spark = (SparkSession
 .builder
 .appName("AuthorsAges")
 .getOrCreate())
# Create a DataFrame
data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30),
 ("TD", 35), ("Brooke", 25)],["name", "age"])


In [14]:
data_df.show()

+------+---+
|  name|age|
+------+---+
|Brooke| 20|
| Denny| 31|
| Jules| 30|
|    TD| 35|
|Brooke| 25|
+------+---+



In [15]:
avg_df = data_df.groupBy("name").agg(avg("age"))
avg_df.show()

+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Jules|    30.0|
|    TD|    35.0|
| Denny|    31.0|
+------+--------+



In [16]:
# Create a schema programmatically for a DataFrame
from pyspark.sql.types import *
schema = StructType([StructField("author", StringType(), False),
 StructField("title", StringType(), False),
 StructField("pages", IntegerType(), False)])
# Same structure using DDL
schema_DDL = "author STRING, title STRING, pages INT"

In [17]:
# Define schema for our data using DDL
schema = """`Id` INT, 
`First` STRING, 
`Last` STRING, 
`Url` STRING,
`Published` STRING, 
`Hits` INT, 
`Campaigns` ARRAY<STRING>"""
# Create our static data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter","LinkedIn"]],
 [2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter",
"LinkedIn"]],
 [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web",
"twitter", "FB", "LinkedIn"]],
 [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568,
["twitter", "FB"]],
 [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web",
"twitter", "FB", "LinkedIn"]],
 [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568,
["twitter", "LinkedIn"]]
 ]
# Create a DataFrame using the schema
blogs_df = spark.createDataFrame(data, schema)



In [18]:
blogs_df.show(truncate = False)

+---+---------+-------+-----------------+---------+-----+----------------------------+
|Id |First    |Last   |Url              |Published|Hits |Campaigns                   |
+---+---------+-------+-----------------+---------+-----+----------------------------+
|1  |Jules    |Damji  |https://tinyurl.1|1/4/2016 |4535 |[twitter, LinkedIn]         |
|2  |Brooke   |Wenig  |https://tinyurl.2|5/5/2018 |8908 |[twitter, LinkedIn]         |
|3  |Denny    |Lee    |https://tinyurl.3|6/7/2019 |7659 |[web, twitter, FB, LinkedIn]|
|4  |Tathagata|Das    |https://tinyurl.4|5/12/2018|10568|[twitter, FB]               |
|5  |Matei    |Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB, LinkedIn]|
|6  |Reynold  |Xin    |https://tinyurl.6|3/2/2015 |25568|[twitter, LinkedIn]         |
+---+---------+-------+-----------------+---------+-----+----------------------------+



In [19]:
# print the schema used by Spark to build the DataFrame
blogs_df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (nullable = true)
 |-- Campaigns: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [20]:
# To show the schema and use it to build a Dataframe using the same schema
blogs_df.schema

StructType(List(StructField(Id,IntegerType,true),StructField(First,StringType,true),StructField(Last,StringType,true),StructField(Url,StringType,true),StructField(Published,StringType,true),StructField(Hits,IntegerType,true),StructField(Campaigns,ArrayType(StringType,true),true)))

# Columns and Rows
In Spark’s supported languages, columns are objects with public methods (represented by the
Column type).


In [21]:
from pyspark.sql.functions import *

In [22]:
# Print the blogs_df columns list
blogs_df.columns

['Id', 'First', 'Last', 'Url', 'Published', 'Hits', 'Campaigns']

In [23]:
# Here we're looking at the hits column
blogs_df.select("Hits").show(2)

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows



In [24]:
# we can use the expr function to calculate the Hits column multiplied by 2
blogs_df.select(expr("Hits * 2")).show(2)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows



In [25]:
# We can also use the col function to calculate the value
blogs_df.select(col("Hits") * 2).show(2)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows



In [26]:
# Use an expression to compute big hitters for blogs
# This adds a new column, Big Hitters, based on the conditional expression
blogs_df.withColumn("Big Hitters", (expr("Hits > 10000"))).show()

+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|Big Hitters|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|      false|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|      false|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|      false|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|       true|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|       true|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|       true|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+



In [27]:
# Concatenate three columns, create a new column, and show the
# newly created concatenated column
blogs_df \
.withColumn("AuthorsId", (concat(expr("First"), expr("Last"), expr("Id")))) \
.select(col("AuthorsId")) \
.show(4) \

+-------------+
|    AuthorsId|
+-------------+
|  JulesDamji1|
| BrookeWenig2|
|    DennyLee3|
|TathagataDas4|
+-------------+
only showing top 4 rows



In [28]:
# Sort by Id column in descending order
blogs_df.sort(col("Id").desc()).show()
# blogs_df.sort(blogs_df.Id.desc()).show() # Second method

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



In [32]:
from pyspark.sql import Row

blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",
 ["twitter", "LinkedIn"])
# access using index for individual items
blog_row[0]


6

In [33]:
# We can create DataFrames using rows
rows = [Row("Matei Zaharia", "CA")]
authors_df = spark.createDataFrame(rows, ["Authors", "State"])
authors_df.show()

+-------------+-----+
|      Authors|State|
+-------------+-----+
|Matei Zaharia|   CA|
+-------------+-----+



In [34]:
# Now we can use the blog row and add it to the blogs_df
blog_row = [Row(7, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 255568,
 ["twitter", "LinkedIn"])]
newRow = spark.createDataFrame(blog_row, list(blogs_df.schema.fieldNames()))
blogs_df.union(newRow).show()

+---+---------+-------+-----------------+---------+------+--------------------+
| Id|    First|   Last|              Url|Published|  Hits|           Campaigns|
+---+---------+-------+-----------------+---------+------+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016|  4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018|  8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019|  7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018| 10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014| 40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015| 25568| [twitter, LinkedIn]|
|  7|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|255568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+------+--------------------+



# Common DataFrame Operations


## Using DataFrameReader and DataFrameWriter

Reading and writing are simple in Spark because of these high-level abstractions and
contributions from the community to connect to a wide variety of data sources,
including common NoSQL stores, RDBMSs, streaming engines such as Apache Kafka
and Kinesis, and more.<br/>
To get started, let’s read a large CSV file containing data on San Francisco Fire
Department calls.<br/>
We will define a schema for this file and use the DataFrameReader class and its methods to tell Spark what to do. Because this file contains 28 columns and over 4,380,660 records, it’s more efficient to define a schema than have Spark infer it.


In [35]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [36]:
# Programmatic way to define a schema
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
 StructField('UnitID', StringType(), True),
 StructField('IncidentNumber', IntegerType(), True),
 StructField('CallType', StringType(), True),
 StructField('CallDate', StringType(), True),
 StructField('WatchDate', StringType(), True),
 StructField('CallFinalDisposition', StringType(), True),
 StructField('AvailableDtTm', StringType(), True),
 StructField('Address', StringType(), True),
 StructField('City', StringType(), True),
 StructField('Zipcode', IntegerType(), True),
 StructField('Battalion', StringType(), True),
 StructField('StationArea', StringType(), True),
 StructField('Box', StringType(), True),
 StructField('OriginalPriority', StringType(), True),
 StructField('Priority', StringType(), True),
 StructField('FinalPriority', IntegerType(), True),
 StructField('ALSUnit', BooleanType(), True),
 StructField('CallTypeGroup', StringType(), True),
 StructField('NumAlarms', IntegerType(), True),
 StructField('UnitType', StringType(), True),
 StructField('UnitSequenceInCallDispatch', IntegerType(), True),
 StructField('FirePreventionDistrict', StringType(), True),
 StructField('SupervisorDistrict', StringType(), True),
 StructField('Neighborhood', StringType(), True),
 StructField('Location', StringType(), True),
 StructField('RowID', StringType(), True),
 StructField('Delay', FloatType(), True)])

# Use the DataFrameReader interface to read a CSV file
sf_fire_file = "data/sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)


In [37]:
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

In [38]:
# You can get an error if you need more memory to perform the operation, or if you don't have hadoop.dll in your spark bin directory
parquet_path = "data/fire_calls_parquet"
fire_df.write.format("parquet").save(parquet_path)

In [39]:
# You can also save a parquet table which registers metadata with the Hive metastore
parquet_table = "parquet_fire_call_table" # name of the table
fire_df.write.format("parquet").saveAsTable(parquet_table)

### Projections and filters
A projection in relational parlance is a way to return only the
rows matching a certain relational condition by using filters. In Spark, projections are
done with the select() method, while filters can be expressed using the filter() or
where() method. We can use this technique to examine specific aspects of our SF Fire
Department data set:


In [40]:
# Filter on the initial DataFrame
few_fire_df = (fire_df
 .select("IncidentNumber", "AvailableDtTm", "CallType")
 .where(col("CallType") != "Medical Incident"))
# Show the 5 first rows of the resulting DataFrame without truncating the long strings
few_fire_df.show(5, truncate=False)


+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [41]:
# To count the number of distinct call types
(fire_df.select("CallType")
 .where(col("CallType")
        .isNotNull())
 .agg(countDistinct("CallType")
      .alias("DistinctCallTypes")).show())


+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



In [42]:
# Show the distinct call types
(fire_df
 .select("CallType")
 .where(col("CallType").isNotNull())
 .distinct()
 .show(10, False))

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows



In [43]:
# Rename column using withColumnRenamed
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
(new_fire_df
 .select("ResponseDelayedinMins")
 .where(col("ResponseDelayedinMins") > 5)
 .show(5, False))


+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



In [44]:
# Transform date to timestamp
fire_ts_df = (new_fire_df
 .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
 .drop("CallDate")
 .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
 .drop("WatchDate")
 .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"),
 "MM/dd/yyyy hh:mm:ss a"))
 .drop("AvailableDtTm"))

(fire_df.select("CallDate", "WatchDate", "AvailableDtTm").show(5, False))
(fire_ts_df
 .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
 .show(5, False))


+----------+----------+----------------------+
|CallDate  |WatchDate |AvailableDtTm         |
+----------+----------+----------------------+
|01/11/2002|01/10/2002|01/11/2002 01:51:44 AM|
|01/11/2002|01/10/2002|01/11/2002 03:01:18 AM|
|01/11/2002|01/10/2002|01/11/2002 02:39:50 AM|
|01/11/2002|01/10/2002|01/11/2002 04:16:46 AM|
|01/11/2002|01/10/2002|01/11/2002 06:01:58 AM|
+----------+----------+----------------------+
only showing top 5 rows

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+---------------

In [45]:
# Let's see how many years of fire calls we've got in the dataframe
(fire_ts_df
 .select(year('IncidentDate'))
 .distinct()
 .orderBy(year('IncidentDate'))
 .show())


+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



## Aggregations

In [46]:
fire_df.columns

['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallDate',
 'WatchDate',
 'CallFinalDisposition',
 'AvailableDtTm',
 'Address',
 'City',
 'Zipcode',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumAlarms',
 'UnitType',
 'UnitSequenceInCallDispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhood',
 'Location',
 'RowID',
 'Delay']

In [47]:
# The count of call types
(fire_df
.select("CallType")
 .groupBy("CallType")
 .count()
.orderBy("count", ascending=False)
.show())

+--------------------+------+
|            CallType| count|
+--------------------+------+
|    Medical Incident|113794|
|      Structure Fire| 23319|
|              Alarms| 19406|
|   Traffic Collision|  7013|
|Citizen Assist / ...|  2524|
|               Other|  2166|
|        Outside Fire|  2094|
|        Vehicle Fire|   854|
|Gas Leak (Natural...|   764|
|        Water Rescue|   755|
|Odor (Strange / U...|   490|
|   Electrical Hazard|   482|
|Elevator / Escala...|   453|
|Smoke Investigati...|   391|
|          Fuel Spill|   193|
|              HazMat|   124|
|Industrial Accidents|    94|
|           Explosion|    89|
|Train / Rail Inci...|    57|
|  Aircraft Emergency|    36|
+--------------------+------+
only showing top 20 rows



In [48]:
# The number of calls by zip code
(fire_df
.select("Zipcode")
 .groupBy("Zipcode")
 .count()
.orderBy("count", ascending=False)
.show())

+-------+-----+
|Zipcode|count|
+-------+-----+
|  94102|21840|
|  94103|20897|
|  94110|14801|
|  94109|14686|
|  94124| 9236|
|  94112| 8421|
|  94115| 7812|
|  94107| 6941|
|  94122| 6355|
|  94133| 6246|
|  94117| 5804|
|  94114| 5175|
|  94118| 5157|
|  94134| 5009|
|  94121| 4555|
|  94132| 4321|
|  94105| 4236|
|  94108| 4084|
|  94116| 3933|
|  94123| 3719|
+-------+-----+
only showing top 20 rows



In [49]:
# Use of other operations
(fire_ts_df
 .select(sum("NumAlarms"), 
         avg("ResponseDelayedinMins"), 
         min("ResponseDelayedinMins"), 
         max("ResponseDelayedinMins"))
 .withColumnRenamed("sum(NumAlarms)","SumNumAlarms")
 .withColumnRenamed("avg(ResponseDelayedinMins)","AvgResponseDelayedinMins")
 .withColumnRenamed("min(ResponseDelayedinMins)","MinResponseDelayedinMins")
 .withColumnRenamed("max(ResponseDelayedinMins)","MaxResponseDelayedinMins")
 .show())

+------------+------------------------+------------------------+------------------------+
|SumNumAlarms|AvgResponseDelayedinMins|MinResponseDelayedinMins|MaxResponseDelayedinMins|
+------------+------------------------+------------------------+------------------------+
|      176170|       3.892364154521585|             0.016666668|                 1844.55|
+------------+------------------------+------------------------+------------------------+



## Additional questions to answer about the dataframe

• What were all the different types of fire calls in 2018?


In [50]:
fire_ts_df.columns

['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallFinalDisposition',
 'Address',
 'City',
 'Zipcode',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumAlarms',
 'UnitType',
 'UnitSequenceInCallDispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhood',
 'Location',
 'RowID',
 'ResponseDelayedinMins',
 'IncidentDate',
 'OnWatchDate',
 'AvailableDtTS']

In [51]:
(fire_ts_df
 .where(year("IncidentDate") == 2018)
.groupBy("CallType")
 .count()
 .orderBy("count", ascending = False)
.show(truncate=False))


+-------------------------------+-----+
|CallType                       |count|
+-------------------------------+-----+
|Medical Incident               |7004 |
|Alarms                         |1144 |
|Structure Fire                 |906  |
|Traffic Collision              |433  |
|Outside Fire                   |153  |
|Other                          |114  |
|Citizen Assist / Service Call  |113  |
|Gas Leak (Natural and LP Gases)|69   |
|Water Rescue                   |43   |
|Elevator / Escalator Rescue    |36   |
|Electrical Hazard              |30   |
|Vehicle Fire                   |28   |
|Smoke Investigation (Outside)  |28   |
|Odor (Strange / Unknown)       |10   |
|Fuel Spill                     |10   |
|Train / Rail Incident          |5    |
|HazMat                         |5    |
|Suspicious Package             |3    |
|Explosion                      |1    |
|Assist Police                  |1    |
+-------------------------------+-----+



• What months within the year 2018 saw the highest number of fire calls?

In [52]:
(fire_ts_df
.where(year("IncidentDate") == 2018)
.groupBy(month("IncidentDate"))
.count()
 .orderBy("count", ascending=False)
.show())


+-------------------+-----+
|month(IncidentDate)|count|
+-------------------+-----+
|                 10| 1068|
|                  5| 1047|
|                  3| 1029|
|                  8| 1021|
|                  1| 1007|
|                  7|  974|
|                  6|  974|
|                  9|  951|
|                  4|  947|
|                  2|  919|
|                 11|  199|
+-------------------+-----+



• Which neighborhood in San Francisco generated the most fire calls in 2018?


In [53]:
(fire_ts_df
 .select("Neighborhood")
.where(year("IncidentDate") == 2018)
.groupBy("Neighborhood")
.count()
.orderBy("count", ascending=False)
.show())

+--------------------+-----+
|        Neighborhood|count|
+--------------------+-----+
|          Tenderloin| 1393|
|     South of Market| 1053|
|             Mission|  913|
|Financial Distric...|  772|
|Bayview Hunters P...|  522|
|    Western Addition|  352|
|     Sunset/Parkside|  346|
|            Nob Hill|  295|
|        Hayes Valley|  291|
|      Outer Richmond|  262|
| Castro/Upper Market|  251|
|         North Beach|  231|
|           Excelsior|  212|
|        Potrero Hill|  210|
|  West of Twin Peaks|  210|
|     Pacific Heights|  191|
|              Marina|  191|
|           Chinatown|  191|
|         Mission Bay|  178|
|      Bernal Heights|  170|
+--------------------+-----+
only showing top 20 rows



• Which neighborhoods had the worst response times to fire calls in 2018?

In [54]:
(fire_ts_df
 .select("Neighborhood","ResponseDelayedinMins")
.where(year("IncidentDate") == 2018)
.groupBy("Neighborhood")
.sum()
.orderBy("sum(ResponseDelayedinMins)", ascending=False)
.show())

+--------------------+--------------------------+
|        Neighborhood|sum(ResponseDelayedinMins)|
+--------------------+--------------------------+
|          Tenderloin|         5713.416682377458|
|     South of Market|         4019.916673846543|
|Financial Distric...|        3353.6333242356777|
|             Mission|        3150.3333284556866|
|Bayview Hunters P...|        2411.9333442747593|
|     Sunset/Parkside|        1240.1333360522985|
|           Chinatown|        1182.3499933183193|
|    Western Addition|        1156.0833313167095|
|            Nob Hill|        1120.9999947845936|
|        Hayes Valley|         980.7833325713873|
|      Outer Richmond|         955.7999980300665|
| Castro/Upper Market|           954.11666418612|
|         North Beach|         898.4166664481163|
|  West of Twin Peaks|         880.1000022888184|
|        Potrero Hill|         880.0166715979576|
|           Excelsior|          834.516668587923|
|     Pacific Heights|         798.4666626155376|


• Which week in the year in 2018 had the most fire calls?


In [55]:
(fire_ts_df
.where(year("IncidentDate") == 2018)
.groupBy(weekofyear("IncidentDate"))
.count()
 .orderBy("count", ascending=False)
.show())

+------------------------+-----+
|weekofyear(IncidentDate)|count|
+------------------------+-----+
|                      22|  259|
|                      40|  255|
|                      43|  250|
|                      25|  249|
|                       1|  246|
|                      44|  244|
|                      32|  243|
|                      13|  243|
|                      11|  240|
|                       5|  236|
|                      18|  236|
|                      23|  235|
|                       2|  234|
|                      31|  234|
|                      42|  234|
|                      19|  233|
|                      10|  232|
|                       8|  232|
|                      34|  232|
|                      21|  231|
+------------------------+-----+
only showing top 20 rows



• Is there a correlation between neighborhood, zip code, and number of fire calls?

In [56]:
from pyspark.ml.feature import StringIndexer

# Correlation between zip code and number of fire calls
corr_zip_code_num_fire_calls = fire_df.select("ZipCode").groupBy("ZipCode").count().corr("Zipcode", "count")

# Create a numerical column fro the neighborhood 
indexer = StringIndexer(inputCol="Neighborhood", outputCol="NeighborhoodIndex")
corr_zip_code_neighborhood = fire_df.select("ZipCode","Neighborhood")
indexed = indexer.fit(corr_zip_code_neighborhood).transform(corr_zip_code_neighborhood)

# Correlation between number of fire calls and neighborhood
corr_neighborhood_number_of_calls = indexed.groupBy('NeighborhoodIndex').count().corr("count", "NeighborhoodIndex")

print(f"The correlation between number of fire calls and zip code is : {corr_zip_code_num_fire_calls}")
print(f"The correlation between number of fire calls and neighborhood is : {corr_neighborhood_number_of_calls}")
print(f"The correlation between neighborhood and zip code is : {indexed.corr('NeighborhoodIndex','ZipCode')}")
      

The correlation between number of fire calls and zip code is : 0.21485589193343801
The correlation between number of fire calls and neighborhood is : -0.776636689751461
The correlation between neighborhood and zip code is : -0.08293454442147614


# Using Spark SQL in Spark Applications


In [3]:
# Launch a Spark session
spark = (SparkSession
 .builder
 .appName("SparkSQLExampleApp")
.master("local")
.enableHiveSupport()
.config("spark.sql.catalogImplementation","hive")
 .getOrCreate())

# Path to file
csv_file = "data/departuredelays.csv"


In [4]:
# Read and create a temporary view
# Infer schema (note that for larger files you
# may want to specify the schema)
df = (spark.read.format("csv")
 .option("inferSchema", "true")
 .option("header", "true")
 .load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")

In [5]:
df.show()

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
|1061215|   -6|     602|   ABE|        ATL|
|1061725|   69|     602|   ABE|        ATL|
|1061230|    0|     369|   ABE|        DTW|
|1060625|   -3|     602|   ABE|        ATL|
|1070600|    0|     369|   ABE|        DTW|
|1071725|    0|     602|   ABE|        ATL|
|1071230|    0|     369|   ABE|        DTW|
|1070625|    0|     602|   ABE|        ATL|
|1071219|    0|     569|   ABE|        ORD|
|1080600|    0|     369|   ABE| 

In [6]:
# Select all flights that have a distance higher than 1000

spark.sql("""SELECT distance, origin, destination
FROM us_delay_flights_tbl WHERE distance > 1000
ORDER BY distance DESC""").show(10)


+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



In [7]:
# we’ll find all flights between San Francisco (SFO) and Chicago 
# (ORD) with at least a two-hour delay

spark.sql("""SELECT date, delay, origin, destination
FROM us_delay_flights_tbl WHERE origin = 'SFO' AND destination = 'ORD' AND delay >= 120
ORDER BY distance DESC""").show(10)

+-------+-----+------+-----------+
|   date|delay|origin|destination|
+-------+-----+------+-----------+
|1011410|  124|   SFO|        ORD|
|1022330|  326|   SFO|        ORD|
|1021410|  190|   SFO|        ORD|
|1101410|  184|   SFO|        ORD|
|1190925|  297|   SFO|        ORD|
|1241110|  139|   SFO|        ORD|
|1301800|  167|   SFO|        ORD|
|1011237|  122|   SFO|        ORD|
|1032258|  163|   SFO|        ORD|
|1031920|  193|   SFO|        ORD|
+-------+-----+------+-----------+
only showing top 10 rows



In [8]:
# A more complicated request using CASE clause in SQL to treat different cases 
spark.sql("""SELECT delay, origin, destination,
 CASE
 WHEN delay > 360 THEN 'Very Long Delays'
 WHEN delay > 120 AND delay < 360 THEN 'Long Delays'
 WHEN delay > 60 AND delay < 120 THEN 'Short Delays'
 WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'
 WHEN delay = 0 THEN 'No Delays'
 ELSE 'Early'
 END AS Flight_Delays
 FROM us_delay_flights_tbl
 ORDER BY origin, delay DESC""").show(10)

+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|  333|   ABE|        ATL|  Long Delays|
|  305|   ABE|        ATL|  Long Delays|
|  275|   ABE|        ATL|  Long Delays|
|  257|   ABE|        ATL|  Long Delays|
|  247|   ABE|        DTW|  Long Delays|
|  247|   ABE|        ATL|  Long Delays|
|  219|   ABE|        ORD|  Long Delays|
|  211|   ABE|        ATL|  Long Delays|
|  197|   ABE|        DTW|  Long Delays|
|  192|   ABE|        ORD|  Long Delays|
+-----+------+-----------+-------------+
only showing top 10 rows



In [10]:
# Create a managed table

# Create a database called learn_spark_db
spark.sql("CREATE DATABASE learn_spark_db")
# Tell Spark that we want to use this dataframe
spark.sql("USE learn_spark_db")

# Create a table
spark.sql(
    """CREATE TABLE managed_us_delay_flights_tbl 
    (date STRING, delay INT,distance INT, origin STRING, destination STRING)""")

DataFrame[]

In [13]:
spark.sql("""FROM managed_us_delay_flights_tbl SELECT *""").show()

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
+----+-----+--------+------+-----------+



In [17]:
# Create an unmanaged table
spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT,
 distance INT, origin STRING, destination STRING)
 USING csv OPTIONS (PATH
 '/data/departuredelays.csv')""")


DataFrame[]

In [18]:
spark.sql("FROM learn_spark_db.us_delay_flights_tbl SELECT *").show(10)

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
+----+-----+--------+------+-----------+



In [32]:
# Create views

df.createOrReplaceTempView("us_delay_flights_tbl")

# Views are temporary, they disappear when you end the spark session

spark.sql("""
CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS
 SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE
 origin = 'SFO'
 """)

spark.sql("""
CREATE OR REPLACE TEMP VIEW us_origin_airport_JFK_tmp_view AS
 SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE
 origin = 'JFK'
""")

# We use the prefix global_temp.<view_name> because Spark creates
# global temporary views in a global temporary databse called global_temp

spark.sql("""SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view""").show()

#The normal temporary view can be accessed directly
spark.sql("""
SELECT *
FROM us_origin_airport_JFK_tmp_view
""").show()


+-------+-----+------+-----------+
|   date|delay|origin|destination|
+-------+-----+------+-----------+
|1011250|   55|   SFO|        JFK|
|1012230|    0|   SFO|        JFK|
|1010705|   -7|   SFO|        JFK|
|1010620|   -3|   SFO|        MIA|
|1010915|   -3|   SFO|        LAX|
|1011005|   -8|   SFO|        DFW|
|1011800|    0|   SFO|        ORD|
|1011740|   -7|   SFO|        LAX|
|1012015|   -7|   SFO|        LAX|
|1012110|   -1|   SFO|        MIA|
|1011610|  134|   SFO|        DFW|
|1011240|   -6|   SFO|        MIA|
|1010755|   -3|   SFO|        DFW|
|1010020|    0|   SFO|        DFW|
|1010705|   -6|   SFO|        LAX|
|1010925|   -3|   SFO|        ORD|
|1010555|   -6|   SFO|        ORD|
|1011105|   -8|   SFO|        DFW|
|1012330|   32|   SFO|        ORD|
|1011330|    3|   SFO|        DFW|
+-------+-----+------+-----------+
only showing top 20 rows

+-------+-----+------+-----------+
|   date|delay|origin|destination|
+-------+-----+------+-----------+
|1010900|   14|   JFK|       

In [37]:
# Access the metadata
spark.catalog.listColumns("us_delay_flights_tbl")
spark.catalog.listTables()
spark.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='file:/C:/Users/huawie/Learn%20Pyspark/spark-warehouse'),
 Database(name='learn_spark_db', description='', locationUri='file:/C:/Users/huawie/Learn%20Pyspark/spark-warehouse/learn_spark_db.db')]

In [39]:
spark.range(1, 9).show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
+---+



In [43]:
# UDF (user defined functions)
from pyspark.sql.types import LongType

# Create cubed function
def cubed(s):
    return s * s * s

# Register UDF
spark.udf.register("cubed", cubed, LongType())

# Generate temporary view
spark.range(1, 9).createOrReplaceTempView("udf_test")

# Use the UDF on the temporary view
spark.sql("""
FROM udf_test
SELECT id, cubed(id) AS Cubed
""").show()

+---+-----+
| id|Cubed|
+---+-----+
|  1|    1|
|  2|    8|
|  3|   27|
|  4|   64|
|  5|  125|
|  6|  216|
|  7|  343|
|  8|  512|
+---+-----+

