# API Review

## Create the spark session

In [1]:
import pyspark

#starting spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()

## Create Dataframes

In [2]:
import pandas as pd
import numpy as np

np.random.seed(123)

pd_df = pd.DataFrame(dict(n=np.arange(20), group=np.random.choice(list("abc"), 20)))

pd_df

Unnamed: 0,n,group
0,0,c
1,1,b
2,2,c
3,3,c
4,4,a
5,5,c
6,6,c
7,7,b
8,8,c
9,9,b


#### Convert pd df to a spark dataframe

In [3]:
df = spark.createDataFrame(pd_df)

#must run .show() to see the spark dataframe 
df.show(2)

+---+-----+
|  n|group|
+---+-----+
|  0|    c|
|  1|    b|
+---+-----+
only showing top 2 rows



In [4]:
df.describe().show()

+-------+-----------------+-----+
|summary|                n|group|
+-------+-----------------+-----+
|  count|               20|   20|
|   mean|              9.5| null|
| stddev|5.916079783099616| null|
|    min|                0|    a|
|    max|               19|    c|
+-------+-----------------+-----+



In [5]:
from pydataset import data

#create spark df from pydataset mpg dataset
mpg = spark.createDataFrame(data("mpg"))
mpg.show(2)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 2 rows



## Create Columns

#### This returns a column object:

In [6]:
mpg.hwy

Column<'hwy'>

#### We can use *.select()* to select multiple column objects. 



#### To select the values in the column object, we follow it with *.show()*. 

In [7]:
# select 3 columns and show 2 rows
mpg.select(mpg.hwy, mpg.cty, mpg.model).show(2)

+---+---+-----+
|hwy|cty|model|
+---+---+-----+
| 29| 18|   a4|
| 29| 21|   a4|
+---+---+-----+
only showing top 2 rows



In [8]:
# select 1 column, 
#then select that column and add one to each of the values, 
#return and show both columns. 
mpg.select(mpg.hwy, mpg.hwy + 1).show(2)

+---+---------+
|hwy|(hwy + 1)|
+---+---------+
| 29|       30|
| 29|       30|
+---+---------+
only showing top 2 rows



In [9]:
# select & alias hwy column name
mpg.select(mpg.hwy.alias("highway_mileage")).show(2)

+---------------+
|highway_mileage|
+---------------+
|             29|
|             29|
+---------------+
only showing top 2 rows



In [10]:
# create a var col1 to store the column object of hwy, aliased as highway_mileage
col1 = mpg.hwy.alias("highway_mileage")

# create a var col2 to store the column object of hwy divided by 2, aliased as highway_mileage_halved
col2 = (mpg.hwy/2).alias("highway_mileage_halved")

# select both, referencing the new variables, col1 and col2
mpg.select(col1, col2).show(1)

+---------------+----------------------+
|highway_mileage|highway_mileage_halved|
+---------------+----------------------+
|             29|                  14.5|
+---------------+----------------------+
only showing top 1 row



In [11]:
from pyspark.sql.functions import col, expr

In [12]:
col("hwy")

Column<'hwy'>

In [13]:
# pd_df.col1, pd_df['col1'] --> pandas Series
# mpg.hwy, col("hwy") --> spark

In [14]:
#add 2 cols together and divide by 2 to get avg col
avg_col = (col("hwy") + col("cty")) / 2

#alias all cols
mpg.select(
    col("hwy").alias("highway_mileage"),
    mpg.cty.alias("city_mileage"),
    avg_col.alias("avg_mileage")
).show(2)

+---------------+------------+-----------+
|highway_mileage|city_mileage|avg_mileage|
+---------------+------------+-----------+
|             29|          18|       23.5|
|             29|          21|       25.0|
+---------------+------------+-----------+
only showing top 2 rows



#### Another way to do what we did above, using *.expr() *

In [15]:
mpg.select(
    expr("hwy"),  # the same as `col`
    expr("hwy + 1"),  # an arithmetic expression
    expr("hwy AS highway_mileage"),  # using an alias
    expr("hwy + 1 AS highway_incremented"),  # a combination of the above
).show(5)


+---+---------+---------------+-------------------+
|hwy|(hwy + 1)|highway_mileage|highway_incremented|
+---+---------+---------------+-------------------+
| 29|       30|             29|                 30|
| 29|       30|             29|                 30|
| 31|       32|             31|                 32|
| 30|       31|             30|                 31|
| 26|       27|             26|                 27|
+---+---------+---------------+-------------------+
only showing top 5 rows



#### Bringing together all the different ways to accomplish the same task...select a column & alias it. 

In [16]:
mpg.select(
    mpg.hwy.alias("highway"),
    col("hwy").alias("highway"),
    expr("hwy").alias("highway"),
    expr("hwy AS highway"),
).show(5)

+-------+-------+-------+-------+
|highway|highway|highway|highway|
+-------+-------+-------+-------+
|     29|     29|     29|     29|
|     29|     29|     29|     29|
|     31|     31|     31|     31|
|     30|     30|     30|     30|
|     26|     26|     26|     26|
+-------+-------+-------+-------+
only showing top 5 rows



## Spark SQL

In [17]:
# register the table with spark
mpg.createOrReplaceTempView("mpg")

In [18]:
#run sql query
#allows us to run SQL commands to manipulate and access data
spark.sql(
    """
    SELECT hwy, cty, (hwy + cty) / 2 AS avg
    FROM mpg
    """
).show(2)

+---+---+----+
|hwy|cty| avg|
+---+---+----+
| 29| 18|23.5|
| 29| 21|25.0|
+---+---+----+
only showing top 2 rows



## Type Casting

