In [1]:
import pyspark
import pyspark.sql.functions as F

spark = pyspark.sql.SparkSession.builder.getOrCreate()


# Data Wrangling Exercises

## Part 1

### 1

Read the case, department, and source data into their own spark dataframes.

In [2]:
# Read all three .csv files into dataframes

case = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)
department = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)
source = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

                                                                                

### 2

Let's see how writing to the local disk works in spark:

- Write the code necessary to store the source data in both csv and json format; store these as source_csv and source_json
- Inspect your folder structure. What do you notice?

In [3]:
# Write the source dataframe into json and csv formats

source.write.json('source_json', mode = 'overwrite')
source.write.csv('source_csv', mode = 'overwrite')

Each write produces a directory with the name passed to the write method, not a single file. Inside are one or more part files (one per partition of the dataframe) and an empty _SUCCESS marker file.
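To inspect the folder structure from Python rather than a file browser, something like the following could work. This is a minimal sketch using only the standard library; the helper name `summarize_output_dir` is made up for illustration, and it assumes the output directories sit in the current working directory.

```python
from pathlib import Path


def summarize_output_dir(path):
    """Return the part file names and whether a _SUCCESS marker exists."""
    root = Path(path)
    # Data lives in files named part-*; hidden checksum files are not matched.
    part_files = sorted(p.name for p in root.glob("part-*"))
    has_success = (root / "_SUCCESS").exists()
    return part_files, has_success


# Only report on directories that actually exist where this is run.
for name in ("source_csv", "source_json"):
    if Path(name).is_dir():
        print(name, summarize_output_dir(name))
```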

### 3

Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [4]:
# Let's see the case dataframe

case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [5]:
case.show(1, vertical = True, truncate = False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 SLA_due_date         | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616000001                   
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
only showing top 1 row



The case_opened_date, case_closed_date, and SLA_due_date columns should be timestamp types. The case_late and case_closed columns, which hold YES/NO strings, can be cast to boolean types.

In [6]:
fmt = "M/d/yy H:mm"
case = (
    case.withColumn("case_opened_date", F.to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", F.to_timestamp("case_closed_date", fmt))
    .withColumn("SLA_due_date", F.to_timestamp("SLA_due_date", fmt))
)

case = (
    case.withColumn("case_closed", F.expr('case_closed == "YES"'))
    .withColumn("case_late", F.expr('case_late == "YES"'))
)
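As a quick sanity check on the casts above, the same parsing rules can be mirrored in plain Python on the sample values from the first record. This is only a sketch: `PY_FMT` is my assumed strptime equivalent of the Spark pattern "M/d/yy H:mm", and both helper names are hypothetical.

```python
from datetime import datetime

# Assumed strptime counterpart of the Spark pattern "M/d/yy H:mm".
PY_FMT = "%m/%d/%y %H:%M"


def parse_case_timestamp(text):
    """Parse strings such as '1/1/18 0:42' into a datetime."""
    return datetime.strptime(text, PY_FMT)


def yes_no_to_bool(text):
    """Mirror `col == "YES"`: True only for 'YES', None stays None."""
    if text is None:
        return None
    return text == "YES"


print(parse_case_timestamp("1/1/18 0:42"))
print(parse_case_timestamp("9/26/20 0:42"))
print(yes_no_to_bool("NO"), yes_no_to_bool("YES"))
```

Note that any value other than exactly "YES" (for example a lowercase "yes") maps to false under this comparison, so it is worth checking the distinct values before casting.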

In [7]:
# Let's see the department dataframe

department.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [8]:
department.show(1, vertical = True, truncate = False)

-RECORD 0----------------------------------
 dept_division          | 311 Call Center  
 dept_name              | Customer Service 
 standardized_dept_name | Customer Service 
 dept_subject_to_SLA    | YES              
only showing top 1 row



In [9]:
department.select('dept_subject_to_SLA').distinct().show()

+-------------------+
|dept_subject_to_SLA|
+-------------------+
|                YES|
|                 NO|
+-------------------+



                                                                                

The dept_subject_to_SLA column can be a boolean type.

In [10]:
department = department.withColumn('dept_subject_to_SLA', F.expr('dept_subject_to_SLA == "YES"'))

In [11]:
# Finally let's see the source dataframe

source.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



In [12]:
source.show(1, vertical = True, truncate = False)

-RECORD 0---------------------------
 source_id       | 100137           
 source_username | Merlene Blodgett 
only showing top 1 row



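The source_id column could be cast to an int type in this dataframe, but case.source_id also holds non-numeric values such as svcCRMLS, so I'm leaving both as strings to keep the join key consistent.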

## Part 2

### 1

How late (in terms of days past SLA) is the most overdue currently open issue?

In [13]:
# Let's try this with SQL
case.createOrReplaceTempView('case')

In [14]:
spark.sql('''
SELECT
    num_days_late
FROM case
WHERE case_closed = false
    AND case_late = true
ORDER BY num_days_late DESC
LIMIT 1;
''').show(vertical = True, truncate = False)

-RECORD 0--------------------
 num_days_late | 348.6458333 



                                                                                

How long has the oldest (in terms of days since opened) currently open issue been open?

In [59]:
spark.sql('''
SELECT
    DATEDIFF(NOW(), case_opened_date) AS days_open
FROM case
WHERE case_closed = false
ORDER BY days_open DESC
LIMIT 1;
''').show(vertical = True)



-RECORD 0---------
 days_open | 1965 
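
Because the query uses NOW(), this number grows every day the notebook is rerun. DATEDIFF counts whole calendar days and ignores the time of day; the plain-Python sketch below mirrors that behavior. The `days_open` helper and its `today` parameter are hypothetical, added so the calculation can be pinned to a fixed reference date.

```python
from datetime import date, datetime


def days_open(opened, today=None):
    """Whole calendar days from the opening date to `today`, ignoring time of day."""
    if today is None:
        today = date.today()
    return (today - opened.date()).days


# Pinning the reference date makes the result reproducible.
print(days_open(datetime(2018, 1, 1, 0, 42), today=date(2018, 1, 11)))
```

Pinning a reference date in the Spark query as well would make the answer stable across runs.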



                                                                                

### 2

How many Stray Animal cases are there?

In [17]:
spark.sql('''
SELECT
    COUNT(*)
FROM case
WHERE service_request_type = 'Stray Animal';
''').show(vertical = True)

-RECORD 0---------
 count(1) | 26760 



                                                                                

### 3

How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [20]:
spark.sql('''
SELECT
    COUNT(*)
FROM case
WHERE dept_division = 'Field Operations'
    AND service_request_type != 'Officer Standby';
''').show(vertical = True)



-RECORD 0----------
 count(1) | 113902 



                                                                                

### 4

Convert the council_district column to a string column.

In [22]:
case = case.withColumn('council_district', F.col('council_district').cast('string'))
case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



### 5

Extract the year from the case_closed_date column.

In [24]:
case.select(F.year('case_closed_date')).show(5)

+----------------------+
|year(case_closed_date)|
+----------------------+
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
+----------------------+
only showing top 5 rows



### 6

Convert num_days_late from days to hours in a new column, num_hours_late.

In [25]:
case = case.withColumn('num_hours_late', F.expr('num_days_late * 24'))
case.show(1, vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 SLA_due_date         | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 num_hours_late       | -23964.2102784       
only showing top 1 row



In [26]:
case.select('num_days_late', 'num_hours_late').show(5)

+-------------------+-------------------+
|      num_days_late|     num_hours_late|
+-------------------+-------------------+
| -998.5087616000001|     -23964.2102784|
|-2.0126041669999997|-48.302500007999996|
|       -3.022337963|      -72.536111112|
|       -15.01148148|      -360.27555552|
|0.37216435200000003|  8.931944448000001|
+-------------------+-------------------+
only showing top 5 rows
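
To double-check the sign and units, the first record can be recomputed by hand. The sketch below assumes num_days_late is the close time minus the SLA due time, expressed in days; that is my reading of the first record, not something the dataset documents. The helper names are hypothetical, and the timestamps are only displayed to the minute, so the result matches the stored value approximately.

```python
from datetime import datetime

SECONDS_PER_DAY = 24 * 60 * 60


def days_late(closed, due):
    """Days from the SLA due time to the close time; negative means closed early."""
    return (closed - due).total_seconds() / SECONDS_PER_DAY


def days_to_hours(days):
    """Convert a duration in days to hours."""
    return days * 24


# Values taken from the first record shown above.
closed = datetime(2018, 1, 1, 12, 29)
due = datetime(2020, 9, 26, 0, 42)
late = days_late(closed, due)
print(late, days_to_hours(late))
```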



### 7

Join the case data with the source and department data.

In [27]:
# Let's first see the schema for each dataframe so we can know how to join them all

case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)
 |-- num_hours_late: double (nullable = true)



In [28]:
department.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: boolean (nullable = true)



In [29]:
source.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



The department dataframe can be joined on dept_division and the source dataframe can be joined on source_id.

In [33]:
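df = (
    # Joining on a column name keeps a single copy of each join key,
    # so there are no duplicate key columns to drop afterwards.
    # The default join type is inner.
    case.join(department, 'dept_division')
    .join(source, 'source_id')
)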
df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)
 |-- num_hours_late: double (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: boolean (nullable = true)
 |-- source_username: string (nullable = true)



### 8

Are there any cases that do not have a request source?

In [46]:
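# Count joined rows whose source_username is null.
# Caveat: df comes from inner joins, so any case whose source_id has no match in source was already dropped.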
df.where(df.source_username.isNull()).count()

0
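
A stricter way to answer this is a left anti join, which keeps only the cases that have no matching row in source. In PySpark that would likely be `case.join(source, 'source_id', 'left_anti').count()`, which I have not run against this data. The plain-Python sketch below shows the same logic on made-up toy rows; the function name and the sample values are hypothetical.

```python
def cases_without_source(cases, sources):
    """Return cases whose source_id is missing or has no match in sources."""
    known_ids = {s["source_id"] for s in sources}
    return [
        c for c in cases
        if c.get("source_id") is None or c["source_id"] not in known_ids
    ]


# Toy rows for illustration only; not taken from the real dataset.
toy_cases = [
    {"case_id": 1, "source_id": "a"},
    {"case_id": 2, "source_id": "zzz"},
    {"case_id": 3, "source_id": None},
]
toy_sources = [{"source_id": "a", "source_username": "someone"}]

unmatched = cases_without_source(toy_cases, toy_sources)
print([c["case_id"] for c in unmatched])
```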

### 9

What are the top 10 service request types in terms of number of requests?

In [52]:
spark.sql('''
SELECT
    service_request_type,
    COUNT(*) AS number_of_requests
FROM case
GROUP BY service_request_type
ORDER BY number_of_requests DESC
LIMIT 10;
''').show()

+--------------------+------------------+
|service_request_type|number_of_requests|
+--------------------+------------------+
|           No Pickup|             86855|
|Overgrown Yard/Trash|             65895|
|        Bandit Signs|             32910|
|        Damaged Cart|             30338|
|Front Or Side Yar...|             28794|
|        Stray Animal|             26760|
|Aggressive Animal...|             24882|
|Cart Exchange Req...|             22024|
|Junk Vehicle On P...|             21473|
|     Pot Hole Repair|             20616|
+--------------------+------------------+



                                                                                

### 10

What are the top 10 service request types in terms of average days late?

In [53]:
spark.sql('''
SELECT
    service_request_type,
    AVG(num_days_late) AS avg_days_late
FROM case
GROUP BY service_request_type
ORDER BY avg_days_late DESC
LIMIT 10;
''').show()



+--------------------+------------------+
|service_request_type|     avg_days_late|
+--------------------+------------------+
|  Zoning: Junk Yards| 175.9563621042095|
|Labeling for Used...|162.43032902285717|
|Record Keeping of...|153.99724039428568|
|Signage Requied f...|151.63868055333333|
|Storage of Used M...|     142.112556415|
|Zoning: Recycle Yard|135.92851612479797|
|Donation Containe...|131.75610506358706|
|License Requied U...|128.79828704142858|
|Traffic Signal Gr...| 77.90021217000002|
|           Complaint| 72.51790932659713|
+--------------------+------------------+



                                                                                

### 11

Does the number of days late depend on department?

In [63]:
(
    df.groupBy('dept_name')
    .agg(F.mean('num_days_late').alias('avg_days_late'))
    .sort('avg_days_late', ascending = False)
    .show()
)



+--------------------+-------------------+
|           dept_name|      avg_days_late|
+--------------------+-------------------+
|                null| 135.92851612479797|
|    Customer Service| 59.737091496300785|
|Development Services|  13.43372455586971|
|Solid Waste Manag...|-2.2000575136721747|
|        Metro Health| -4.911766979607002|
|Parks and Recreation| -5.251521960055133|
|Trans & Cap Impro...|-20.612837354052626|
|Code Enforcement ...|-38.701330683295375|
|Animal Care Services|-226.51783940550382|
|        City Council|               null|
+--------------------+-------------------+



                                                                                

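Average days late does vary by department. Among the named departments, Customer Service (about 60 days) and Development Services (about 13 days) are late on average, while the others with data are early on average, with Animal Care Services the earliest at about 227 days ahead.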
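
The null average for City Council follows from how Spark averages treat nulls: null values are skipped, and a group with no non-null values averages to null. Here is a small plain-Python sketch of that rule on made-up rows; the `mean_by_group` helper and the toy data are hypothetical.

```python
from collections import defaultdict


def mean_by_group(rows, key, value):
    """Average `value` per `key`, skipping None; a group with no values gives None."""
    totals = defaultdict(lambda: [0.0, 0])  # group -> [sum, count of non-null values]
    for row in rows:
        bucket = totals[row[key]]
        if row[value] is not None:
            bucket[0] += row[value]
            bucket[1] += 1
    return {g: (s / n if n else None) for g, (s, n) in totals.items()}


# Toy rows for illustration only; not taken from the real dataset.
toy_rows = [
    {"dept_name": "A", "num_days_late": 2.0},
    {"dept_name": "A", "num_days_late": None},
    {"dept_name": "A", "num_days_late": -4.0},
    {"dept_name": "B", "num_days_late": None},
]
averages = mean_by_group(toy_rows, "dept_name", "num_days_late")
print(averages)
```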

### 12

How does the number of days late depend on department and request type?

In [64]:
(
    df.groupBy('dept_name', 'service_request_type')
    .agg(F.mean('num_days_late').alias('avg_days_late'))
    .sort('avg_days_late', ascending = False)
    .show()
)



+--------------------+--------------------+------------------+
|           dept_name|service_request_type|     avg_days_late|
+--------------------+--------------------+------------------+
|Code Enforcement ...|  Zoning: Junk Yards| 175.9563621042095|
|Code Enforcement ...|Labeling for Used...|162.43032902285717|
|Code Enforcement ...|Record Keeping of...|153.99724039428568|
|Code Enforcement ...|Signage Requied f...|151.63868055333333|
|Code Enforcement ...|Storage of Used M...|     142.112556415|
|                null|Zoning: Recycle Yard|135.92851612479797|
|Code Enforcement ...|Donation Containe...|131.75610506358706|
|Code Enforcement ...|License Requied U...|128.79828704142858|
|Trans & Cap Impro...|Traffic Signal Gr...|101.79846062200002|
|    Customer Service|           Complaint| 72.87050230311695|
|Code Enforcement ...|             Vendors|   66.548098985078|
|Parks and Recreation|Reservation Assis...|       66.03116319|
|Code Enforcement ...|   No Address Posted|59.875640733

                                                                                