In [135]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd


In [2]:
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/05 12:24:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Read the case, department, and source data into their own spark dataframes.

In [143]:
source_df = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)
case_df = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)
dept_df = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)

source_df

                                                                                

DataFrame[source_id: string, source_username: string]

# Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json

In [4]:
source_df.write.json("source_json", mode="overwrite")
case_df.write.json("case_json", mode="overwrite")
dept_df.write.json("dept_json", mode="overwrite")

                                                                                

In [5]:
(
    source_df.write.format("csv")
    .mode("overwrite")
    .option("header", True)
    .save("source_csv")
)
(
    case_df.write.format("csv")
    .mode("overwrite")
    .option("header", True)
    .save("case_csv")
)
(
    dept_df.write.format("csv")
    .mode("overwrite")
    .option("header", True)
    .save("dept_csv")
)

                                                                                

# Inspect your folder structure. What do you notice?

I noticed that they all had their own folders containing success and it was accompanied by at least one part

# Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [6]:
source_df.show(2)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
+---------+----------------+
only showing top 2 rows



In [144]:

source_df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



In [8]:
case_df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 SLA_due_date         | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616000001                   
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
-RECORD 1----------------------------------------------------
 case_id

In [16]:
fmt = "M/d/yy H:mm"

case_df = case_df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))\
.withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))\
.withColumn("SLA_due_date", to_timestamp("SLA_due_date", fmt))
          
          

case_df.printSchema()How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [97]:
dept_df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------
 dept_division          | 311 Call Center        
 dept_name              | Customer Service       
 standardized_dept_name | Customer Service       
 dept_subject_to_SLA    | YES                    
-RECORD 1----------------------------------------
 dept_division          | Brush                  
 dept_name              | Solid Waste Management 
 standardized_dept_name | Solid Waste            
 dept_subject_to_SLA    | YES                    
only showing top 2 rows



In [11]:
dept_df.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



# How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

2217

1792

In [42]:
sorted_df = case_df.sort('SLA_due_date', 'case_opened_date')
# desc("SLA_due_date")

sorted_df.select(col('case_opened_date')).show(1, vertical=True, truncate=False)
sorted_df.select(col('case_opened_date'))


[Stage 33:>                                                         (0 + 8) / 8]

-RECORD 0-------------------------------
 case_opened_date | 2017-06-09 13:47:00 
only showing top 1 row





DataFrame[case_opened_date: timestamp]

In [48]:
sorted_df.withColumn(
        "case_age", datediff(current_timestamp(), 'case_opened_date')).show(1,  vertical=True, truncate=False)
    

                                                                                

-RECORD 0-------------------------------------------------------
 case_id              | 1013632928                              
 case_opened_date     | 2017-06-09 13:47:00                     
 case_closed_date     | 2017-06-16 15:40:00                     
 SLA_due_date         | null                                    
 case_late            | NO                                      
 num_days_late        | null                                    
 case_closed          | YES                                     
 dept_division        | District 9                              
 service_request_type | Request for Research/Information        
 SLA_days             | null                                    
 case_status          | Closed                                  
 source_id            | 138650                                  
 request_address      | 1100  SAN PEDRO AVE, San Antonio, 78212 
 council_district     | 1                                       
 case_age             | 2

In [51]:
sorted_df = case_df.sort("SLA_due_date", desc('case_opened_date'))
sorted_df.show(3, vertical=True, truncate=False)

[Stage 38:>                                                         (0 + 8) / 8]

-RECORD 0------------------------------------------------------
 case_id              | 1014759588                             
 case_opened_date     | 2018-08-08 10:33:00                    
 case_closed_date     | null                                   
 SLA_due_date         | null                                   
 case_late            | NO                                     
 num_days_late        | null                                   
 case_closed          | NO                                     
 dept_division        | District 2                             
 service_request_type | Request for Research/Information       
 SLA_days             | null                                   
 case_status          | Open                                   
 source_id            | js12254                                
 request_address      | 923  VIRGINIA BLVD, San Antonio, 78203 
 council_district     | 2                                      
-RECORD 1-------------------------------



