# Spark Wrangle Exercises
This exercises uses the `case.csv`, `dept.csv`, and `source.csv` files from the san antonio 311 call dataset.<br>
>A. Read the `case`, `department`, and `source` data into their own spark dataframes.<br>
B. Write the code necessary to store the source data in both __csv__ and __json__ format, store these as `sources_csv` and `sources_json`.<br>
C. Inspect your _folder_ structure. What do you notice?<br>
D. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.<br>

In [1]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructField, StringType, StringType

In [2]:
# Create a spark session object to build spark dataframes.
spark = SparkSession.builder.getOrCreate()

In [3]:
# 1. Read the case, dept, and source datasets into spark dataframes.
df_case = spark.read.csv('data/case.csv', header=True, sep=',', inferSchema=True)
df_dept = spark.read.csv('data/dept.csv', header=True, sep=',', inferSchema=True)
df_source = spark.read.csv('data/source.csv', header=True, sep=',', inferSchema=True)

## case.csv

In [4]:
# The dimensions of the case dataset
df_case.count(), len(df_case.columns)

(841704, 14)

In [5]:
# Efficient way to show the number of non-null values in a spark dataframe
df_case.summary().show(1, truncate=False, vertical=True)

-RECORD 0----------------------
 summary              | count  
 case_id              | 841704 
 case_opened_date     | 841704 
 case_closed_date     | 823594 
 SLA_due_date         | 841671 
 case_late            | 841704 
 num_days_late        | 841671 
 case_closed          | 841704 
 dept_division        | 841704 
 service_request_type | 841704 
 SLA_days             | 841671 
 case_status          | 841704 
 source_id            | 841704 
 request_address      | 841704 
 council_district     | 841704 
only showing top 1 row



