In [1]:
import os
import env as e

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd



Read the case, department, and source data into their own spark dataframes.

- Let's see how writing to the local disk works in spark:

    - Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json

    - Inspect your folder structure. What do you notice?

In [2]:
# Generate spark object to initialize a local Spark JVM process
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/05 12:05:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/05 12:05:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
url = e.get_db_url("311_data")

In [4]:
#case
query = """SELECT * FROM cases"""
cases = pd.read_sql(query, url)
cases = spark.createDataFrame(cases)

cases.write.json("cases_json", mode="overwrite")
cases.write.format("csv").mode("overwrite").option("header", True).save("cases_csv")

cases.show(4)

23/07/05 12:08:27 WARN TaskSetManager: Stage 0 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
23/07/05 12:08:36 WARN TaskSetManager: Stage 1 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
23/07/05 12:08:40 WARN TaskSetManager: Stage 2 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 2:>                                                          (0 + 1) / 1]

+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO| -2.012604167|        YES|     Storm Water|Removal Of Obstru...|4.322222222|     Closed| svcCRMSS|2215  GOL

23/07/05 12:08:44 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 2 (TID 16): Attempting to kill Python Worker
                                                                                

In [5]:
#department
query = """SELECT * FROM dept"""
dept = pd.read_sql(query, url)
dept = spark.createDataFrame(dept)

dept.write.json("dept_json", mode="overwrite")
dept.write.format("csv").mode("overwrite").option("header", True).save("dept_csv")

dept.show(4)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 4 rows



In [6]:
#source
query = """SELECT * FROM source"""
source = pd.read_sql(query, url)
source = spark.createDataFrame(source)

source.write.json("source_json", mode="overwrite")
source.write.format("csv").mode("overwrite").option("header", True).save("source_csv")

source.show(4)

+-----+---------+----------------+
|index|source_id| source_username|
+-----+---------+----------------+
|    0|   100137|Merlene Blodgett|
|    1|   103582|     Carmen Cura|
|    2|   106463| Richard Sanchez|
|    3|   119403|  Betty De Hoyos|
+-----+---------+----------------+
only showing top 4 rows



Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.



In [7]:
cases.schema

StructType([StructField('case_id', LongType(), True), StructField('case_opened_date', StringType(), True), StructField('case_closed_date', StringType(), True), StructField('SLA_due_date', StringType(), True), StructField('case_late', StringType(), True), StructField('num_days_late', DoubleType(), True), StructField('case_closed', StringType(), True), StructField('dept_division', StringType(), True), StructField('service_request_type', StringType(), True), StructField('SLA_days', DoubleType(), True), StructField('case_status', StringType(), True), StructField('source_id', StringType(), True), StructField('request_address', StringType(), True), StructField('council_district', LongType(), True)])

In [8]:
source.schema

StructType([StructField('index', LongType(), True), StructField('source_id', StringType(), True), StructField('source_username', StringType(), True)])

In [9]:
dept.schema

StructType([StructField('dept_division', StringType(), True), StructField('dept_name', StringType(), True), StructField('standardized_dept_name', StringType(), True), StructField('dept_subject_to_SLA', StringType(), True)])

In [16]:
cases = cases.withColumnRenamed("SLA_due_date", "case_due_date")
cases.show(2, vertical=True)

23/07/05 12:17:02 WARN TaskSetManager: Stage 11 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 11:>                                                         (0 + 1) / 1]

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

23/07/05 12:17:06 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 11 (TID 53): Attempting to kill Python Worker
                                                                                

In [14]:
#Let's create two new boolean columns
cases = cases.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
        "case_late", expr('case_late == "YES"'))

#Select just the two columns
cases.select("case_closed", "case_late").show()

23/07/05 12:14:25 WARN TaskSetManager: Stage 10 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 10:>                                                         (0 + 1) / 1]

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
+-----------+---------+
only showing top 20 rows



23/07/05 12:14:29 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 10 (TID 52): Attempting to kill Python Worker
                                                                                

In [17]:
fmt = "M/d/yy H:mm"

cases = (
    cases.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
)

cases.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

23/07/05 12:17:10 WARN TaskSetManager: Stage 12 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 12:>                                                         (0 + 1) / 1]

+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-05 08:30:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-17 08:30:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00|2018-01-01 04:34:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



