In [1]:
import pandas as pd 
import numpy as np  


In [130]:
import pyspark  
import regex
spark = pyspark.sql.SparkSession.builder.getOrCreate()

import pyspark.sql.functions as F
from pyspark.sql.functions import lit
from pyspark.sql.functions import concat, sum, avg, min, max, count
from pyspark.sql.functions import when, expr,col ,asc ,desc,datediff,current_timestamp,format_string, to_timestamp,trim,lower
from pyspark.sql.functions import regexp_extract, regexp_replace 

# Read the case, department, and source data into their own spark dataframes.

In [5]:
from env import host, username, password

def get_db_url(database, host=host, user=username, password=password):
    return f'mysql+pymysql://{user}:{password}@{host}/{database}'

In [223]:
# query from sql(311_data), case, source and dept.
query = """SELECT * FROM cases limit 100"""
url = get_db_url("311_data")
source_df = pd.read_sql(query, url)
case_df = spark.createDataFrame(source_df)

In [11]:
query = """SELECT * FROM dept limit 100"""
url = get_db_url("311_data")
source_df = pd.read_sql(query, url)
dept_df = spark.createDataFrame(source_df)
dept_df.show(4)


+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 4 rows



In [19]:

query = """SELECT * FROM source limit 100"""
url = get_db_url("311_data")
source_df = pd.read_sql(query, url)
source_df = spark.createDataFrame(source_df)

+-----+---------+----------------+
|index|source_id| source_username|
+-----+---------+----------------+
|    0|   100137|Merlene Blodgett|
|    1|   103582|     Carmen Cura|
|    2|   106463| Richard Sanchez|
|    3|   119403|  Betty De Hoyos|
+-----+---------+----------------+
only showing top 4 rows



In [122]:
case_df.show(4)

+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO| -2.012604167|        YES|     Storm Water|Removal Of Obstru...|4.322222222|     Closed| svcCRMSS|2215  GOL

# Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json


In [None]:
# write to json and csv

In [22]:
source_df.write.json("data/source_json", mode="overwrite")

In [24]:
source_df.write.csv("data/source_csv", mode="overwrite")


In [None]:
# Inspect your folder structure. What do you notice?# 
# it has created a folder in my repo called data, and saved them in their

# Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.



In [254]:
# Rename column
df = case_df.withColumnRenamed('SLA_due_date', 'case_due_date')

In [255]:
# Convert to better data types
df = (
    df.withColumn('case_late', col('case_late') == 'YES')
    .withColumn('case_closed', col('case_closed') == 'YES')
)

df = df.withColumn('council_district', format_string('%03d', col('council_district')))

fmt = 'M/d/yy H:mm'
df = (
    df.withColumn('case_opened_date', to_timestamp(col('case_opened_date'), fmt))
    .withColumn('case_closed_date', to_timestamp(col('case_closed_date'), fmt))
    .withColumn('case_due_date', to_timestamp(col('case_due_date'), fmt))
)

In [256]:
# Cleanup text data
df = df.withColumn('request_address', lower(trim(col('request_address'))))

