In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime 

# Import pandas to help with readability of answers
import pandas as pd

# Build pyspark session and turn off warnings
spark = SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/09 08:24:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
%%html
<style>
  table {margin-left: 0 !important;}
</style>

## This exercise uses the `case.csv`, `dept.csv`, and `source.csv` files from the san antonio 311 call dataset.

### 1.Read the case, department, and source data into their own spark dataframes.

In [3]:
# import the csv files
case = spark.read.csv('case.csv', sep=",", header=True, inferSchema=True)
dept = spark.read.csv('dept.csv', sep=",", header=True, inferSchema=True)
source = spark.read.csv('source.csv', sep=",", header=True, inferSchema=True)

                                                                                

### 2. Let's see how writing to the local disk works in spark:
> * Write the code necessary to store the source data in both csv and json format, store these as `sources_csv` and `sources_json`
> * Inspect your folder structure. What do you notice?
    * I notice that the files are stored within a directory and not in a typical json or csv format.

In [4]:
# Save the data as sources_csv.csv and ignore if the file already exists
source.write.csv('sources_csv.csv', mode='ignore')
source.write.json('sources_json.json', mode='ignore')

### 3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.



In [5]:
# Dates will need to be formatted, creating a UDF will be an option to do this
date_convert_udf = udf(lambda date: datetime.strptime(date, "%m/%d/%y %H:%M"), TimestampType())

# Case csv file datatype conversions 
case = (
    case.
    withColumn('case_opened_date',
               date_convert_udf(col('case_opened_date'))).
    withColumn('case_closed_date',
               date_convert_udf(col('case_closed_date'))).
    withColumn('SLA_due_date',
               date_convert_udf(col('SLA_due_date'))).
    withColumn('case_closed',
               expr("case_closed == 'YES'")).
    withColumn('case_late',
               expr("case_late == 'YES'")).
    withColumn('num_days_late',
               expr("num_days_late * -1").cast('integer')).
    withColumn('SLA_days',
               col('SLA_days').cast('integer')).
    withColumn('case_id',
               col('case_id').cast('string'))
).drop(col('case_status'))

# Create case table alias 
case_table = case.alias('case_table')

# Department csv file datatype conversions
dept = (
    dept.
    withColumn('dept_subject_to_SLA',
               expr("dept_subject_to_SLA == 'YES'").cast('boolean'))
)

# Create dept table alias 
dept_table = dept.alias('dept_table')

# Create source table alias
source_table = source.alias('source_table')

case.show(1, False, True)
dept.show(1, False, True)
source.show(1, False, True)

                                                                                

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 SLA_due_date         | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | 998                                  
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999                                  
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
only showing top 1 row

-RECORD 0----------------------------------
 dept_division          | 311 Call Center  
 dept_name          

### Look at the data to verify what joins need to be performed
> * `left_join` on `case` with `dept` on `dept_division` and with `source` on `source_id`

In [6]:
case = (
    case_table.join(   # Join the case & dept on dept_division
        dept_table, ['dept_division']).
    join(              # Join the case & source on source_id
        source_table, ['source_id'])
)

# Create and replace case as df_table 
case.createOrReplaceTempView('df_table')

case.show(1, False, True)

-RECORD 0------------------------------------------------------
 source_id              | svcCRMLS                             
 dept_division          | Field Operations                     
 case_id                | 1014127332                           
 case_opened_date       | 2018-01-01 00:42:00                  
 case_closed_date       | 2018-01-01 12:29:00                  
 SLA_due_date           | 2020-09-26 00:42:00                  
 case_late              | false                                
 num_days_late          | 998                                  
 case_closed            | true                                 
 service_request_type   | Stray Animal                         
 SLA_days               | 999                                  
 request_address        | 2315  EL PASO ST, San Antonio, 78207 
 council_district       | 5                                    
 dept_name              | Animal Care Services                 
 standardized_dept_name | Animal Care Se


## 1a. How old is the latest (in terms of days past SLA) currently open issue? 

|    |    case_id | dept_division   |   SLA_oldest_currently_open_issue |
|---:|-----------:|:----------------|----------------------------------:|
|  0 | 1014128388 | 311 Call Center |                              1419 |

In [7]:
print((
    spark.sql(
        '''
        SELECT 
        first(case_id) as case_id,
        first(dept_division) as dept_division,
        (MAX(SLA_days)) as SLA_oldest_currently_open_issue
        FROM df_table
        WHERE case_closed is False
        '''
    )).toPandas().to_markdown())

[Stage 14:>                                                       (0 + 12) / 12]

|    |    case_id | dept_division   |   SLA_oldest_currently_open_issue |
|---:|-----------:|:----------------|----------------------------------:|
|  0 | 1014128388 | 311 Call Center |                              1419 |


                                                                                

## 1b. How long has the oldest (in terms of days since opened) currently opened issue been open?

|    case_id | dept_division   |   oldest_currently_open_issue |
|-----------:|:----------------|------------------------------:|
| 1014128388 | 311 Call Center |                          1387 |

In [8]:
print((
    spark.sql(
        '''
        SELECT 
        first(case_id) as case_id,
        first(dept_division) as dept_division,
        (MAX(num_days_late)) as oldest_currently_open_issue
        FROM df_table
        WHERE case_closed is False
        '''
    )).toPandas().to_markdown())

|    |    case_id | dept_division   |   oldest_currently_open_issue |
|---:|-----------:|:----------------|------------------------------:|
|  0 | 1014128388 | 311 Call Center |                          1387 |


## 2. How many Stray Animal cases are there?

|    | service_request_type   |   count |
|---:|:-----------------------|--------:|
|  0 | Stray Animal           |   27361 |

In [9]:
print((
    case.groupBy('service_request_type').count().
    filter(case.service_request_type == 'Stray Animal')
).toPandas().to_markdown())

|    | service_request_type   |   count |
|---:|:-----------------------|--------:|
|  0 | Stray Animal           |   27361 |


## 3. How many service requests that are assigned to the Field Operations department (`dept_division`) are not classified as "Officer Standby" request type (`service_request_type`)?

|    | dept_division    |   count |
|---:|:-----------------|--------:|
|  0 | Field Operations |  116295 |

In [12]:
# First need to filter by the two options, then we can groupby and get counts
print((
    case.filter(
        (case.dept_division == 'Field Operations') 
        & (case.service_request_type != 'Officer Standby')).
    groupBy('dept_division').
    count()
).toPandas().to_markdown())

|    | dept_division    |   count |
|---:|:-----------------|--------:|
|  0 | Field Operations |  116295 |
