### Data Wrangling

https://docs.google.com/presentation/d/1v54Tr4POZj9K4zsaHOn7U22rzdSlTXsyCgE5lfhd-rA/edit?usp=sharing

In [20]:
# schema functions import
from pyspark.sql.types import StructType, StructField, StringType

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

#### Read data 

We will use San Antonio's 311 call data for this lesson and exercises. Dowload the CSV data from Google classroom

https://classroom.google.com/u/0/w/Mzg3MTg5NzU1Njk1/tc/Mzg3MTg5NzU1NzE1

In [4]:
# Read in CSV file to spark df
source = (spark.read.csv("csv_files/source.csv",
                     sep=",",
                     header=True,
                     inferSchema=True)
     )

In [7]:
# looking at the spark df
source.show()

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



In [19]:
# Another way to read in data:

(
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("csv_files/source.csv")
).show()

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



#### Data Schemas
Spark includes a concept of a data schema  
Specify the types of our data ahead of time

In [21]:
# getting data schema object
StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)

StructType(List(StructField(source_id,StringType,true),StructField(source_username,StringType,true)))

In [25]:
# from pyspark.sql.types import StructType, StructField, StringType

data_schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)


# Read csv, but now we specify >> the schema:
source = spark.read.csv("csv_files/source.csv", header = True, schema = data_schema)

In [26]:
# printing the schema
source.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



#### Writing data

In [27]:
# write data to a destination using .write property
source.write.json("source_json", mode = "overwrite")

#### Data Preparation

In [28]:
# Read the case.csv file

df = spark.read.csv("csv_files/case.csv", header = True, inferSchema = True)

In [36]:
# shape of dataframe
'There are ' + str(len(df.columns)) + ' columns and ' + str(df.count()) + ' rows.'

'There are 14 columns and 841704 rows.'

In [37]:
# look at first three records
df.show(14)

+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO| -2.012604167|        YES|     Storm Water|Removal Of Obstru...|4.322222222|     Closed| svcCRMSS|2215  GOL

In [38]:
#datatypes?
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

**Things to do:**

1. **Rename Columns:**
    - 'SLA_due_date -> case_due_date



2. **Correct Data Types:**
    - case_closed and case_late to boolean
    - council_district as a string
    - case_opened_date, case_closed_date and case_due_date to datetime format


3. **Data Transformation:**
    - request_address: trim and lowercase
    - format council district with leading zeros
    - convert the number of days a case is late to a number of weeks
    
    
4. **New features:**
    - zip_code : extract from address
    - case_age 
    - days_to_closed
    - case_lifetime
    
    
5. **Join cases data with department data:**


#### Rename Columns:

In [40]:
# Rename 'SLA_due_date' to 'case_due_date' using .withColumnRenamed
df.withColumnRenamed('SLA_due_date', 'case_due_date')

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, case_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int]

In [41]:
# writing over original df
df = df.withColumnRenamed('SLA_due_date', 'case_due_date')

In [42]:
# df preview
df.show(2)

+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|case_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|       NO| -2.012604167|        YES|     Storm Water|Removal Of Obstru...|4.322222222|     Closed| svcCRMSS|2215

#### Correct Data Types:

In [39]:
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [43]:
# correct data types: case_closed and case_late to boolean

# preview
df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|        YES|       NO|
|        YES|       NO|
|        YES|       NO|
|        YES|       NO|
|        YES|      YES|
+-----------+---------+
only showing top 5 rows



In [54]:
# use .withColumn to change columns from string to boolean values
df.withColumn('case_closed', expr('case_closed == "YES"'))\
.withColumn('case_late', expr('case_late == "YES"'))

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, case_due_date: string, case_late: boolean, num_days_late: double, case_closed: boolean, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int]

In [55]:
#check the columns
df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|        YES|       NO|
|        YES|       NO|
|        YES|       NO|
|        YES|       NO|
|        YES|      YES|
+-----------+---------+
only showing top 5 rows



In [56]:
# council_district cast as string
df.select('council_district').show(4)

+----------------+
|council_district|
+----------------+
|               5|
|               3|
|               3|
|               3|
+----------------+
only showing top 4 rows



In [58]:
# council_district as a string instead of int
df = df.withColumn('council_district', col('council_district').cast('string'))

In [63]:
# view the column