23/07/05 12:17:14 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 12 (TID 54): Attempting to kill Python Worker
                                                                                

# 1.
How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?


In [19]:
cases.show(1, vertical=True)

23/07/05 12:22:46 WARN TaskSetManager: Stage 14 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 14:>                                                         (0 + 1) / 1]

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



23/07/05 12:22:50 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 14 (TID 56): Attempting to kill Python Worker
                                                                                

In [20]:
# Filter for currently open issues
open_issues = cases.filter(cases.case_closed == False)

# Calculate days past SLA for each open issue
open_issues = open_issues.withColumn("days_past_SLA", datediff(current_date(), open_issues.case_due_date))

# Calculate the age of the latest issue in terms of days past SLA
latest_age = open_issues.select(max("days_past_SLA")).first()[0]

# Calculate days since opened for each open issue
open_issues = open_issues.withColumn("days_since_opened", datediff(current_date(), open_issues.case_opened_date))

#Calculate the duration of the oldest issue in terms of days since opened
oldest_duration = open_issues.select(min("days_since_opened")).first()[0]

print("Age of the latest issue (days past SLA):", latest_age)
print("Duration of the oldest issue (days since opened):", oldest_duration)


23/07/05 12:23:57 WARN TaskSetManager: Stage 15 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
23/07/05 12:23:59 WARN TaskSetManager: Stage 18 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 18:>                                                         (0 + 8) / 8]

Age of the latest issue (days past SLA): 2360
Duration of the oldest issue (days since opened): 1792


                                                                                

# 2.
How many Stray Animal cases are there?


In [25]:
stray_animal_cases = cases.filter(col("service_request_type") == "Stray Animal")
stray_animal_cases.count()

23/07/05 12:27:32 WARN TaskSetManager: Stage 30 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

26760

# 3.
How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?



In [26]:
field_operations_requests = cases.filter((col("dept_division") == "Field Operations") & (col("service_request_type") != "Officer Standby"))
field_operations_requests.count()




23/07/05 12:29:07 WARN TaskSetManager: Stage 33 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

113902

# 4.
Convert the council_district column to a string column.



In [27]:
cases = cases.withColumn("council_district", col("council_district").cast("string"))

# 5.
Extract the year from the case_closed_date column.


In [45]:
cases = cases.withColumn("closed_year", year("case_closed_date"))
cases.select("case_closed_date", "closed_year").show(5)

23/07/05 12:34:58 WARN TaskSetManager: Stage 50 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 50:>                                                         (0 + 1) / 1]

+-------------------+-----------+
|   case_closed_date|closed_year|
+-------------------+-----------+
|2018-01-01 12:29:00|       2018|
|2018-01-03 08:11:00|       2018|
|2018-01-02 07:57:00|       2018|
|2018-01-02 08:13:00|       2018|
|2018-01-01 13:29:00|       2018|
+-------------------+-----------+
only showing top 5 rows



23/07/05 12:35:02 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 50 (TID 134): Attempting to kill Python Worker
                                                                                

# 6.
Convert num_days_late from days to hours in new columns num_hours_late.



In [46]:
cases = cases.withColumn("num_hours_late", col("num_days_late") * 24)
cases.select("num_days_late", "num_hours_late").show(5)

23/07/05 12:38:40 WARN TaskSetManager: Stage 51 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 51:>                                                         (0 + 1) / 1]

+-------------+-------------------+
|num_days_late|     num_hours_late|
+-------------+-------------------+
| -998.5087616|-23964.210278399998|
| -2.012604167|      -48.302500008|
| -3.022337963|      -72.536111112|
| -15.01148148|      -360.27555552|
|  0.372164352|        8.931944448|
+-------------+-------------------+
only showing top 5 rows



23/07/05 12:38:44 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 51 (TID 135): Attempting to kill Python Worker
                                                                                

# 7.
Join the case data with the source and department data.



In [51]:
joined_data = cases.join(source, "source_id").join(dept, "dept_division")
joined_data.show(1, vertical=True)

23/07/05 12:39:42 WARN TaskSetManager: Stage 74 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

