In [1]:
# Create the Spark Session

import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [2]:
import pandas as pd
import numpy as np
import math
from pyspark.sql.functions import sum, mean, concat, lit, asc, desc, col, when, filter, to_timestamp
from datetime import datetime, date
from pandas import Timestamp

### Read the case, department, and source data into their own spark dataframes.

#### Let's see how writing to the local disk works in spark:

- Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
- Inspect your folder structure. What do you notice?
- Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [3]:
df = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

In [4]:
df.show(3)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
+---------+----------------+
only showing top 3 rows



In [5]:
df.describe().show()

+-------+------------------+---------------+
|summary|         source_id|source_username|
+-------+------------------+---------------+
|  count|               140|            140|
|   mean|136973.92727272728|           null|
| stddev| 10275.63643312297|           null|
|    min|            100137|  Alex Franklin|
|    max|           yh24110|       svcCRMSS|
+-------+------------------+---------------+



In [6]:
df.dtypes

[('source_id', 'string'), ('source_username', 'string')]

In [7]:
df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



cast source id from string to int

In [11]:
df = df.withColumn('source_id', df.source_id.cast("string"))

In [12]:
df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



--------------

How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

---------

In [16]:
cases = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)

In [17]:
cases.sort(desc(cases.num_days_late)).show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1013288696           
 case_opened_date     | 1/26/17 15:36        
 case_closed_date     | 7/14/18 8:21         
 SLA_due_date         | 2/9/17 15:36         
 case_late            | YES                  
 num_days_late        | 519.6980787          
 case_closed          | YES                  
 dept_division        | Code Enforcement     
 service_request_type | Bandit Signs         
 SLA_days             | 14.0                 
 case_status          | Closed               
 source_id            | 138650               
 request_address      | 10631  NACOGDOCHE... 
 council_district     | 10                   
only showing top 1 row



In [18]:
cases = cases.withColumnRenamed("SLA_due_date", "case_due_date")

Maggie comments on the date one

--------------

spark.sql('''
SELECT DATEDIFF(current_timestamp, case_due_date) AS days_past_due
FROM df
WHERE NOT case_closed
ORDER BY days_past_due DESC
LIMIT 15
''').show()

------------

How many Stray Animal cases are there?

--------

In [20]:
cases.filter(cases.service_request_type == 'Stray Animal').count()

26760

--------

How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

-----

------

Convert the council_district column to a string column.

-------

--------

What are the top 10 service request types in terms of number of requests?

-------

In [26]:
cases.groupBy('service_request_type').count().sort(col('count').desc()).show()

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|86855|
|Overgrown Yard/Trash|65895|
|        Bandit Signs|32910|
|        Damaged Cart|30338|
|Front Or Side Yar...|28794|
|        Stray Animal|26760|
|Aggressive Animal...|24882|
|Cart Exchange Req...|22024|
|Junk Vehicle On P...|21473|
|     Pot Hole Repair|20616|
|Alley-Way Mainten...|20214|
|    Lost/Stolen Cart|18731|
|Right Of Way/Side...|17699|
|   Dead Animal - Dog|16714|
|       Cart Delivery|15471|
|   Dead Animal - Cat|14983|
|      Animal Neglect|13441|
|  Dead Animal - Misc|13234|
|Trapped/Confined ...|11354|
|Public Nuisance(O...|10715|
+--------------------+-----+
only showing top 20 rows



---------

What are the top 10 service request types in terms of average days late?

-----------

In [38]:
cases.groupBy('service_request_type').agg(mean(cases.num_days_late).alias('avg_days_late')).sort(desc('avg_days_late')).show(10)

+--------------------+------------------+
|service_request_type|     avg_days_late|
+--------------------+------------------+
|  Zoning: Junk Yards|175.95636210420943|
|Labeling for Used...|162.43032902285717|
|Record Keeping of...|153.99724039428568|
|Signage Requied f...|151.63868055333333|
|Storage of Used M...|142.11255641500003|
|Zoning: Recycle Yard|135.92851612479797|
|Donation Containe...|131.75610506358706|
|License Requied U...|128.79828704142858|
|Traffic Signal Gr...| 77.90021217000002|
|           Complaint|  72.5179093265971|
+--------------------+------------------+
only showing top 10 rows



------

Does number of days late depend on department?

------