In [19]:
mpg.dtypes

[('manufacturer', 'string'),
 ('model', 'string'),
 ('displ', 'double'),
 ('year', 'bigint'),
 ('cyl', 'bigint'),
 ('trans', 'string'),
 ('drv', 'string'),
 ('cty', 'bigint'),
 ('hwy', 'bigint'),
 ('fl', 'string'),
 ('class', 'string')]

In [20]:
mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



In [21]:
#change datatype
#printShema to double check dtype
mpg.select(mpg.hwy.cast("string")).printSchema()


root
 |-- hwy: string (nullable = true)



In [22]:
# shows null because can't be converted. 
mpg.select(mpg.model, mpg.model.cast("int")).show(2)

+-----+-----+
|model|model|
+-----+-----+
|   a4| null|
|   a4| null|
+-----+-----+
only showing top 2 rows



## Built in Functions

In [23]:
# avg and mean are aliases of each other 
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean
# from pyspark.sql.functions import *

In [24]:
mpg.select(
    sum(mpg.hwy) / count(mpg.hwy).alias("average_1"),
    avg(mpg.hwy).alias("average_2"),
    min(mpg.hwy),
    max(mpg.hwy)).show()

+--------------------------------------+-----------------+--------+--------+
|(sum(hwy) / count(hwy) AS `average_1`)|        average_2|min(hwy)|max(hwy)|
+--------------------------------------+-----------------+--------+--------+
|                     23.44017094017094|23.44017094017094|      12|      44|
+--------------------------------------+-----------------+--------+--------+



In [25]:
#concat cols
mpg.select(concat(mpg.manufacturer, mpg.model)).show(5)


+---------------------------+
|concat(manufacturer, model)|
+---------------------------+
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
+---------------------------+
only showing top 5 rows



#### The function for string literals: lit

In [26]:
from pyspark.sql.functions import lit


In [27]:
#use lit() for space characters or other word characters
mpg.select(concat(mpg.cyl, lit(" cylinders"))).show(5)

+-----------------------+
|concat(cyl,  cylinders)|
+-----------------------+
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            6 cylinders|
+-----------------------+
only showing top 5 rows



#### More String Manipulation

In [28]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In [29]:
#creating spark df from pd dataframe
textdf = spark.createDataFrame(
    pd.DataFrame(
        {
            "address": [
                "600 Navarro St ste 600, San Antonio, TX 78205",
                "3130 Broadway St, San Antonio, TX 78209",
                "303 Pearl Pkwy, San Antonio, TX 78215",
                "1255 SW Loop 410, San Antonio, TX 78227",
            ]
        }
    )
)

textdf.show(truncate=False)

+---------------------------------------------+
|address                                      |
+---------------------------------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |
|303 Pearl Pkwy, San Antonio, TX 78215        |
|1255 SW Loop 410, San Antonio, TX 78227      |
+---------------------------------------------+



#### Using regexp_extract - extract at least one capture group and create new column of that. 

In [30]:
textdf.select(
    "address",
    #select address, extract address, capture group start w/ and 1 or more digits, and just return 1st capture group
    regexp_extract("address", r"^(\d+)", 1).alias("street_no"),
    #after start of string, digits, and space, give me everything up until the comma, and return 1st capture group
    regexp_extract("address", r"^\d+\s([\w\s]+?),", 1).alias("street"),
).show(truncate=False)

+---------------------------------------------+---------+------------------+
|address                                      |street_no|street            |
+---------------------------------------------+---------+------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|600      |Navarro St ste 600|
|3130 Broadway St, San Antonio, TX 78209      |3130     |Broadway St       |
|303 Pearl Pkwy, San Antonio, TX 78215        |303      |Pearl Pkwy        |
|1255 SW Loop 410, San Antonio, TX 78227      |1255     |SW Loop 410       |
+---------------------------------------------+---------+------------------+



#### regexp_replace lets us make substitutions based on a regular expression.

In [31]:
textdf.select(
    "address",
    #replaces address col, from the start, everything up until the comma and space
    regexp_replace("address", r"^.*?,\s*", "").alias("city_state_zip"),
).show(truncate=False)

+---------------------------------------------+---------------------+
|address                                      |city_state_zip       |
+---------------------------------------------+---------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |San Antonio, TX 78209|
|303 Pearl Pkwy, San Antonio, TX 78215        |San Antonio, TX 78215|
|1255 SW Loop 410, San Antonio, TX 78227      |San Antonio, TX 78227|
+---------------------------------------------+---------------------+




## Filtering with .filter and .where

In [32]:
#class is a reserved word (so has to be called in brackets)
mpg.filter(mpg.cyl == 4).where(mpg["class"] == "subcompact").show()

+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|      model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 28| 33|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 25| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 23| 29|  p|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|manual(m5)|  f| 26| 34|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
|       honda|      civic|  2.0|2008|  4|manual(m6)|  f| 21| 29|  p|subcompact|
|     hyundai|    tiburon|  2.0|1999|  4

## Conditionals with When and Otherwise

In [33]:
from pyspark.sql.functions import when

In [34]:
#select display when if less than 2, assign it to small,
#when less than 3, assign it to medium, 
#otherwise, give it large
#and then alias all the when/otherwise functions.
mpg.select(
    mpg.displ,
    (
        when(mpg.displ < 2, "small")
        .when(mpg.displ < 3, "medium")
        .otherwise("large")
        .alias("engine_size")
    ),
).show(10)

+-----+-----------+
|displ|engine_size|
+-----+-----------+
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
|  2.0|     medium|
|  2.8|     medium|
|  2.8|     medium|
|  3.1|      large|
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
+-----+-----------+
only showing top 10 rows