In [64]:
# check datatypes
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('case_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [65]:
# convert case_opened_date, case_closed_date and case_due_date to datetime format

df.select('case_opened_date', 'case_closed_date', 'case_due_date').show(5)


+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows



In [69]:
# to_timestamp, fmt

fmt = "M/d/yy H:mm"

df.withColumn('case_opened_data', to_timestamp('case_opened_date', fmt))\
df.withColumn('case_closed_data', to_timestamp('case_closed_date', fmt))\
df.withColumn('case_due_data', to_timestamp('case_due_date', fmt))

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, case_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, case_opened_data: timestamp, case_closed_data: timestamp, case_due_data: timestamp]

In [73]:
# assigning to df
df = df.withColumn('case_opened_data', to_timestamp('case_opened_date', fmt))\
.withColumn('case_closed_data', to_timestamp('case_closed_date', fmt))\
.withColumn('case_due_data', to_timestamp('case_due_date', fmt))

In [74]:
# check the three columns again
df.select('case_opened_date', 'case_closed_date', 'case_due_date').show(5)

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows



In [75]:
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('case_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string'),
 ('case_opened_data', 'timestamp'),
 ('case_closed_data', 'timestamp'),
 ('case_due_data', 'timestamp')]

#### Data Transformation

In [77]:
df.select('request_address').show(5)

+--------------------+
|     request_address|
+--------------------+
|2315  EL PASO ST,...|
|2215  GOLIAD RD, ...|
|102  PALFREY ST W...|
|114  LA GARDE ST,...|
|734  CLEARVIEW DR...|
+--------------------+
only showing top 5 rows



In [76]:
# request_address: trim and lowercase
#     False removes truncations
df.select('request_address').show(5, False)

+-------------------------------------+
|request_address                      |
+-------------------------------------+
|2315  EL PASO ST, San Antonio, 78207 |
|2215  GOLIAD RD, San Antonio, 78223  |
|102  PALFREY ST W, San Antonio, 78223|
|114  LA GARDE ST, San Antonio, 78223 |
|734  CLEARVIEW DR, San Antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [78]:
# request_address: trim and lowercase

df.withColumn('request_address', lower(trim('request_address')))

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, case_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, case_opened_data: timestamp, case_closed_data: timestamp, case_due_data: timestamp]

In [79]:
df = df.withColumn('request_address', lower(trim('request_address')))

In [80]:
df.select('request_address').show(5, False)

+-------------------------------------+
|request_address                      |
+-------------------------------------+
|2315  el paso st, san antonio, 78207 |
|2215  goliad rd, san antonio, 78223  |
|102  palfrey st w, san antonio, 78223|
|114  la garde st, san antonio, 78223 |
|734  clearview dr, san antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [82]:
# convert the number of days a case is late to a number of weeks

# viewing current column for number of days late
df.select('num_days_late').show(5)

+-------------+
|num_days_late|
+-------------+
| -998.5087616|
| -2.012604167|
| -3.022337963|
| -15.01148148|
|  0.372164352|
+-------------+
only showing top 5 rows



In [84]:
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('case_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string'),
 ('case_opened_data', 'timestamp'),
 ('case_closed_data', 'timestamp'),
 ('case_due_data', 'timestamp')]

In [87]:
# code to get new col
df.withColumn('num_weeks_late', expr('num_days_late / 7')).select('num_weeks_late').show(5)

+--------------------+
|      num_weeks_late|
+--------------------+
|        -142.6441088|
|        -0.287514881|
|-0.43176256614285713|
| -2.1444973542857144|
|0.053166335999999995|
+--------------------+
only showing top 5 rows



In [89]:
# writing over df, adding new weeks col to original data
df = df.withColumn('num_weeks_late', expr('num_days_late / 7'))

df.show(1)

+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+-------------------+-------------------+-------------------+--------------+
|   case_id|case_opened_date|case_closed_date|case_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|SLA_days|case_status|source_id|     request_address|council_district|   case_opened_data|   case_closed_data|      case_due_data|num_weeks_late|
+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+-------------------+-------------------+-------------------+--------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|   999.0|     Closed| svcCRML

In [91]:
# use format_string function to pad zeros for council_district
df.withColumn('council_district', format_string('%03d', col('council_district').cast('int')))

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, case_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, case_opened_data: timestamp, case_closed_data: timestamp, case_due_data: timestamp, num_weeks_late: double]

In [97]:
df = df.withColumn('council_district', format_string('%03d', col('council_district')).cast('int'))

df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('case_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int'),
 ('case_opened_data', 'timestamp'),
 ('case_closed_data', 'timestamp'),
 ('case_due_data', 'timestamp'),
 ('num_weeks_late', 'double')]