In [257]:
# Create a `case_lifetime` feature
df = (
    df.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(
        col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .drop('case_age', 'days_to_closed')
)

# How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?


In [258]:
df.show(1, vertical= True)


-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 case_lifetime        | 0                    
only showing top 1 row



In [259]:
df.select(df.case_closed == False ,
          df.case_lifetime).sort(df.case_lifetime.desc()).show()

+---------------------+-------------+
|(case_closed = false)|case_lifetime|
+---------------------+-------------+
|                false|          114|
|                false|           84|
|                false|           81|
|                false|           78|
|                false|           77|
|                false|           77|
|                false|           73|
|                false|           51|
|                false|           51|
|                false|           51|
|                false|           41|
|                false|           30|
|                false|            8|
|                false|            5|
|                false|            5|
|                false|            5|
|                false|            2|
|                false|            2|
|                false|            2|
|                false|            2|
+---------------------+-------------+
only showing top 20 rows



# How many Stray Animal cases are there?

In [260]:
df.groupBy('service_request_type').count().show()

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|Right Of Way/Side...|   13|
|Front Or Side Yar...|   28|
|        Stray Animal|   10|
|Removal Of Obstru...|    2|
|Traffic Signal Op...|    1|
|Animal Cruelty(Cr...|    5|
|Alley-Way Mainten...|    1|
|   Dead Animal - Cat|    2|
|Traffic Signal Op...|    3|
|   Barricade Pick Up|    1|
|Dead Trees: Priva...|    1|
|Injured Animal(Cr...|    2|
|Trapped/Confined ...|    5|
|Aggressive Animal...|    6|
|   Dead Animal - Dog|    3|
|Aggressive Animal...|    2|
|      Animal Neglect|   12|
|Water Ponding or ...|    1|
|Stop/Yield Sign D...|    1|
|Graffiti Public P...|    1|
+--------------------+-----+



# How many service requests that are assigned to the Field Operations department (dept_division) ar

In [261]:
df.select('service_request_type','dept_division').where(df.dept_division != 'Officer Standby').count()

100

# Convert the council_district column to a string column.

In [262]:
df.withColumn('council_district',(df.council_district).cast('string'))


DataFrame[case_id: bigint, case_opened_date: timestamp, case_closed_date: timestamp, case_due_date: timestamp, case_late: boolean, num_days_late: double, case_closed: boolean, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, case_lifetime: int]

# Extract the year from the case_closed_date column.

In [263]:
weather.withColumn('month', F.month(weather.date)).groupBy(F.col('month')).agg(F.mean(weather.precipitation).alias('avg_rain')).sort(F.col('avg_rain')).show()


NameError: name 'weather' is not defined

In [264]:
df  = df.withColumn('year_closed',F.year('case_closed_date'))


# Convert num_days_late from days to hours in new columns num_hours_late.

In [265]:
df = df.withColumn('num_hours_late',expr('num_days_late * 24'))


In [266]:
df.show(vertical= True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 case_lifetime        | 0                    
 year_closed          | 2018                 
 num_hours_late       | -23964.210278399998  
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03

# Join the case data with the source and department data.

In [267]:
source_df.columns

Index(['case_id', 'case_opened_date', 'case_closed_date', 'SLA_due_date',
       'case_late', 'num_days_late', 'case_closed', 'dept_division',
       'service_request_type', 'SLA_days', 'case_status', 'source_id',
       'request_address', 'council_district'],
      dtype='object')

In [268]:
df.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'case_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district',
 'case_lifetime',
 'year_closed',
 'num_hours_late']

In [269]:
source_df = spark.createDataFrame(source_df)

In [349]:
dfj = (
    df.join(source_df, "case_id", "left")
    .drop(source_df.case_id)
    .drop(source_df.source_id)
    .drop(source_df.service_request_type)
    .drop(source_df.num_days_late)
    .drop(source_df.dept_division)
    
)


In [338]:
dfj.show(1,vertical= True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 case_lifetime        | 0                    
 year_closed          | 2018                 
 num_hours_late       | -23964.210278399998  
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO        

# Are there any cases that do not have a request source?

In [288]:
dfj.groupBy('source_id').agg(count(dfj.source_id)).show()

+---------+----------------+
|source_id|count(source_id)|
+---------+----------------+
|  ns16326|              21|
|   139344|              24|
| svcCRMSS|              53|
| svcCRMLS|               2|
+---------+----------------+



In [306]:
dfj.count()

100

In [305]:
dfj.select(dfj.source_id == 'nan').count()


100

# What are the top 10 service request types in terms of number of requests?

In [323]:
dfj.groupBy('service_request_type').count().sort(F.col('count').desc()).show(10)


+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|Front Or Side Yar...|   28|
|Right Of Way/Side...|   13|
|      Animal Neglect|   12|
|        Stray Animal|   10|
|Aggressive Animal...|    6|
|Animal Cruelty(Cr...|    5|
|Trapped/Confined ...|    5|
|Traffic Signal Op...|    3|
|   Dead Animal - Dog|    3|
|Injured Animal(Cr...|    2|
+--------------------+-----+
only showing top 10 rows



# What are the top 10 service request types in terms of average days late?

In [337]:
dfj.groupBy('service_request_type').agg(F.round(F.mean('num_days_late'),2).alias('avg_days_late')).sort(F.col('avg_days_late').desc()).show(10)


+--------------------+-------------+
|service_request_type|avg_days_late|
+--------------------+-------------+
|Aggressive Animal...|        97.24|
|      Animal Neglect|        43.43|
|Aggressive Animal...|         5.45|
|Trapped/Confined ...|         0.78|
|Animal Cruelty(Cr...|         0.04|
|Injured Animal(Cr...|        -0.07|
|   Dead Animal - Cat|        -1.09|
|   Dead Animal - Dog|        -1.16|
|Graffiti Public P...|        -1.65|
|Traffic Signal Op...|        -1.88|
+--------------------+-------------+
only showing top 10 rows



# Does number of days late depend on department?

In [359]:
dfj.groupBy('dept_division').agg(F.mean('num_days_late').alias('avg_late')).sort(F.col('avg_late').desc()).show()


+------------------+-------------------+
|     dept_division|           avg_late|
+------------------+-------------------+
|     Miscellaneous|      -1.1340763888|
|          Graffiti|       -1.649409722|
|Signs and Markings|       -1.945706019|
|       Storm Water|       -2.517471065|
|           Signals| -8.849137730999999|
|  Code Enforcement|-17.002187499069766|
|           Streets|-24.383096065500002|
|  Field Operations| -219.9206836960952|
+------------------+-------------------+



# How do number of days late depend on department and request type?


In [366]:
dfj.groupBy('service_request_type','dept_division').agg(F.mean('num_days_late')).sort(F.col('avg(num_days_late)')).show(10)

+--------------------+------------------+-------------------+
|service_request_type|     dept_division| avg(num_days_late)|
+--------------------+------------------+-------------------+
|        Stray Animal|  Field Operations| -998.8975937499999|
|Alley-Way Mainten...|  Code Enforcement|       -63.98395833|
|Dead Trees: Priva...|  Code Enforcement|       -63.79819444|
|Water Ponding or ...|           Streets|       -42.78289352|
|Traffic Signal Op...|           Signals|       -29.74398148|
|Front Or Side Yar...|  Code Enforcement|-14.718330439285713|
|Right Of Way/Side...|  Code Enforcement|       -14.70758903|
|   Barricade Pick Up|           Streets|       -5.983298611|
|Removal Of Obstru...|       Storm Water|       -2.517471065|
|Stop/Yield Sign D...|Signs and Markings|       -1.945706019|
+--------------------+------------------+-------------------+
only showing top 10 rows