## Sorting & Ordering 

In [35]:
#sorts ascending by default
mpg.sort(mpg.hwy).show(8)

+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|manufacturer|              model|displ|year|cyl|     trans|drv|cty|hwy| fl| class|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|        jeep| grand cherokee 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|manual(m6)|  4|  9| 12|  e|pickup|
|       dodge|        durango 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|  dakota pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|   chevrolet|    k1500 tahoe 4wd|  5.3|2008|  8|  auto(l4)|  4| 11| 14|  e|   suv|
|        jeep| grand cherokee 4wd|  6.1|2008|  8|  auto(l5)|  4| 11| 14|  p|   suv|
|       dodge|        durango 4wd|  5.9|1999|  8|  auto(l4)|  4| 11| 15|  r|   suv|
+------------+-------------------+-----+----+---+----------+---+---+---+---+

In [36]:
from pyspark.sql.functions import asc, desc

In [37]:
mpg.sort(mpg.hwy.desc())
# is the same as
mpg.sort(col("hwy").desc())
# is the same as
mpg.sort(desc("hwy")).show(5)

+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|  volkswagen|     jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|   compact|
|  volkswagen|new beetle|  1.9|1999|  4|manual(m5)|  f| 35| 44|  d|subcompact|
|  volkswagen|new beetle|  1.9|1999|  4|  auto(l4)|  f| 29| 41|  d|subcompact|
|      toyota|   corolla|  1.8|2008|  4|manual(m5)|  f| 28| 37|  r|   compact|
|       honda|     civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



In [38]:
#sort multiple cols either asc or desc
mpg.sort(desc("class"), mpg.cyl.asc(), col("hwy").desc()).show()

+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|class|
+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 20| 27|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 20| 26|  r|  suv|
|      subaru|      forester awd|  2.5|1999|  4|manual(m5)|  4| 18| 25|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 19| 25|  p|  suv|
|      subaru|      forester awd|  2.5|1999|  4|  auto(l4)|  4| 18| 24|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 18| 23|  p|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|manual(m5)|  4| 15| 20|  r|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|  auto(l4)|  4| 16| 20|  r|  suv|
|        jeep|grand cherokee 4wd|  3.0|2008|  6|  auto(l5)|  4| 17| 22|  d|  suv|
|      toyota|  

## Grouping & Aggregating

In [39]:
#different ways to group by columns
mpg.groupBy(mpg.cyl)
mpg.groupBy(col("cyl"))
mpg.groupBy("cyl")

<pyspark.sql.group.GroupedData at 0x7fa16078d9a0>

In [40]:
#aggregate after groupby
mpg.groupBy(mpg.cyl).agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+------------------+-----------------+
|cyl|          avg(cty)|         avg(hwy)|
+---+------------------+-----------------+
|  6| 16.21518987341772|22.82278481012658|
|  5|              20.5|            28.75|
|  8|12.571428571428571|17.62857142857143|
|  4|21.012345679012345|28.80246913580247|
+---+------------------+-----------------+



In [41]:
#groupby multiple cols
mpg.groupBy("cyl", "class").agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+----------+------------------+------------------+
|cyl|     class|          avg(cty)|          avg(hwy)|
+---+----------+------------------+------------------+
|  5|   compact|              21.0|              29.0|
|  5|subcompact|              20.0|              28.5|
|  6|subcompact|              17.0|24.714285714285715|
|  6|    pickup|              14.5|              17.9|
|  4|subcompact|22.857142857142858| 30.80952380952381|
|  8|       suv|12.131578947368421|16.789473684210527|
|  8|    pickup|              11.8|              15.8|
|  8|   midsize|              16.0|              24.0|
|  4|   midsize|              20.5|           29.1875|
|  8|   2seater|              15.4|              24.8|
|  6|   compact|16.923076923076923|25.307692307692307|
|  6|   minivan|              15.6|              22.2|
|  4|   compact|            21.375|          29.46875|
|  8|subcompact|              14.8|              21.6|
|  6|   midsize|17.782608695652176| 26.26086956521739|
|  4|   mi

#### Rollup will do the same aggregations, but also include overall totals. 

#### Here the null value in cyl indicates the total count.

In [42]:
mpg.rollup("cyl").count().sort("cyl").show()

+----+-----+
| cyl|count|
+----+-----+
|null|  234|
|   4|   81|
|   5|    4|
|   6|   79|
|   8|   70|
+----+-----+



#### Here the null value in cyl indicates the total count.

In [43]:
mpg.rollup("cyl").agg(expr("avg(hwy)")).sort("cyl").show()

+----+-----------------+
| cyl|         avg(hwy)|
+----+-----------------+
|null|23.44017094017094|
|   4|28.80246913580247|
|   5|            28.75|
|   6|22.82278481012658|
|   8|17.62857142857143|
+----+-----------------+



#### Here the null values in cyl and class is showing overall average.
#### Then shows overall average for 4 cyl.
#### Next null is for 5 cyl's overall average, then overall avg for 6 cyl,  then overall avg for 8 cyl

In [44]:
#rollup w/ multiple columns, take mean hwy, and sort cyl and class
mpg.rollup("cyl", "class").mean("hwy").sort(col("cyl"), col("class")).show()

