In [1]:
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql.functions import *

spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [2]:
dept = spark.read.csv('dept.csv', header=True, inferSchema=True)
case = spark.read.csv('case.csv', header=True, inferSchema=True)
source = spark.read.csv('source.csv', header=True, inferSchema=True)

In [3]:
dept.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



In [4]:
source.write.json("data/sources_json", mode="overwrite")

In [5]:
source.write.csv("data/sources_csv", mode="overwrite", header=True)

In [6]:
dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

In [7]:
dept.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------
 dept_division          | 311 Call Center        
 dept_name              | Customer Service       
 standardized_dept_name | Customer Service       
 dept_subject_to_SLA    | YES                    
-RECORD 1----------------------------------------
 dept_division          | Brush                  
 dept_name              | Solid Waste Management 
 standardized_dept_name | Solid Waste            
 dept_subject_to_SLA    | YES                    
only showing top 2 rows



In [10]:
dept = dept.withColumn('dept_subject_to_SLA', col('dept_subject_to_SLA') == 'YES')

In [11]:
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [12]:
case.show(1, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 SLA_due_date         | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616000001                   
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
only showing top 1 row



In [13]:
case = case.withColumn('case_late', col('case_late') == 'YES')\
        .withColumn('case_closed', col('case_late') == 'YES')\
        .withColumn('case_id', format_string('%d', col('case_id')))\
        .withColumn('council_district', format_string('%d', col('council_district')))\
        .withColumn('case_closed_date', to_timestamp(col('case_closed_date'), 'M/d/yy H:mm'))\
        .withColumn('case_opened_date', to_timestamp(col('case_opened_date'), 'M/d/yy H:mm'))\
        .withColumn('SLA_due_date', to_timestamp(col('SLA_due_date'), 'M/d/yy H:mm'))

In [14]:
source.dtypes

[('source_id', 'string'), ('source_username', 'string')]

In [15]:
source.show(2)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
+---------+----------------+
only showing top 2 rows



In [16]:
case.groupBy('case_status').count().show()

+-----------+------+
|case_status| count|
+-----------+------+
|       Open| 18110|
|     Closed|823594|
+-----------+------+



In [17]:
case.where((col('case_status') == 'Open'))\
    .select(max('num_days_late')).show()

+------------------+
|max(num_days_late)|
+------------------+
|       348.6458333|
+------------------+



In [18]:
case.where('num_days_late' == max('num_days_late'))\
    .show(vertical=True)
############ WRONG!

(0 rows)



In [19]:
case.where(col('service_request_type') == 'Stray Animal')\
    .count()

26760

In [20]:
case.where((case.dept_division == 'Field Operations') &
           (case.service_request_type != 'Officer Standby'))\
    .count()

113902

In [21]:
case.select(year('case_closed_date')).show(3)

+----------------------+
|year(case_closed_date)|
+----------------------+
|                  2018|
|                  2018|
|                  2018|
+----------------------+
only showing top 3 rows



In [22]:
case.withColumn('num_hours_late', round(case.num_days_late*24, 0))\
    .show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 SLA_due_date         | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | false                
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 num_hours_late       | -23964.0             
only showing top 1 row



In [23]:
source.columns

['source_id', 'source_username']

In [24]:
dept.columns

['dept_division', 'dept_name', 'standardized_dept_name', 'dept_subject_to_SLA']

In [25]:
df = (
    case.join(dept, 'dept_division', 'left')
    .join(source, 'source_id', 'left')
)

df.show(1, vertical=True)

-RECORD 0--------------------------------------
 source_id              | svcCRMLS             
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 SLA_due_date           | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5087616000001   
 case_closed            | false                
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 request_address        | 2315  EL PASO ST,... 
 council_district       | 5                    
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | true                 
 source_username        | svcCRMLS             
only showing top 1 row



In [26]:
df.where(df.source_id.isNull()).count()

0

In [27]:
df.where(df.request_address.isNull()).count()

0

In [28]:
df.groupBy(df.service_request_type).count()\
    .sort('count', ascending=False).show(10, truncate=False)

+--------------------------------+-----+
|service_request_type            |count|
+--------------------------------+-----+
|No Pickup                       |89210|
|Overgrown Yard/Trash            |66403|
|Bandit Signs                    |32968|
|Damaged Cart                    |31163|
|Front Or Side Yard Parking      |28920|
|Stray Animal                    |27361|
|Aggressive Animal(Non-Critical) |25492|
|Cart Exchange Request           |22608|
|Junk Vehicle On Private Property|21649|
|Pot Hole Repair                 |20827|
+--------------------------------+-----+
only showing top 10 rows



In [29]:
df.groupBy(df.service_request_type).agg(avg(df.num_days_late))\
    .sort('avg(num_days_late)', ascending=False)\
    .show(10, truncate=False)

+--------------------------------------+------------------+
|service_request_type                  |avg(num_days_late)|
+--------------------------------------+------------------+
|Zoning: Junk Yards                    |175.9563621042095 |
|Labeling for Used Mattress            |162.43032902285717|
|Record Keeping of Used Mattresses     |153.99724039428568|
|Signage Requied for Sale of Used Mattr|151.63868055333333|
|Storage of Used Mattress              |142.112556415     |
|Zoning: Recycle Yard                  |135.92851612479797|
|Donation Container Enforcement        |131.75610506358706|
|License Requied Used Mattress Sales   |128.79828704142858|
|Traffic Signal Graffiti               |101.79846062200002|
|Complaint                             |72.87050230311695 |
+--------------------------------------+------------------+
only showing top 10 rows



In [30]:
df.groupBy(df.standardized_dept_name).agg(avg(df.num_days_late))\
    .sort('avg(num_days_late)', ascending=False)\
    .show(truncate=False)

+------------------------+-------------------+
|standardized_dept_name  |avg(num_days_late) |
+------------------------+-------------------+
|Customer Service        |59.737091496300785 |
|Solid Waste             |-2.2000575136721747|
|Metro Health            |-4.911766979607002 |
|Parks & Recreation      |-5.251521960055133 |
|Trans & Cap Improvements|-20.612837354052626|
|DSD/Code Enforcement    |-38.36938892614453 |
|Animal Care Services    |-226.51783940550382|
|City Council            |null               |
+------------------------+-------------------+



In [31]:
df.groupBy(df.standardized_dept_name, df.service_request_type)\
    .agg(avg(df.num_days_late))\
    .sort('avg(num_days_late)', ascending=False)\
    .show(10, truncate=False)

+------------------------+--------------------------------------+------------------+
|standardized_dept_name  |service_request_type                  |avg(num_days_late)|
+------------------------+--------------------------------------+------------------+
|DSD/Code Enforcement    |Zoning: Junk Yards                    |175.9563621042095 |
|DSD/Code Enforcement    |Labeling for Used Mattress            |162.43032902285717|
|DSD/Code Enforcement    |Record Keeping of Used Mattresses     |153.99724039428568|
|DSD/Code Enforcement    |Signage Requied for Sale of Used Mattr|151.63868055333333|
|DSD/Code Enforcement    |Storage of Used Mattress              |142.112556415     |
|DSD/Code Enforcement    |Zoning: Recycle Yard                  |135.92851612479797|
|DSD/Code Enforcement    |Donation Container Enforcement        |131.75610506358706|
|DSD/Code Enforcement    |License Requied Used Mattress Sales   |128.79828704142858|
|Trans & Cap Improvements|Traffic Signal Graffiti               |

In [32]:
df.columns

['source_id',
 'dept_division',
 'case_id',
 'case_opened_date',
 'case_closed_date',
 'SLA_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'service_request_type',
 'SLA_days',
 'case_status',
 'request_address',
 'council_district',
 'dept_name',
 'standardized_dept_name',
 'dept_subject_to_SLA',
 'source_username']

In [33]:
max_date = df.agg(max(df.case_closed_date).alias('max_date')).head()[0]
max_date = max_date.strftime('%Y-%m-%d %H:%M:%S')
max_date

'2018-08-08 10:38:00'

In [34]:
# max_date.withColumn('date', to_date('max_date')).show()

In [35]:
df.withColumn('num_days_late_revised', 
        when(df.case_closed == True, 
             datediff(df.case_closed_date, df.case_opened_date))\
        .otherwise(datediff(lit(max_date), df.case_opened_date)))\
    .show(1, vertical=True)

-RECORD 0--------------------------------------
 source_id              | svcCRMLS             
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 SLA_due_date           | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5087616000001   
 case_closed            | false                
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 request_address        | 2315  EL PASO ST,... 
 council_district       | 5                    
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | true                 
 source_username        | svcCRMLS             
 num_days_late_revised  | 219                  
only showing top 1 row