In [52]:
sorted_df.withColumn(
        "case_age", datediff(current_timestamp(), 'case_opened_date')).show(1,  vertical=True, truncate=False)

[Stage 39:>                                                         (0 + 8) / 8]

-RECORD 0------------------------------------------------------
 case_id              | 1014759588                             
 case_opened_date     | 2018-08-08 10:33:00                    
 case_closed_date     | null                                   
 SLA_due_date         | null                                   
 case_late            | NO                                     
 num_days_late        | null                                   
 case_closed          | NO                                     
 dept_division        | District 2                             
 service_request_type | Request for Research/Information       
 SLA_days             | null                                   
 case_status          | Open                                   
 source_id            | js12254                                
 request_address      | 923  VIRGINIA BLVD, San Antonio, 78203 
 council_district     | 2                                      
 case_age             | 1792            

                                                                                

# How many Stray Animal cases are there?

26760

In [56]:
case_df[case_df.service_request_type == "Stray Animal"].show(3, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 2018-01-01 00:42:00                   
 case_closed_date     | 2018-01-01 12:29:00                   
 SLA_due_date         | 2020-09-26 00:42:00                   
 case_late            | NO                                    
 num_days_late        | -998.5087616000001                    
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

In [58]:
case_df[case_df.service_request_type == "Stray Animal"].count()

                                                                                

26760

# How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

113902

In [65]:
# case_df.show(vertical=True, truncate=False)
case_df[(case_df['dept_division'] == 'Field Operations') & (case_df['service_request_type'] != "Officer Standby")].show(vertical=True, truncate=False)

-RECORD 0---------------------------------------------------------
 case_id              | 1014127332                                
 case_opened_date     | 2018-01-01 00:42:00                       
 case_closed_date     | 2018-01-01 12:29:00                       
 SLA_due_date         | 2020-09-26 00:42:00                       
 case_late            | NO                                        
 num_days_late        | -998.5087616000001                        
 case_closed          | YES                                       
 dept_division        | Field Operations                          
 service_request_type | Stray Animal                              
 SLA_days             | 999.0                                     
 case_status          | Closed                                    
 source_id            | svcCRMLS                                  
 request_address      | 2315  EL PASO ST, San Antonio, 78207      
 council_district     | 5                                     

In [66]:
case_df[(case_df['dept_division'] == 'Field Operations') & (case_df['service_request_type'] != "Officer Standby")].count()

                                                                                

113902

# Convert the council_district column to a string column.

In [72]:
case_df = case_df.withColumn("council_district", col("council_district").cast("string"))
case_df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



# Extract the year from the case_closed_date column.

In [89]:
case_df.select(regexp_extract("case_closed_date", r"\d+", 0).alias('the_year')).show(vertical=True, truncate=False)

-RECORD 0--------
 the_year | 2018 
-RECORD 1--------
 the_year | 2018 
-RECORD 2--------
 the_year | 2018 
-RECORD 3--------
 the_year | 2018 
-RECORD 4--------
 the_year | 2018 
-RECORD 5--------
 the_year | 2018 
-RECORD 6--------
 the_year | 2018 
-RECORD 7--------
 the_year | 2018 
-RECORD 8--------
 the_year | 2018 
-RECORD 9--------
 the_year | 2018 
-RECORD 10-------
 the_year | 2018 
-RECORD 11-------
 the_year | 2018 
-RECORD 12-------
 the_year | 2018 
-RECORD 13-------
 the_year | 2018 
-RECORD 14-------
 the_year | 2018 
-RECORD 15-------
 the_year | 2018 
-RECORD 16-------
 the_year | 2018 
-RECORD 17-------
 the_year | 2018 
-RECORD 18-------
 the_year | 2018 
-RECORD 19-------
 the_year | 2018 
only showing top 20 rows



# Convert num_days_late from days to hours in new columns num_hours_late.

In [95]:
case_df = case_df.withColumn('num_hours_late', case_df.num_days_late * 24)

In [96]:
case_df.show(vertical=True, truncate=False)

-RECORD 0--------------------------------------------------------
 case_id              | 1014127332                               
 case_opened_date     | 2018-01-01 00:42:00                      
 case_closed_date     | 2018-01-01 12:29:00                      
 SLA_due_date         | 2020-09-26 00:42:00                      
 case_late            | NO                                       
 num_days_late        | -998.5087616000001                       
 case_closed          | YES                                      
 dept_division        | Field Operations                         
 service_request_type | Stray Animal                             
 SLA_days             | 999.0                                    
 case_status          | Closed                                   
 source_id            | svcCRMLS                                 
 request_address      | 2315  EL PASO ST, San Antonio, 78207     
 council_district     | 5                                        
 num_hours

# Join the case data with the source and department data.

In [106]:
test_df = (
    sorted_df
    .join(dept_df, on ="dept_division", how = "left")
    .drop(test_df.dept_name)
)


test_df.show(2, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 SLA_due_date           | 2020-09-26 00:42:00  
 case_late              | NO                   
 num_days_late          | -998.5087616000001   
 case_closed            | YES                  
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  EL PASO ST,... 
 council_district       | 5                    
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Storm Water          
 case_id                | 1014127333           
 case_opened_date       | 2018-01-01 00:

In [146]:
answer_df = (
    test_df
    .join(source_df, on ="source_id", how = "left")
)
answer_df.show(2, vertical=True)

-RECORD 0--------------------------------------
 source_id              | svcCRMLS             
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 SLA_due_date           | 2020-09-26 00:42:00  
 case_late              | NO                   
 num_days_late          | -998.5087616000001   
 case_closed            | YES                  
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 request_address        | 2315  EL PASO ST,... 
 council_district       | 5                    
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | YES                  
 source_username        | svcCRMLS             
-RECORD 1--------------------------------------
 source_id              | svcCRMSS             
 dept_division          | Storm Water   

# Are there any cases that do not have a request source?

Looks like we have no nulls and every source id column is filled

In [110]:
test_df.sort(desc('source_id')).show(2, vertical=True)

[Stage 86:>                                                         (0 + 8) / 8]

-RECORD 0--------------------------------------
 dept_division          | Streets              
 case_id                | 1014081882           
 case_opened_date       | 2017-12-08 12:47:00  
 case_closed_date       | 2018-04-03 11:37:00  
 SLA_due_date           | 2018-04-24 12:47:00  
 case_late              | NO                   
 num_days_late          | -21.04885417         
 case_closed            | YES                  
 service_request_type   | Base/Pavement Repair 
 SLA_days               | 137.0                
 case_status            | Closed               
 source_id              | yh24110              
 request_address        | LOOP 410 SW and R... 
 council_district       | 4                    
 standardized_dept_name | Trans & Cap Impro... 
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Storm Water          
 case_id                | 1014643380           
 case_opened_date       | 2018-06-28 11:



In [111]:
test_df.sort('source_id').show(2, vertical=True)



-RECORD 0--------------------------------------
 dept_division          | Code Enforcement     
 case_id                | 1013983058           
 case_opened_date       | 2017-10-26 17:14:00  
 case_closed_date       | 2017-11-20 17:33:00  
 SLA_due_date           | 2017-11-06 08:30:00  
 case_late              | YES                  
 num_days_late          | 14.37762731          
 case_closed            | YES                  
 service_request_type   | Permits Building     
 SLA_days               | 10.63596065          
 case_status            | Closed               
 source_id              | 100137               
 request_address        | 307  JOSEPHINE E,... 
 council_district       | 1                    
 standardized_dept_name | DSD/Code Enforcement 
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Waste Collection     
 case_id                | 1014358088           
 case_opened_date       | 2018-03-24 12:



In [117]:
test_df.filter(col('source_id').isNull()).show(2, vertical=True)

                                                                                

(0 rows)



# What are the top 10 service request types in terms of number of requests?

In [127]:
test_df.groupby('service_request_type').count().sort(desc('count')).show(10,50,  vertical=True)

[Stage 126:>                                                        (0 + 8) / 8]

-RECORD 0------------------------------------------------
 service_request_type | No Pickup                        
 count                | 86855                            
-RECORD 1------------------------------------------------
 service_request_type | Overgrown Yard/Trash             
 count                | 65895                            
-RECORD 2------------------------------------------------
 service_request_type | Bandit Signs                     
 count                | 32910                            
-RECORD 3------------------------------------------------
 service_request_type | Damaged Cart                     
 count                | 30338                            
-RECORD 4------------------------------------------------
 service_request_type | Front Or Side Yard Parking       
 count                | 28794                            
-RECORD 5------------------------------------------------
 service_request_type | Stray Animal                     
 count        

                                                                                

# What are the top 10 service request types in terms of average days late?

In [133]:
test_df.groupby('service_request_type').avg('num_days_late').sort(desc('avg(num_days_late)')).show(10,50,  vertical=True)

[Stage 144:>                                                        (0 + 8) / 8]

-RECORD 0------------------------------------------------------
 service_request_type | Zoning: Junk Yards                     
 avg(num_days_late)   | 175.95636210420943                     
-RECORD 1------------------------------------------------------
 service_request_type | Labeling for Used Mattress             
 avg(num_days_late)   | 162.43032902285717                     
-RECORD 2------------------------------------------------------
 service_request_type | Record Keeping of Used Mattresses      
 avg(num_days_late)   | 153.99724039428568                     
-RECORD 3------------------------------------------------------
 service_request_type | Signage Requied for Sale of Used Mattr 
 avg(num_days_late)   | 151.63868055333333                     
-RECORD 4------------------------------------------------------
 service_request_type | Storage of Used Mattress               
 avg(num_days_late)   | 142.11255641500003                     
-RECORD 5-------------------------------

                                                                                

# Does number of days late depend on department?

yes

In [150]:
test_df.groupby('dept_division').avg('num_days_late').show( vertical=True)

[Stage 190:>                                                        (0 + 8) / 8]

-RECORD 0----------------------------------
 dept_division      | Miscellaneous        
 avg(num_days_late) | -1.7233942611909219  
-RECORD 1----------------------------------
 dept_division      | Solid Waste          
 avg(num_days_late) | 3.541793412154981    
-RECORD 2----------------------------------
 dept_division      | Field Operations     
 avg(num_days_late) | -226.1654977071745   
-RECORD 3----------------------------------
 dept_division      | Streets              
 avg(num_days_late) | -23.829909905293455  
-RECORD 4----------------------------------
 dept_division      | Waste Collection     
 avg(num_days_late) | -2.162176598886006   
-RECORD 5----------------------------------
 dept_division      | Code Enforcement ... 
 avg(num_days_late) | -44.46861194686386   
-RECORD 6----------------------------------
 dept_division      | Vector               
 avg(num_days_late) | -1.1180635370092935  
-RECORD 7----------------------------------
 dept_division      | Dangerous 

                                                                                

# How do number of days late depend on department and request type?

depending on the dept_division and the service type requested there will be more or less avg number of days late

In [151]:
test_df.groupby('dept_division','service_request_type').avg('num_days_late').show(vertical=True)

[Stage 194:>                                                        (0 + 8) / 8]

-RECORD 0------------------------------------
 dept_division        | Code Enforcement     
 service_request_type | Overgrown Yard/Trash 
 avg(num_days_late)   | -46.3301358535213    
-RECORD 1------------------------------------
 dept_division        | Shops (Internal)     
 service_request_type | Major Park Improv... 
 avg(num_days_late)   | -280.2546235360405   
-RECORD 2------------------------------------
 dept_division        | Traffic Engineeri... 
 service_request_type | Flashing Beacon N... 
 avg(num_days_late)   | -68.19004095633333   
-RECORD 3------------------------------------
 dept_division        | Shops (Internal)     
 service_request_type | Playgrounds          
 avg(num_days_late)   | -6.624961567266342   
-RECORD 4------------------------------------
 dept_division        | Traffic Engineeri... 
 service_request_type | Parking Issue        
 avg(num_days_late)   | -15.160798689836735  
-RECORD 5------------------------------------
 dept_division        | Code Enfor

                                                                                