+----+----------+------------------+
| cyl|     class|          avg(hwy)|
+----+----------+------------------+
|null|      null| 23.44017094017094|
|   4|      null| 28.80246913580247|
|   4|   compact|          29.46875|
|   4|   midsize|           29.1875|
|   4|   minivan|              24.0|
|   4|    pickup|20.666666666666668|
|   4|subcompact| 30.80952380952381|
|   4|       suv|             23.75|
|   5|      null|             28.75|
|   5|   compact|              29.0|
|   5|subcompact|              28.5|
|   6|      null| 22.82278481012658|
|   6|   compact|25.307692307692307|
|   6|   midsize| 26.26086956521739|
|   6|   minivan|              22.2|
|   6|    pickup|              17.9|
|   6|subcompact|24.714285714285715|
|   6|       suv|              18.5|
|   8|      null| 17.62857142857143|
|   8|   2seater|              24.8|
+----+----------+------------------+
only showing top 20 rows




## Crosstables & Pivot Tables

#### Crosstab is a simple way to get counts. 

In [45]:
#crosstab cyl by class
mpg.crosstab("class", "cyl").show()

+----------+---+---+---+---+
| class_cyl|  4|  5|  6|  8|
+----------+---+---+---+---+
|   midsize| 16|  0| 23|  2|
|subcompact| 21|  2|  7|  5|
|   2seater|  0|  0|  0|  5|
|    pickup|  3|  0| 10| 20|
|   minivan|  1|  0| 10|  0|
|       suv|  8|  0| 16| 38|
|   compact| 32|  2| 13|  0|
+----------+---+---+---+---+



#### We can use pivot to compute different aggregations than count. 

In [46]:
#groupby class (to get the row), and pivot on cyl (to get cols)
#and agg method to get values
mpg.groupby("class").pivot("cyl").mean("hwy").show()

+----------+------------------+----+------------------+------------------+
|     class|                 4|   5|                 6|                 8|
+----------+------------------+----+------------------+------------------+
|subcompact| 30.80952380952381|28.5|24.714285714285715|              21.6|
|   compact|          29.46875|29.0|25.307692307692307|              null|
|   minivan|              24.0|null|              22.2|              null|
|       suv|             23.75|null|              18.5|16.789473684210527|
|   midsize|           29.1875|null| 26.26086956521739|              24.0|
|    pickup|20.666666666666668|null|              17.9|              15.8|
|   2seater|              null|null|              null|              24.8|
+----------+------------------+----+------------------+------------------+



## Missing Values

In [47]:
#creating df with missing values
#wrap spark.createDataFrame around creation of pandas df to turn it into a spark df
df = spark.createDataFrame(
    pd.DataFrame(
        {"x": [1, 2, np.nan, 4, 5, np.nan], "y": [np.nan, 0, 0, 3, 1, np.nan]}
    )
)
df.show()

+---+---+
|  x|  y|
+---+---+
|1.0|NaN|
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
|NaN|NaN|
+---+---+



In [48]:
#drops across all nulls
df.na.drop().show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



In [49]:
#fill nulls w/ 0
df.na.fill(0).show()

+---+---+
|  x|  y|
+---+---+
|1.0|0.0|
|2.0|0.0|
|0.0|0.0|
|4.0|3.0|
|5.0|1.0|
|0.0|0.0|
+---+---+



In [50]:
#select subset to fill w/ 0
#leaves y cols w/ NaNs
df.na.fill(0, subset="x").show()

+---+---+
|  x|  y|
+---+---+
|1.0|NaN|
|2.0|0.0|
|0.0|0.0|
|4.0|3.0|
|5.0|1.0|
|0.0|NaN|
+---+---+



In [51]:
#drop a full subset (dropping only y nulls)
#drops 1st and 6th row, but left rows where x was NaN
df.na.drop(subset="y").show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



## Transformations of Dataframes

In [52]:
# how is spark thinking about our df? 
mpg.explain()