#### New features:

In [98]:
# create a new column for zipcode:

df.withColumn('zipcode', expr('right(request_address, 5)'))

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, case_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int, case_opened_data: timestamp, case_closed_data: timestamp, case_due_data: timestamp, num_weeks_late: double, zipcode: string]

In [99]:
df = df.withColumn('zipcode', expr('right(request_address, 5)'))

In [107]:
df.select('request_address', 'zipcode').show(4, False)

+-------------------------------------+-------+
|request_address                      |zipcode|
+-------------------------------------+-------+
|2315  el paso st, san antonio, 78207 |78207  |
|2215  goliad rd, san antonio, 78223  |78223  |
|102  palfrey st w, san antonio, 78223|78223  |
|114  la garde st, san antonio, 78223 |78223  |
+-------------------------------------+-------+
only showing top 4 rows



In [103]:
# # with regex
# regex = r'(/d+$)

# df.withColumn('zipcode', regexp)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
+-------+
only showing top 4 rows



case_age: How old the case is; the difference in days between when the case was opened and the current day  
days_to_closed: The number of days between when the case was opened and when it was closed  
case_lifetime: Number of days between when the case was opened and when it was closed, if the case is still open, the number of days since the case was opened  

In [110]:
#create three new columns 'case_age', 'days_to_closed', 'case_lifetime'

