In [1]:
import pyspark

from pyspark.sql.functions import *

In [2]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark

This exercise uses the case.csv, dept.csv, and source.csv files from the San Antonio 311 call dataset.

1. Read the case, department, and source data into their own Spark DataFrames.

In [3]:
case = spark.read.csv('case.csv', sep=",", header=True, inferSchema=True)

In [4]:
case

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int]

In [5]:
case.show(vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [6]:
dept = spark.read.csv('dept.csv', sep=",", header=True, inferSchema=True)

In [7]:
dept

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [8]:
dept.show()

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|                null|  DSD/Code Enforcement|                YES|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Dangerous Premise...|Code Enforcement ...|

In [9]:
source = spark.read.csv('source.csv', sep=",", header=True, inferSchema=True)

In [10]:
source

DataFrame[source_id: string, source_username: string]

In [11]:
source.show()

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



2. Let's see how writing to the local disk works in Spark:

    - Write the code necessary to store the source data in both csv and json format. Store these as sources_csv and sources_json.
    - Inspect your folder structure. What do you notice?

In [12]:
source.write.format('json').mode('overwrite').save('sources_json')

In [13]:
source.write.format('csv').mode('overwrite').option('header', 'true').save('sources_csv')

In [14]:
# each output is a folder, not a single file: it holds a _SUCCESS marker file plus the csv or json part file(s)

3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [15]:
case.describe().show(vertical=True)

-RECORD 0------------------------------------
 summary              | count                
 case_id              | 841704               
 case_opened_date     | 841704               
 case_closed_date     | 823594               
 SLA_due_date         | 841671               
 case_late            | 841704               
 num_days_late        | 841671               
 case_closed          | 841704               
 dept_division        | 841704               
 service_request_type | 841704               
 SLA_days             | 841671               
 case_status          | 841704               
 source_id            | 841704               
 request_address      | 841704               
 council_district     | 841704               
-RECORD 1------------------------------------
 summary              | mean                 
 case_id              | 1.0139680837676392E9 
 case_opened_date     | null                 
 case_closed_date     | null                 
 SLA_due_date         | null      

In [16]:
case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [17]:
source.describe().show()

+-------+------------------+---------------+
|summary|         source_id|source_username|
+-------+------------------+---------------+
|  count|               140|            140|
|   mean|136973.92727272728|           null|
| stddev| 10275.63643312297|           null|
|    min|            100137|  Alex Franklin|
|    max|           yh24110|       svcCRMSS|
+-------+------------------+---------------+



In [18]:
source.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



In [19]:
dept.describe().show()

+-------+----------------+--------------------+----------------------+-------------------+
|summary|   dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+-------+----------------+--------------------+----------------------+-------------------+
|  count|              39|                  38|                    39|                 39|
|   mean|            null|                null|                  null|               null|
| stddev|            null|                null|                  null|               null|
|    min| 311 Call Center|Animal Care Services|  Animal Care Services|                 NO|
|    max|Waste Collection|Trans & Cap Impro...|  Trans & Cap Impro...|                YES|
+-------+----------------+--------------------+----------------------+-------------------+



In [20]:
dept.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [21]:
# rename SLA_due_date so it matches the other case date columns
case = case.withColumnRenamed('SLA_due_date', 'case_due_date')

In [22]:
case.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [23]:
# case_late and case_closed hold YES/NO strings; they should be booleans
case.groupBy('case_late', 'case_closed').count().show()

+---------+-----------+------+
|case_late|case_closed| count|
+---------+-----------+------+
|       NO|        YES|735616|
|      YES|        YES| 87978|
|       NO|         NO| 11585|
|      YES|         NO|  6525|
+---------+-----------+------+



In [24]:
case = case.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

In [25]:
case.select('case_late', 'case_closed').show(5)
case.select('case_late', 'case_closed').dtypes

+---------+-----------+
|case_late|case_closed|
+---------+-----------+
|    false|       true|
|    false|       true|
|    false|       true|
|    false|       true|
|     true|       true|
+---------+-----------+
only showing top 5 rows



[('case_late', 'boolean'), ('case_closed', 'boolean')]

In [26]:
# cast this int to a string: council_district is an identifier, not a quantity
case.select('council_district').show(5)

+----------------+
|council_district|
+----------------+
|               5|
|               3|
|               3|
|               3|
|               7|
+----------------+
only showing top 5 rows



In [27]:
case = case.withColumn('council_district', col('council_district').cast("string"))

In [28]:
case.select('council_district').dtypes

[('council_district', 'string')]

In [29]:
# the date columns are strings; convert them to timestamps with Spark's to_timestamp
case.select('case_opened_date', 'case_due_date', 'case_closed_date').show(5)

+----------------+-------------+----------------+
|case_opened_date|case_due_date|case_closed_date|
+----------------+-------------+----------------+
|     1/1/18 0:42| 9/26/20 0:42|    1/1/18 12:29|
|     1/1/18 0:46|  1/5/18 8:30|     1/3/18 8:11|
|     1/1/18 0:48|  1/5/18 8:30|     1/2/18 7:57|
|     1/1/18 1:29| 1/17/18 8:30|     1/2/18 8:13|
|     1/1/18 1:34|  1/1/18 4:34|    1/1/18 13:29|
+----------------+-------------+----------------+
only showing top 5 rows



In [32]:
fmt = "M/d/yy H:mm"
case = (
    case.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
)


In [33]:
case.select('case_opened_date', 'case_due_date', 'case_closed_date').show(5)

+-------------------+-------------------+-------------------+
|   case_opened_date|      case_due_date|   case_closed_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2020-09-26 00:42:00|2018-01-01 12:29:00|
|2018-01-01 00:46:00|2018-01-05 08:30:00|2018-01-03 08:11:00|
|2018-01-01 00:48:00|2018-01-05 08:30:00|2018-01-02 07:57:00|
|2018-01-01 01:29:00|2018-01-17 08:30:00|2018-01-02 08:13:00|
|2018-01-01 01:34:00|2018-01-01 04:34:00|2018-01-01 13:29:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently open issue been open?

2. How many Stray Animal cases are there?

3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

4. Convert the council_district column to a string column.

5. Extract the year from the case_closed_date column.

6. Convert num_days_late from days to hours in a new column, num_hours_late.

7. Join the case data with the source and department data.

8. Are there any cases that do not have a request source?

9. What are the top 10 service request types in terms of number of requests?

10. What are the top 10 service request types in terms of average days late?

11. Does number of days late depend on department?

12. How does the number of days late depend on department and request type?

You might have noticed that the latest date in the dataset is fairly far off from the present day. To account for this, replace any occurrences of the current time with the maximum date from the dataset.