== Physical Plan ==
*(1) Scan ExistingRDD[manufacturer#146,model#147,displ#148,year#149L,cyl#150L,trans#151,drv#152,cty#153L,hwy#154L,fl#155,class#156]




Only a single step above ^

This one below shows another step after "Scan ExistingRDD", a "Project" that contains the names of the columns we are looking for.

In [53]:
mpg.select(mpg.cyl, mpg.hwy).explain()


== Physical Plan ==
*(1) Project [cyl#150L, hwy#154L]
+- *(1) Scan ExistingRDD[manufacturer#146,model#147,displ#148,year#149L,cyl#150L,trans#151,drv#152,cty#153L,hwy#154L,fl#155,class#156]




And now we are going to do a more advanced select calcluation, but this is still just a single step. 

In [54]:
mpg.select(((mpg.cyl + mpg.hwy) / 2).alias("avg_mpg")).explain()


== Physical Plan ==
*(1) Project [(cast((cyl#150L + hwy#154L) as double) / 2.0) AS avg_mpg#1490]
+- *(1) Scan ExistingRDD[manufacturer#146,model#147,displ#148,year#149L,cyl#150L,trans#151,drv#152,cty#153L,hwy#154L,fl#155,class#156]




Notice that our filter below is also a single step.



In [55]:
mpg.filter(mpg.cyl == 6).explain()

== Physical Plan ==
*(1) Filter (isnotnull(cyl#150L) AND (cyl#150L = 6))
+- *(1) Scan ExistingRDD[manufacturer#146,model#147,displ#148,year#149L,cyl#150L,trans#151,drv#152,cty#153L,hwy#154L,fl#155,class#156]




In [56]:
#adds a step
mpg.select("cyl", "hwy").filter(expr("cyl = 6")).explain()

== Physical Plan ==
*(1) Project [cyl#150L, hwy#154L]
+- *(1) Filter (isnotnull(cyl#150L) AND (cyl#150L = 6))
   +- *(1) Scan ExistingRDD[manufacturer#146,model#147,displ#148,year#149L,cyl#150L,trans#151,drv#152,cty#153L,hwy#154L,fl#155,class#156]




In [57]:
#order of plan does not change, b/c spark does it in the most optimal way
mpg.filter(expr("cyl = 6")).select("cyl", "hwy").explain()

== Physical Plan ==
*(1) Project [cyl#150L, hwy#154L]
+- *(1) Filter (isnotnull(cyl#150L) AND (cyl#150L = 6))
   +- *(1) Scan ExistingRDD[manufacturer#146,model#147,displ#148,year#149L,cyl#150L,trans#151,drv#152,cty#153L,hwy#154L,fl#155,class#156]




## More DF Manipulations

For these examples, we'll be working with a dataset of observations of the weather in seattle.



In [58]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



In [59]:
# print number of rows & columns
print(weather.count(), "rows", len(weather.columns), "columns")


1461 rows 6 columns


In [60]:
# get the date range of the dataset and assign values to min_date and max_date variables
#.first() to get first item
min_date, max_date = weather.select(min("date"), max("date")).first()
min_date, max_date

('2012-01-01', '2015-12-31')

In [61]:
#w/out .first(), just returns df w/ values
weather.select(min("date"), max("date")).show()

+----------+----------+
| min(date)| max(date)|
+----------+----------+
|2012-01-01|2015-12-31|
+----------+----------+



In [62]:
# compute temp average 
#withColumn ---> creating this new col from the expr
#and then drop the old cols
weather = weather.withColumn(
    "temp_avg", expr("ROUND(temp_min + temp_max) / 2")
).drop("temp_max", "temp_min")

weather.show(6)

+----------+-------------+----+-------+--------+
|      date|precipitation|wind|weather|temp_avg|
+----------+-------------+----+-------+--------+
|2012-01-01|          0.0| 4.7|drizzle|     9.0|
|2012-01-02|         10.9| 4.5|   rain|     6.5|
|2012-01-03|          0.8| 2.3|   rain|     9.5|
|2012-01-04|         20.3| 4.7|   rain|     9.0|
|2012-01-05|          1.3| 6.1|   rain|     6.0|
|2012-01-06|          2.5| 2.2|   rain|     3.5|
+----------+-------------+----+-------+--------+
only showing top 6 rows



#### Calculate total rainfall

In [63]:
from pyspark.sql.functions import month, year, quarter


In [64]:
#new col: month, creating from month of date col
#once created, then group by month, and aggregate by summing the precipitation and alias it
#then sort by month ascending (default)
(
    weather.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

+-----+------------------+
|month|    total_rainfall|
+-----+------------------+
|    1|465.99999999999994|
|    2|             422.0|
|    3|             606.2|
|    4|             375.4|
|    5|             207.5|
|    6|             132.9|
|    7|              48.2|
|    8|             163.7|
|    9|235.49999999999997|
|   10|             503.4|
|   11|             642.5|
|   12|             622.7|
+-----+------------------+



#### Let's now take a look at the average temperature for each type of weather in December 2013:

In [65]:
#filter month and year, group by weather and get mean of temp_avg
(
    weather.filter(month("date") == 12)
    .filter(year("date") == 2013)
    .groupBy("weather")
    .agg(mean("temp_avg"))
    .show()
)

+-------+-----------------+
|weather|    avg(temp_avg)|
+-------+-----------------+
|    fog|7.555555555555555|
|    sun|2.977272727272727|
+-------+-----------------+



#### Let's now find out how many days had freezing temperatures in each month of 2013.

In [66]:
#filter year to 2013
#create freezing_temps col and it's going to be a bool of whether temp_avg is <=0, casted as int(so it'll be 1 or 0)
#create month col
#group by month col and sum freezing_temps and alias it
(
    weather.filter(year("date") == 2013)
    .withColumn("freezing_temps", (weather.temp_avg <= 0).cast("int"))
    .withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("freezing_temps").alias("no_of_days_with_freezing_temps"))
    .sort("month")
    .show()
)

+-----+------------------------------+
|month|no_of_days_with_freezing_temps|
+-----+------------------------------+
|    1|                             3|
|    2|                             0|
|    3|                             0|
|    4|                             0|
|    5|                             0|
|    6|                             0|
|    7|                             0|
|    8|                             0|
|    9|                             0|
|   10|                             0|
|   11|                             0|
|   12|                             5|
+-----+------------------------------+



In [68]:
#breakdown
(
    weather.filter(year("date") == 2013)
    .withColumn("freezing_temps", (weather.temp_avg <= 0).cast("int"))
    #.withColumn("month", month("date"))
    #.groupBy("month")
    #.agg(sum("freezing_temps").alias("no_of_days_with_freezing_temps"))
    #.sort("month")
    .show())

+----------+-------------+----+-------+--------+--------------+
|      date|precipitation|wind|weather|temp_avg|freezing_temps|
+----------+-------------+----+-------+--------+--------------+
|2013-01-01|          0.0| 2.7|    sun|     1.0|             0|
|2013-01-02|          0.0| 3.2|    sun|     2.5|             0|
|2013-01-03|          4.1| 3.0|   rain|     2.5|             0|
|2013-01-04|          2.5| 2.8|   rain|     6.0|             0|
|2013-01-05|          3.0| 3.1|   rain|     5.5|             0|
|2013-01-06|          2.0| 3.0|   rain|     5.0|             0|
|2013-01-07|          2.3| 7.3|   rain|     7.0|             0|
|2013-01-08|         16.3| 6.3|   rain|     8.5|             0|
|2013-01-09|         38.4| 5.1|   rain|     6.0|             0|
|2013-01-10|          0.3| 2.1|   snow|     1.5|             0|
|2013-01-11|          0.0| 1.9|drizzle|     0.0|             1|
|2013-01-12|          0.0| 2.0|    sun|    -0.5|             1|
|2013-01-13|          0.0| 1.5|    sun| 

#### One last example, let's calculate the average temperature for each quarter of each year:



In [69]:
#create new col quarter
#create new col year
#group by year and quarter and get mean temp avg and sort by both
(
    weather.withColumn("quarter", quarter("date"))
    .withColumn("year", year("date"))
    .groupBy("year", "quarter")
    .agg(mean("temp_avg").alias("temp_avg"))
    .sort("year", "quarter")
    .show()
)



+----+-------+------------------+
|year|quarter|          temp_avg|
+----+-------+------------------+
|2012|      1| 5.587912087912088|
|2012|      2|12.675824175824175|
|2012|      3|            18.375|
|2012|      4| 8.581521739130435|
|2013|      1| 6.405555555555556|
|2013|      2|14.505494505494505|
|2013|      3| 19.47826086956522|
|2013|      4| 8.032608695652174|
|2014|      1| 7.205555555555556|
|2014|      2|14.296703296703297|
|2014|      3|19.858695652173914|
|2014|      4|  9.88586956521739|
|2015|      1| 8.972222222222221|
|2015|      2|15.258241758241759|
|2015|      3|19.407608695652176|
|2015|      4| 8.956521739130435|
+----+-------+------------------+



#### We could use a pivot table instead: 

In [70]:
(
    weather.withColumn("quarter", quarter("date")) #create quarter
    .withColumn("year", year("date")) #create year
    .groupBy("year") #rows
    .pivot("quarter") #cols
    .agg(expr("ROUND(MEAN(temp_avg), 2) AS temp_avg")) #values avg of temp_avg, rounding to 2 decimals, and alias it
    .sort("year")
    .show()
)



+----+----+-----+-----+----+
|year|   1|    2|    3|   4|
+----+----+-----+-----+----+
|2012|5.59|12.68|18.38|8.58|
|2013|6.41|14.51|19.48|8.03|
|2014|7.21| 14.3|19.86|9.89|
|2015|8.97|15.26|19.41|8.96|
+----+----+-----+-----+----+



In [72]:
#or change rows to quarter and cols to year instead
(
    weather.withColumn("quarter", quarter("date")) #create quarter
    .withColumn("year", year("date")) #create year
    .groupBy("quarter") #rows
    .pivot("year") #cols
    .agg(expr("ROUND(MEAN(temp_avg), 2) AS temp_avg")) #values avg of temp_avg, rounding to 2 decimals, and alias it
    .sort("quarter")
    .show()
)



+-------+-----+-----+-----+-----+
|quarter| 2012| 2013| 2014| 2015|
+-------+-----+-----+-----+-----+
|      1| 5.59| 6.41| 7.21| 8.97|
|      2|12.68|14.51| 14.3|15.26|
|      3|18.38|19.48|19.86|19.41|
|      4| 8.58| 8.03| 9.89| 8.96|
+-------+-----+-----+-----+-----+



## Joins

We'll start by creating some data that we can join together:



In [73]:
users = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4, 5, 6],
            "name": ["bob", "joe", "sally", "adam", "jane", "mike"],
            "role_id": [1, 2, 3, 3, np.nan, np.nan],
        }
    )
)
roles = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "name": ["admin", "author", "reviewer", "commenter"],
        }
    )
)
print("--- users ---")
users.show()
print("--- roles ---")
roles.show()

--- users ---
+---+-----+-------+
| id| name|role_id|
+---+-----+-------+
|  1|  bob|    1.0|
|  2|  joe|    2.0|
|  3|sally|    3.0|
|  4| adam|    3.0|
|  5| jane|    NaN|
|  6| mike|    NaN|
+---+-----+-------+

--- roles ---
+---+---------+
| id|     name|
+---+---------+
|  1|    admin|
|  2|   author|
|  3| reviewer|
|  4|commenter|
+---+---------+



To join two dataframes together, we'll need to call the .join method on one of them and supply the other as an argument. 


In addition, we'll need to supply the condition on which we are joining. 



In our case, we are joining where the role_id column on the users table is equal to the id column on the roles table.

In [74]:
#inner join - kept values that existed on both tables
users.join(roles, on=users.role_id == roles.id).show()

+---+-----+-------+---+--------+
| id| name|role_id| id|    name|
+---+-----+-------+---+--------+
|  1|  bob|    1.0|  1|   admin|
|  3|sally|    3.0|  3|reviewer|
|  4| adam|    3.0|  3|reviewer|
|  2|  joe|    2.0|  2|  author|
+---+-----+-------+---+--------+



By default, spark will perform an inner join, meaning that records from both dataframes will have a match with the other.


We can also specify either a left or a right join, which will keep all of the records from either the left or right side, even if those records don't have a match with the other dataframe.

In [75]:
#include left table- users
users.join(roles, on=users.role_id == roles.id, how="left").show()

+---+-----+-------+----+--------+
| id| name|role_id|  id|    name|
+---+-----+-------+----+--------+
|  5| jane|    NaN|null|    null|
|  6| mike|    NaN|null|    null|
|  1|  bob|    1.0|   1|   admin|
|  3|sally|    3.0|   3|reviewer|
|  4| adam|    3.0|   3|reviewer|
|  2|  joe|    2.0|   2|  author|
+---+-----+-------+----+--------+



In [76]:
#includes right table- roles
users.join(roles, on=users.role_id == roles.id, how="right").show()

+----+-----+-------+---+---------+
|  id| name|role_id| id|     name|
+----+-----+-------+---+---------+
|   1|  bob|    1.0|  1|    admin|
|null| null|   null|  4|commenter|
|   3|sally|    3.0|  3| reviewer|
|   4| adam|    3.0|  3| reviewer|
|   2|  joe|    2.0|  2|   author|
+----+-----+-------+---+---------+



Notice that examples above have a duplicate id column. There are several ways we could go about dealing with this:

alias each dataframe + explicitly select columns after joining (this could also be implemented with spark SQL)
rename duplicated columns before merging
drop duplicated columns after the merge (.drop(right.id))



# Wrangling

In this lesson, we will acquire and prepare the data we will use in the rest of this module.

- Acquiring Data
- Data Prep
- Train Test Split

In [77]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import * #import everything

spark = SparkSession.builder.getOrCreate()

## Acquisition

Spark lets us read data in from a variety of data sources using what it calls a DataFrameReader. We can access the read property of our spark object and then set various options and read from a data source.

### Using Data Schemas

In [78]:
df = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

In [79]:
#another way to do above:
(
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("source.csv")
)

DataFrame[source_id: string, source_username: string]

In [80]:
df.show()

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



In [81]:
df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



#### Different datatypes
- StringType
- DoubleType
- IntegerType
- LongType
- ShortType
- TimestampType
- FloatType
- DateType

In [82]:
from pyspark.sql.types import StructType, StructField, StringType

#importing above, means we can define our schema and set datatypes
#lets us be sure about the structure of our data, and can significantly increase the speed of loading data 
#(inferring the schema can be a costly operation for large datasets).
schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
        
    ]

)

#read csv with schema variable from above
spark.read.csv("source.csv", header=True, schema=schema)

DataFrame[source_id: string, source_username: string]

### Writing Data
A spark dataframe can be written to a local destination using the `.write` property. Several common output formats are:

- `csv`: for writing to a local csv file(s)
- `parquet`: Parquet is a very popular columnar storage format for Hadoop.
- `json`: for writing to a local json file(s)
- `jdbc`: for writing to a SQL database table

In [83]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))

# write to json, mode=overwrite, in case it's been written there before
mpg.write.json("mpg_json", mode="overwrite")

In [85]:
# write to csv
mpg.write.csv("mpg_csv", mode="overwrite")

In summary: 

In [86]:
cases = spark.read.csv("case.csv", header=True, inferSchema=True)

mpg.write.csv("mpg_csv", mode="overwrite")

## Data Prep

In [87]:
#take a look at data, clean it up by adding vertical=True
cases.show(5, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

### Column Renaming

In [88]:
#withColumnRenamed(old_col_to_rename, new_col_renames_old_col)
cases = cases.withColumnRenamed("SLA_due_date", "case_due_date")

#hasn't performed the action yet, just reassigned

### Data Types

In [89]:
#look at current dtypes
cases.printSchema()

#case_closed and case_late col are currently strings, let's change dtype to bool

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- case_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [92]:
#recreate case_closed col (same as og col)
#expr case_closed == YES, returns true or false
#do the same for case_late col
cases = cases.withColumn("case_closed", expr('case_closed=="YES"'))\
            .withColumn("case_late", expr('case_late=="YES"'))

cases.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



In [93]:
#double check dtype change to bool
cases.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- case_due_date: string (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [94]:
#council district shows up as int, but it is actually a category, so needs to be a string
cases.groupBy("council_district").count().show()

+----------------+------+
|council_district| count|
+----------------+------+
|               1|119309|
|               6| 74095|
|               3|102706|
|               5|114609|
|               9| 40916|
|               4| 93778|
|               8| 42345|
|               7| 72445|
|              10| 62926|
|               2|114745|
|               0|  3830|
+----------------+------+



In [95]:
#withColumn overwrites og col if the same col name is used
#if withColumn has new name, then it will create a whole new col
cases = cases.withColumn('council_district', col('council_district').cast('string'))

cases.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- case_due_date: string (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



In [96]:
#date cols are string dtypes, need to be converted to date dtypes
cases.select('case_opened_date', 'case_closed_date', 'case_due_date').show(5)

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows



In [97]:
#identify current format they are in from above
fmt = 'M/d/yy H:mm'

#recreate each col and convert to timestampe using current fmt that it is in
cases = (
    cases.withColumn('case_opened_date', to_timestamp('case_opened_date', fmt))
    .withColumn('case_closed_date', to_timestamp('case_closed_date', fmt))
    .withColumn('case_due_date', to_timestamp('case_due_date', fmt))
)

#take a look at it
cases.select('case_opened_date', 'case_closed_date', 'case_due_date').show(5)

+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-05 08:30:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-17 08:30:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00|2018-01-01 04:34:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



### Data Transformations

In [98]:
#let's normalize address field
cases.select('request_address').show(5)

+--------------------+
|     request_address|
+--------------------+
|2315  EL PASO ST,...|
|2215  GOLIAD RD, ...|
|102  PALFREY ST W...|
|114  LA GARDE ST,...|
|734  CLEARVIEW DR...|
+--------------------+
only showing top 5 rows



In [99]:
#lower case and trim to make sure there is no leading or trailing whitespace
cases = cases.withColumn('request_address', trim(lower(cases.request_address)))

In [100]:
#double check
cases.select('request_address').show(5)

+--------------------+
|     request_address|
+--------------------+
|2315  el paso st,...|
|2215  goliad rd, ...|
|102  palfrey st w...|
|114  la garde st,...|
|734  clearview dr...|
+--------------------+
only showing top 5 rows



In [101]:
#number of days late can be converted to number of weeks late
cases = cases.withColumn('num_weeks_late', expr('num_days_late / 7 AS num_weeks_late'))

#take a look at change
cases.select('num_days_late', 'num_weeks_late').show(5)

+-------------------+--------------------+
|      num_days_late|      num_weeks_late|
+-------------------+--------------------+
| -998.5087616000001|        -142.6441088|
|-2.0126041669999997|-0.28751488099999994|
|       -3.022337963|-0.43176256614285713|
|       -15.01148148| -2.1444973542857144|
|0.37216435200000003|         0.053166336|
+-------------------+--------------------+
only showing top 5 rows



In [103]:
#add leading 0s to council district to fix format

#converting back to int temporarily
cases = cases.withColumn('council_district', col('council_district').cast('int'))

#'%03d' means at least 3 digits, pad with 0s (fill leading digits w/ 0s)

#converted council_district back to int temporarily, but the final output will be a string.
cases = cases.withColumn(
    "council_district",
    format_string("%03d", col("council_district").cast("int")),
)

cases.select("council_district").show(5)

+----------------+
|council_district|
+----------------+
|             005|
|             003|
|             003|
|             003|
|             007|
+----------------+
only showing top 5 rows



In [None]:
#format_string returns col back to string

In [106]:
cases.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- num_weeks_late: double (nullable = true)



### New Features

In [107]:
#add zipcode by extracting it from the address
#extract from rqst_address w/ regex digits at the end of line, 0 for no capture group
cases = cases.withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))

cases.select("zipcode").show(5)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
+-------+
only showing top 5 rows



In [109]:
#add case age: How old the case is; the difference in days between when the case was opened and the current day
#days_to_closed: The number of days between when the case was opened and when it was closed
#case_lifetime: Number of days between when the case was opened and when it was closed, 
            #if the case is still open, the number of days since the case was opened

cases = (
    cases.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

cases.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("case_closed")).show(5)

cases.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("! case_closed")).show(5)

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|case_age|days_to_closed|case_lifetime|
+-----------+-------------------+-------------------+--------+--------------+-------------+
|       true|2018-01-01 00:42:00|2018-01-01 12:29:00|    1231|             0|            0|
|       true|2018-01-01 00:46:00|2018-01-03 08:11:00|    1231|             2|            2|
|       true|2018-01-01 00:48:00|2018-01-02 07:57:00|    1231|             1|            1|
|       true|2018-01-01 01:29:00|2018-01-02 08:13:00|    1231|             1|            1|
|       true|2018-01-01 01:34:00|2018-01-01 13:29:00|    1231|             0|            0|
+-----------+-------------------+-------------------+--------+--------------+-------------+
only showing top 5 rows

+-----------+-------------------+----------------+--------+--------------+-------------+
|case_closed|   case_opened_date|case_closed_date|case_age

### Joining New Dataset

In [110]:
#access dept.csv dataset (contains more info about various dpts)
dept = spark.read.csv("dept.csv", header=True, inferSchema=True)
dept.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



#### Double checking what field to join on by looking at unique values
#### Important not to assume to join just b/c columns are named the same

In [114]:
#double check unique values in dept
dept.groupBy('dept_division').count().show(truncate = False)
                                            #truncate false to show full value

+----------------------------+-----+
|dept_division               |count|
+----------------------------+-----+
|Miscellaneous               |1    |
|Solid Waste                 |1    |
|Field Operations            |1    |
|Streets                     |1    |
|Waste Collection            |1    |
|District 7                  |1    |
|Code Enforcement (IntExp)   |1    |
|District 10                 |1    |
|Vector                      |1    |
|Reservations                |1    |
|Dangerous Premise           |1    |
|311 Call Center             |1    |
|Brush                       |1    |
|Dangerous Premise (IntExp)  |1    |
|Code Enforcement (Internal) |1    |
|Traffic Engineering Design  |1    |
|District 2                  |1    |
|Signals                     |1    |
|Engineering Division        |1    |
|Director's Office Horizontal|1    |
+----------------------------+-----+
only showing top 20 rows



In [115]:
#double check unique values of dept_division in cases
cases.groupBy('dept_division').count().show(truncate = False)

+----------------------------+------+
|dept_division               |count |
+----------------------------+------+
|Miscellaneous               |45123 |
|Solid Waste                 |813   |
|Field Operations            |116915|
|Streets                     |38510 |
|Waste Collection            |215122|
|District 7                  |2     |
|Code Enforcement (IntExp)   |2189  |
|District 10                 |2     |
|Vector                      |538   |
|Reservations                |2     |
|Dangerous Premise           |15479 |
|311 Call Center             |2849  |
|Brush                       |18212 |
|Dangerous Premise (IntExp)  |36    |
|Traffic Engineering Design  |4334  |
|Code Enforcement (Internal) |198   |
|District 2                  |3     |
|Signals                     |20700 |
|Engineering Division        |1375  |
|Director's Office Horizontal|515   |
+----------------------------+------+
only showing top 20 rows



In [116]:
#join on dept_division
cases = (
    #left join on dept_division, b/c if some dpts that have no cases that is okay
    #we want all the cases even if dpt does not show up
    cases.join(dept, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    #reduces noise
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(cases.dept_division) #cause there is standardized_dept_name
    .withColumnRenamed("standardized_dept_name", "department") #better readability
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

cases.show(2, vertical=True)


-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 1231                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
-RECORD 1-------------------------

## Data Splitting

In [119]:
#70%, 20% and 10%
train, validate, test = cases.randomSplit([0.7, 0.2, 0.1])

In [120]:
train.count()

588316

In [121]:
validate.count()

168901

In [122]:
test.count()

84487