-RECORD 0--------------------------------------
 dept_division          | Miscellaneous        
 source_id              | 141239               
 case_id                | 1014129438           
 case_opened_date       | 2018-01-02 12:16:00  
 case_closed_date       | 2018-01-02 13:49:00  
 case_due_date          | 2018-01-04 12:16:00  
 case_late              | false                
 num_days_late          | -1.935416667         
 case_closed            | true                 
 service_request_type   | Dead Animal - Dog    
 SLA_days               | 2.0                  
 case_status            | Closed               
 request_address        | 161  PRICE, San A... 
 council_district       | 4                    
 closed_year            | 2018                 
 num_hours_late         | -46.450000007999996  
 index                  | 31                   
 source_username        | Erica Flores         
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste   

# 8.

Are there any cases that do not have a request source?



In [53]:
cases_without_source = cases.filter(col("source_id").isNull())
cases_without_source.count()

23/07/05 12:40:06 WARN TaskSetManager: Stage 88 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

0

# 9.
What are the top 10 service request types in terms of number of requests?



In [54]:
top_10_request_types = cases.groupBy("service_request_type").count().orderBy(desc("count")).limit(10)
top_10_request_types.show()

23/07/05 12:40:15 WARN TaskSetManager: Stage 91 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 91:>                                                         (0 + 8) / 8]

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|86855|
|Overgrown Yard/Trash|65895|
|        Bandit Signs|32910|
|        Damaged Cart|30338|
|Front Or Side Yar...|28794|
|        Stray Animal|26760|
|Aggressive Animal...|24882|
|Cart Exchange Req...|22024|
|Junk Vehicle On P...|21473|
|     Pot Hole Repair|20616|
+--------------------+-----+



                                                                                

# 10.
What are the top 10 service request types in terms of average days late?


In [55]:
top_10_request_types_avg_late = cases.groupBy("service_request_type").avg("num_days_late").orderBy(desc("avg(num_days_late)")).limit(10)
top_10_request_types_avg_late.show()

23/07/05 12:40:50 WARN TaskSetManager: Stage 94 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 94:>                                                         (0 + 8) / 8]

+--------------------+------------------+
|service_request_type|avg(num_days_late)|
+--------------------+------------------+
|Request for Resea...|               NaN|
|CCO_Request for R...|               NaN|
|  Zoning: Junk Yards|175.95636210420952|
|Labeling for Used...|162.43032902285717|
|Record Keeping of...| 153.9972403942857|
|Signage Requied f...|151.63868055333333|
|Storage of Used M...|142.11255641500003|
|Zoning: Recycle Yard|135.92851612479797|
|Donation Containe...|131.75610506358706|
|License Requied U...|128.79828704142858|
+--------------------+------------------+





# 11.
Does number of days late depend on department?


In [57]:
average_days_late_by_department = cases.groupBy("dept_division").avg("num_days_late").orderBy("dept_division")
average_days_late_by_department.show(5)

23/07/05 12:41:51 WARN TaskSetManager: Stage 100 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 100:>                                                        (0 + 8) / 8]

+--------------------+-------------------+
|       dept_division| avg(num_days_late)|
+--------------------+-------------------+
|     311 Call Center|  59.49019459221512|
|               Brush|-3.9898705352285626|
|     Clean and Green| -2.574899988068109|
|Clean and Green N...|  1.691468919487805|
|    Code Enforcement| -38.24009089561048|
+--------------------+-------------------+
only showing top 5 rows



                                                                                

# 12.
How do number of days late depend on department and request type?

In [58]:
average_days_late_by_department_request_type = cases.groupBy("dept_division", "service_request_type").avg("num_days_late").orderBy("dept_division", "service_request_type")
average_days_late_by_department_request_type.show(5)

23/07/05 12:41:59 WARN TaskSetManager: Stage 103 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 103:>                                                        (0 + 8) / 8]

+---------------+--------------------+------------------+
|  dept_division|service_request_type|avg(num_days_late)|
+---------------+--------------------+------------------+
|311 Call Center|           Complaint| 72.51790932659712|
|311 Call Center|          Compliment|-7.993932925581345|
|          Brush|Brush - Out of Cy...|-4.327971198733802|
|          Brush|Brush / Bulky Mis...|-3.042030021533073|
|          Brush|Brush Missed Pick up|-5.882082558993797|
+---------------+--------------------+------------------+
only showing top 5 rows



                                                                                