In [6]:
# Display the column names
df_case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [7]:
# Display a single record from the case dataset to understand what an obersation represents.
# The `case` dataset has many columns that wrap around when displayed.
# Pass: truncate=False, vertical=True to display the full values of each column.
df_case.show(2, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 SLA_due_date         | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616000001                   
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
-RECORD 1----------------------------------------------------
 case_id

### `case` dataset: Record 0 takeaway
Each row represents a 311 call.
- Unique case_id
- Case opened and closed datetimes.
    - Date and time represents the total time it took the department to complete the case: Paper work, travel, tasks, etc.
- The SLA due date is set 999 days from the case open date.
- This type of case is late if it exceeds 999 days.
- The number of days late is a negative number starting at -999, representing the case open date
    - The fraction missing means that the case was opened and closed within half of a day.
- Department division identifies the city department assigned to the case.
- Service type request indentifies the type of request issued to the department.
- SLA days is the maximum number of days this case can be opened.
- Case status identifies if a case is opened or closed.
- Source id may represent the division/personnel generating the request.
- Request address is the location requesting 311 service.
- Council district represents the district a location resides in.
    - Council representatives can find common 311 requests among their constituents.

## dept.csv

In [8]:
# The dimensions of the dept dataset
df_dept.count(), len(df_dept.columns)

(39, 4)

In [9]:
# Display the column names and datatypes
# Column data types are correct
df_dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

## source.csv

In [10]:
# The dimensions of the source dataset
df_source.count(), len(df_source.columns)

(140, 2)

In [11]:
# Display the column names and datatypes
# Column data types are correct
df_source.dtypes

[('source_id', 'string'), ('source_username', 'string')]

Save the source dataset in csv and JSON format.
```python
df_source.write.json('data/source_json', mode='overwrite')
df_source.write.csv('data/source_csv', mode='overwrite')
```
This saves each sparkdataframe as a folder with 2 files:
- _SUCCESS
- A partition csv file.

# Data Preparation

In [12]:
df_case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [13]:
# Rename technical jargon into laymans terms...
df_case = df_case.withColumnRenamed("SLA_due_date", "case_due_date")

In [14]:
# Create a variable to store a datetime string format
dt_format = 'M/d/yy H:mm'

# Cast each date column from string datatypes into datetime datatypes
df_case = (
    df_case.withColumn('case_opened_date', to_timestamp('case_opened_date', dt_format))
           .withColumn('case_closed_date', to_timestamp('case_closed_date', dt_format))
           .withColumn('case_due_date', to_timestamp('case_due_date', dt_format))
)
                       

In [15]:
# Cast columns with binary outcomes from string datatype to boolean datatype.
df_case = (
    df_case.withColumn('case_closed', when(col('case_closed') == 'Open', True).otherwise(False))
           .withColumn('case_late', when(col('case_late') == 'YES', True).otherwise(False))
)

In [16]:
# Transform the negative values to 0. This means that the case wasn't late.
# If there is a non-zero value in this column, it means that the case was late x number of days.
df_case = (
    df_case.withColumn('num_days_late', when(col('num_days_late') < 0, 0).otherwise(col('num_days_late')))
)

In [17]:
# Create new columns to measure the status of open v. closed requests.
(
df_case
.withColumn("case_age", datediff(current_timestamp(), "case_opened_date"))
.withColumn("days_to_closed", datediff("case_closed_date", "case_opened_date"))
.withColumn("case_lifetime",
            when(col('case_closed') == False, col("case_age"))
            .otherwise(col("days_to_closed"))).show(2, truncate=False, vertical=True)
)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | 0.0                                  
 case_closed          | false                                
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
 case_age             | 1065                                 
 days_to

In [18]:
# Normalize string datatype columns
df_case = (
    df_case.withColumn('request_address', lower(trim('request_address')))
)

In [19]:
df_case.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | 0.0                                  
 case_closed          | false                                
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 5                                    
only showing top 1 row



## 1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?<br>
> This question refers to the `case` dataset.

In [20]:
(
df_case
.select(datediff('current_timestamp', 'case_due_date').alias('days_past_due'))
.filter(col('case_closed') == False)
.sort(col('days_past_due').desc())
.show(10)
)

+-------------+
|days_past_due|
+-------------+
|         1430|
|         1430|
|         1430|
|         1430|
|         1430|
|         1430|
|         1430|
|         1430|
|         1430|
|         1430|
+-------------+
only showing top 10 rows



## 2. How many Stray Animal cases are there?

In [21]:
df_case.filter(col('service_request_type') == 'Stray Animal').count()

26760

## 3. How many service requests that are assigned to the Field Operations department (`dept_division`) are not classified as "Officer Standby" request type (`service_request_type`)?

In [22]:
(
df_case
.filter(col('dept_division') == 'Field Operations')
.filter(col('service_request_type') != 'Officer Standby').count()
)

113902

## 4. Convert the `council_district` column to a string column.

In [23]:
df_case = df_case.withColumn('council_district', col('council_district').cast('string'))

## 5. Extract the year from the `case_closed_date` column.

In [24]:
df_case = df_case.withColumn('year', year('case_closed_date'))

## 6. Convert `num_days_late` from days to hours in new columns `num_hours_late`.

In [25]:
df_case = df_case.withColumn('num_hours_late', col('num_days_late') * 24)

## 7. Join the case data with the source and department data.

In [26]:
df_source.show(1, truncate=False, vertical=True)

-RECORD 0---------------------------
 source_id       | 100137           
 source_username | Merlene Blodgett 
only showing top 1 row



In [27]:
df_dept.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------
 dept_division          | 311 Call Center  
 dept_name              | Customer Service 
 standardized_dept_name | Customer Service 
 dept_subject_to_SLA    | YES              
only showing top 1 row



In [28]:
df = (
    df_case
    .join(df_dept, 'dept_division', 'left')
    .join(df_source, 'source_id', 'left')
)

In [29]:
df.show(1, truncate=False, vertical=True)

-RECORD 0------------------------------------------------------
 source_id              | svcCRMLS                             
 dept_division          | Field Operations                     
 case_id                | 1014127332                           
 case_opened_date       | 2018-01-01 00:42:00                  
 case_closed_date       | 2018-01-01 12:29:00                  
 case_due_date          | 2020-09-26 00:42:00                  
 case_late              | false                                
 num_days_late          | 0.0                                  
 case_closed            | false                                
 service_request_type   | Stray Animal                         
 SLA_days               | 999.0                                
 case_status            | Closed                               
 request_address        | 2315  el paso st, san antonio, 78207 
 council_district       | 5                                    
 year                   | 2018          

## 8. Are there any cases that do not have a request source?

In [30]:
df.filter(col('source_id').isNull()).show(vertical=True)

(0 rows)



## 9. What are the top 10 service request types in terms of number of requests?

In [31]:
df.groupby('service_request_type').count().sort(desc('count')).show(10, truncate=False)

+--------------------------------+-----+
|service_request_type            |count|
+--------------------------------+-----+
|No Pickup                       |89210|
|Overgrown Yard/Trash            |66403|
|Bandit Signs                    |32968|
|Damaged Cart                    |31163|
|Front Or Side Yard Parking      |28920|
|Stray Animal                    |27361|
|Aggressive Animal(Non-Critical) |25492|
|Cart Exchange Request           |22608|
|Junk Vehicle On Private Property|21649|
|Pot Hole Repair                 |20827|
+--------------------------------+-----+
only showing top 10 rows



## 10. What are the top 10 service request types in terms of average days late?

In [32]:
(
df
.where(col('case_late') == True)
.groupBy('service_request_type')
.agg(round(mean('num_days_late'), 2).alias('n_days_late'),
     count('*').alias('n_cases'),
    )
.sort(desc('n_days_late'))
.show(10, truncate=False)
)

+--------------------------------------+-----------+-------+
|service_request_type                  |n_days_late|n_cases|
+--------------------------------------+-----------+-------+
|Zoning: Recycle Yard                  |210.89     |132    |
|Zoning: Junk Yards                    |200.21     |262    |
|Structure/Housing Maintenance         |190.21     |51     |
|Donation Container Enforcement        |171.09     |122    |
|Storage of Used Mattress              |163.97     |7      |
|Labeling for Used Mattress            |162.43     |7      |
|Record Keeping of Used Mattresses     |154.0      |7      |
|Signage Requied for Sale of Used Mattr|151.64     |12     |
|Traffic Signal Graffiti               |137.65     |4      |
|License Requied Used Mattress Sales   |128.8      |7      |
+--------------------------------------+-----------+-------+
only showing top 10 rows



## 11. Does number of days late depend on department?

In [33]:
(
df
.filter('case_late')
.groupby('dept_name')
.agg(avg('num_days_late').alias('avg_days_late'),
     count('num_days_late').alias('n_cases_late'),
    )
.sort(desc('avg_days_late'))
.withColumn('avg_days_late', round(col('avg_days_late'), 1))
.show(truncate=False)
)

+-------------------------+-------------+------------+
|dept_name                |avg_days_late|n_cases_late|
+-------------------------+-------------+------------+
|null                     |210.9        |132         |
|Customer Service         |88.2         |2035        |
|Development Services     |67.2         |840         |
|Code Enforcement Services|48.1         |25467       |
|Animal Care Services     |23.4         |23751       |
|Parks and Recreation     |22.4         |3810        |
|Trans & Cap Improvements |10.7         |5529        |
|Solid Waste Management   |7.1          |33729       |
|Metro Health             |6.5          |854         |
+-------------------------+-------------+------------+



## 12. How do number of days late depend on department and request type?