df = (
    df.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

AnalysisException: cannot resolve '(NOT case_closed)' due to data type mismatch: argument 1 requires boolean type, however, 'case_closed' is of string type.; line 1 pos 0;
'Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, council_district#1693, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, num_weeks_late#1432, zipcode#1732, case_age#1880, days_to_closed#1901, CASE WHEN NOT case_closed#377 THEN case_age#1880 ELSE days_to_closed#1901 END AS case_lifetime#1923]
+- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, council_district#1693, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, num_weeks_late#1432, zipcode#1732, case_age#1880, datediff(cast(case_closed_date#373 as date), cast(case_opened_date#372 as date)) AS days_to_closed#1901]
   +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, council_district#1693, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, num_weeks_late#1432, zipcode#1732, datediff(cast(current_timestamp() as date), cast(case_opened_date#372 as date)) AS case_age#1880]
      +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, council_district#1693, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, num_weeks_late#1432, right(request_address#1142, 5) AS zipcode#1732]
         +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, cast(format_string(%03d, council_district#1674) as int) AS council_district#1693, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, num_weeks_late#1432]
            +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, cast(format_string(%03d, council_district#1648) as int) AS council_district#1674, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, num_weeks_late#1432]
               +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, cast(format_string(%03d, council_district#1628) as int) AS council_district#1648, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, num_weeks_late#1432]
                  +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, cast(format_string(%03d, council_district#1602) as int) AS council_district#1628, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, num_weeks_late#1432]
                     +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, cast(format_string(%03d, council_district#1576) as int) AS council_district#1602, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, num_weeks_late#1432]
                        +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, cast(format_string(%03d, council_district#785) as int) AS council_district#1576, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, num_weeks_late#1432]
                           +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, council_district#785, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, (num_days_late#376 / cast(7 as double)) AS num_weeks_late#1432]
                              +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#1142, council_district#785, case_opened_data#1037, case_closed_data#1055, case_due_data#1073, (num_days_late#376 / cast(7 as double)) AS num_weeks_late#1326]
                                 +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, lower(trim(request_address#383, None)) AS request_address#1142, council_district#785, case_opened_data#1037, case_closed_data#1055, case_due_data#1073]
                                    +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#383, council_district#785, case_opened_data#1037, case_closed_data#1055, to_timestamp('case_due_date, Some(M/d/yy H:mm)) AS case_due_data#1073]
                                       +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#383, council_district#785, case_opened_data#1037, to_timestamp('case_closed_date, Some(M/d/yy H:mm)) AS case_closed_data#1055, case_due_data#997]
                                          +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#383, council_district#785, to_timestamp('case_opened_date, Some(M/d/yy H:mm)) AS case_opened_data#1037, case_closed_data#980, case_due_data#997]
                                             +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#383, council_district#785, case_opened_data#964, case_closed_data#980, to_timestamp('case_due_date, Some(M/d/yy H:mm)) AS case_due_data#997]
                                                +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#383, council_district#785, case_opened_data#964, to_timestamp('case_closed_date, Some(M/d/yy H:mm)) AS case_closed_data#980]
                                                   +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#383, council_district#785, to_timestamp('case_opened_date, Some(M/d/yy H:mm)) AS case_opened_data#964]
                                                      +- Project [case_id#371, case_opened_date#372, case_closed_date#373, case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#383, cast(council_district#384 as string) AS council_district#785]
                                                         +- Project [case_id#371, case_opened_date#372, case_closed_date#373, SLA_due_date#374 AS case_due_date#599, case_late#375, num_days_late#376, case_closed#377, dept_division#378, service_request_type#379, SLA_days#380, case_status#381, source_id#382, request_address#383, council_district#384]
                                                            +- Relation [case_id#371,case_opened_date#372,case_closed_date#373,SLA_due_date#374,case_late#375,num_days_late#376,case_closed#377,dept_division#378,service_request_type#379,SLA_days#380,case_status#381,source_id#382,request_address#383,council_district#384] csv


In [None]:
df.show(1, False, True)

In [112]:
# read the dept.csv file:

dept = spark.read.csv("csv_files/dept.csv", header = True, inferSchema = True)
dept.show(5, False, True)

-RECORD 0-----------------------------------------------
 dept_division          | 311 Call Center               
 dept_name              | Customer Service              
 standardized_dept_name | Customer Service              
 dept_subject_to_SLA    | YES                           
-RECORD 1-----------------------------------------------
 dept_division          | Brush                         
 dept_name              | Solid Waste Management        
 standardized_dept_name | Solid Waste                   
 dept_subject_to_SLA    | YES                           
-RECORD 2-----------------------------------------------
 dept_division          | Clean and Green               
 dept_name              | Parks and Recreation          
 standardized_dept_name | Parks & Recreation            
 dept_subject_to_SLA    | YES                           
-RECORD 3-----------------------------------------------
 dept_division          | Clean and Green Natural Areas 
 dept_name              | Parks

In [124]:
dept.show(5, True)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



In [118]:
dept.select('dept_division').distinct().show()
dept.select('dept_name').show()

+--------------------+
|       dept_division|
+--------------------+
|       Miscellaneous|
|         Solid Waste|
|    Field Operations|
|             Streets|
|    Waste Collection|
|          District 7|
|Code Enforcement ...|
|         District 10|
|              Vector|
|        Reservations|
|   Dangerous Premise|
|     311 Call Center|
|               Brush|
|Dangerous Premise...|
|Code Enforcement ...|
|Traffic Engineeri...|
|          District 2|
|             Signals|
|Engineering Division|
|Director's Office...|
+--------------------+
only showing top 20 rows

+--------------------+
|           dept_name|
+--------------------+
|    Customer Service|
|Solid Waste Manag...|
|Parks and Recreation|
|Parks and Recreation|
|Code Enforcement ...|
|Code Enforcement ...|
|                null|
|Code Enforcement ...|
|Code Enforcement ...|
|Trans & Cap Impro...|
|        City Council|
|        City Council|
|        City Council|
|        City Council|
|        City Council|
|       

In [119]:
# join the df and dept dataframe using 'dept_division' as common key
# drop columns as needed (keep standardized_dept_name)
# convert dept_subject_to_SLA to boolean

df = (
    df
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

AnalysisException: Reference 'dept_subject_to_SLA' is ambiguous, could be: dept_subject_to_SLA, dept_subject_to_SLA.

In [117]:
df.show(1)

Py4JJavaError: An error occurred while calling o600.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 89.0 failed 1 times, most recent failure: Lost task 0.0 in stage 89.0 (TID 132) (10.10.2.146 executor driver): java.util.IllegalFormatConversionException: d != org.apache.spark.unsafe.types.UTF8String
	at java.base/java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4426)
	at java.base/java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2938)
	at java.base/java.util.Formatter$FormatSpecifier.print(Formatter.java:2892)
	at java.base/java.util.Formatter.format(Formatter.java:2673)
	at java.base/java.util.Formatter.format(Formatter.java:2609)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:476)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:338)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:366)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
	at jdk.internal.reflect.GeneratedMethodAccessor76.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.IllegalFormatConversionException: d != org.apache.spark.unsafe.types.UTF8String
	at java.base/java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4426)
	at java.base/java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2938)
	at java.base/java.util.Formatter$FormatSpecifier.print(Formatter.java:2892)
	at java.base/java.util.Formatter.format(Formatter.java:2673)
	at java.base/java.util.Formatter.format(Formatter.java:2609)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
