In [1]:
# imports 
import wrangle

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/20 15:57:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/20 15:57:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Data Acquisition

This exercises uses the case.csv, dept.csv, and source.csv files from the san antonio 311 call dataset.

In [2]:
source = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)
case = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)
dept = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)


                                                                                

------------

Let's see how writing to the local disk works in spark:

Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
Inspect your folder structure. What do you notice?

Answer: I noticed it created directories for both csv and json.

In [3]:
# store source data in a .csv and json file store these as sources_csv and sources_json
source.write.csv("sources_csv", mode="overwrite")
source.write.json("sources_json", mode="overwrite")


Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [4]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)

spark.read.csv("source.csv", header=True, schema=schema)


DataFrame[source_id: string, source_username: string]

In [5]:
# Prep the data before exploration

# change to_timestamp, fmt

fmt = "M/d/yy H:mm"

case = case.withColumn("case_opened_date", to_timestamp(col("case_opened_date"), fmt)).withColumn(
    "case_closed_date", to_timestamp(col("case_closed_date"), fmt)).withColumn(
        "SLA_due_date", to_timestamp(col("SLA_due_date"), fmt))

In [6]:
case = case.withColumn('case_closed', expr('case_closed == "YES"')).withColumn(
    'case_late', expr('case_late == "YES"'))

In [7]:
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('SLA_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [8]:
# check out dept.csv
dept.show(3)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|                YES|
|          Brush|Solid Waste Manag...|           Solid Waste|                YES|
|Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
+---------------+--------------------+----------------------+-------------------+
only showing top 3 rows



In [9]:
dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

------------------

## Part 2:

 ### How old is the latest (in terms of days past SLA) currently open issue? 
 
 ### How long has the oldest (in terms of days since opened) currently opened issue been open?

In [10]:
# look at open cases
open_cases = case.where(case.case_closed == False)

In [11]:
open_cases.select("*").show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014128388           
 case_opened_date     | 2018-01-02 09:39:00  
 case_closed_date     | null                 
 SLA_due_date         | 2018-01-09 09:39:00  
 case_late            | true                 
 num_days_late        | 211.5974884          
 case_closed          | false                
 dept_division        | 311 Call Center      
 service_request_type | Complaint            
 SLA_days             | 7.0                  
 case_status          | Open                 
 source_id            | mt13131              
 request_address      | 7326  WESTGLADE P... 
 council_district     | 6                    
-RECORD 1------------------------------------
 case_id              | 1014128790           
 case_opened_date     | 2018-01-02 10:49:00  
 case_closed_date     | null                 
 SLA_due_date         | 2018-05-10 10:49:00  
 case_late            | true                 
 num_days_late        | 90.5492592

In [12]:
open_cases = open_cases.withColumn('days_past_sla', datediff(current_timestamp(), col('SLA_due_date'))).withColumn(
    'days_open', datediff(current_timestamp(), 'case_opened_date'))

In [13]:
open_cases.sort(desc("days_past_sla")).show(1, vertical=True)



-RECORD 0------------------------------------
 case_id              | 1013225646           
 case_opened_date     | 2017-01-01 13:48:00  
 case_closed_date     | null                 
 SLA_due_date         | 2017-01-17 08:30:00  
 case_late            | true                 
 num_days_late        | 348.6458333          
 case_closed          | false                
 dept_division        | Code Enforcement     
 service_request_type | No Address Posted    
 SLA_days             | 15.77859954          
 case_status          | Open                 
 source_id            | svcCRMSS             
 request_address      | 7299  SHADOW RIDG... 
 council_district     | 6                    
 days_past_sla        | 1949                 
 days_open            | 1965                 
only showing top 1 row



                                                                                

### How many Stray Animal cases are there?

In [14]:
case.groupBy('service_request_type').count().orderBy('count', ascending=False).show()



+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|86855|
|Overgrown Yard/Trash|65895|
|        Bandit Signs|32910|
|        Damaged Cart|30338|
|Front Or Side Yar...|28794|
|        Stray Animal|26760|
|Aggressive Animal...|24882|
|Cart Exchange Req...|22024|
|Junk Vehicle On P...|21473|
|     Pot Hole Repair|20616|
|Alley-Way Mainten...|20214|
|    Lost/Stolen Cart|18731|
|Right Of Way/Side...|17699|
|   Dead Animal - Dog|16714|
|       Cart Delivery|15471|
|   Dead Animal - Cat|14983|
|      Animal Neglect|13441|
|  Dead Animal - Misc|13234|
|Trapped/Confined ...|11354|
|Public Nuisance(O...|10715|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [15]:
case.where(expr('service_request_type = "Stray Animal"')).count()

                                                                                

26760

### How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [16]:
case.where(case.dept_division == "Field Operations").where(case.service_request_type != "Officer Standby").count()

                                                                                

113902

### Convert the council_district column to a string column.

In [17]:
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('SLA_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [18]:
# convert council_district from an integer to a string
case = case.withColumn('council_district', col('council_district').cast('string'))

In [19]:
# check out the data
case.select('council_district').show(5)

+----------------+
|council_district|
+----------------+
|               5|
|               3|
|               3|
|               3|
|               7|
+----------------+
only showing top 5 rows



In [20]:
# check types
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('SLA_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

### Extract the year from the case_closed_date column.

In [21]:
# convert case_closed_date to datetime
case = case.withColumn('case_closed_date', to_timestamp('case_closed_date', fmt))

In [22]:
# create new column with only year
case = case.withColumn('year', year('case_closed_date'))

In [23]:
# look at data
case.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 SLA_due_date         | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
 year                 | 2018                                 
-RECORD 

### Convert num_days_late from days to hours in new columns num_hours_late.

In [24]:
case = case.select('*', (case.num_days_late * 24).alias('num_hours_late'))   

In [25]:
case.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 SLA_due_date         | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
 year                 | 2018                                 
 num_hou

### Join the case data with the source and department data.

In [26]:
# check keys of source, dept, and case
source.show(2, vertical=True, truncate=False)

-RECORD 0---------------------------
 source_id       | 100137           
 source_username | Merlene Blodgett 
-RECORD 1---------------------------
 source_id       | 103582           
 source_username | Carmen Cura      
only showing top 2 rows



In [27]:
dept.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------
 dept_division          | 311 Call Center        
 dept_name              | Customer Service       
 standardized_dept_name | Customer Service       
 dept_subject_to_SLA    | YES                    
-RECORD 1----------------------------------------
 dept_division          | Brush                  
 dept_name              | Solid Waste Management 
 standardized_dept_name | Solid Waste            
 dept_subject_to_SLA    | YES                    
only showing top 2 rows



In [28]:
case.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 SLA_due_date         | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
 year                 | 2018                                 
 num_hou

In [29]:
# Join case and dept due to commomn keys
case_dept = (case
            .join(dept, on='dept_division', how='left')
            .drop(dept.dept_division)
            .drop(dept.dept_name)
            .drop(case.dept_division)
            .withColumnRenamed('standardized_dept_name', 'department')
            .withColumn('dept_subject_to_SLA', col('dept_subject_to_SLA') == 'YES')
)

In [30]:
case_dept.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 SLA_due_date         | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
 year                 | 2018                                 
 num_hours_late       | -23964.2102784                       
 departm

In [31]:
# case_dept to join source on source_id
joined_df = (case_dept
                .join(source, on='source_id', how='left')
)

In [32]:
joined_df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 source_id            | svcCRMLS                             
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 SLA_due_date         | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
 year                 | 2018                                 
 num_hours_late       | -23964.2102784                       
 departm

### Are there any cases that do not have a request source?

In [33]:
joined_df.filter('source_id is null').show(5, vertical=True, truncate=False)

[Stage 34:>                                                         (0 + 3) / 3]

(0 rows)



                                                                                

### What are the top 10 service request types in terms of number of requests?

In [34]:
joined_df.groupby('service_request_type').count().sort('count', ascending=False).show(5, vertical=True, truncate=False)

[Stage 37:>                                                         (0 + 4) / 4]

-RECORD 0------------------------------------------
 service_request_type | No Pickup                  
 count                | 89210                      
-RECORD 1------------------------------------------
 service_request_type | Overgrown Yard/Trash       
 count                | 66403                      
-RECORD 2------------------------------------------
 service_request_type | Bandit Signs               
 count                | 32968                      
-RECORD 3------------------------------------------
 service_request_type | Damaged Cart               
 count                | 31163                      
-RECORD 4------------------------------------------
 service_request_type | Front Or Side Yard Parking 
 count                | 28920                      
only showing top 5 rows



                                                                                

### What are the top 10 service request types in terms of average days late?

In [35]:
joined_df.groupby('service_request_type').mean('num_days_late').sort(desc('avg(num_days_late)')).show(5, vertical=True, truncate=False)

[Stage 42:>                                                         (0 + 4) / 4]

-RECORD 0------------------------------------------------------
 service_request_type | Zoning: Junk Yards                     
 avg(num_days_late)   | 175.9563621042095                      
-RECORD 1------------------------------------------------------
 service_request_type | Labeling for Used Mattress             
 avg(num_days_late)   | 162.43032902285717                     
-RECORD 2------------------------------------------------------
 service_request_type | Record Keeping of Used Mattresses      
 avg(num_days_late)   | 153.99724039428568                     
-RECORD 3------------------------------------------------------
 service_request_type | Signage Requied for Sale of Used Mattr 
 avg(num_days_late)   | 151.63868055333333                     
-RECORD 4------------------------------------------------------
 service_request_type | Storage of Used Mattress               
 avg(num_days_late)   | 142.112556415                          
only showing top 5 rows



                                                                                

### Does number of days late depend on department?

In [36]:
joined_df.groupby('department').mean('num_days_late').sort(desc('avg(num_days_late)')).show(5, vertical=True, truncate=False)

[Stage 47:>                                                         (0 + 4) / 4]

-RECORD 0--------------------------------------
 department         | Customer Service         
 avg(num_days_late) | 59.737091496300785       
-RECORD 1--------------------------------------
 department         | Solid Waste              
 avg(num_days_late) | -2.2000575136721747      
-RECORD 2--------------------------------------
 department         | Metro Health             
 avg(num_days_late) | -4.911766979607002       
-RECORD 3--------------------------------------
 department         | Parks & Recreation       
 avg(num_days_late) | -5.251521960055133       
-RECORD 4--------------------------------------
 department         | Trans & Cap Improvements 
 avg(num_days_late) | -20.612837354052626      
only showing top 5 rows



                                                                                

### How do number of days late depend on department and request type?

In [37]:
joined_df.groupby('department', 'service_request_type').mean('num_hours_late').sort(desc('avg(num_hours_late)')).show(5, vertical=True, truncate=False)



-RECORD 0------------------------------------------------------
 department           | DSD/Code Enforcement                   
 service_request_type | Zoning: Junk Yards                     
 avg(num_hours_late)  | 4222.952690501028                      
-RECORD 1------------------------------------------------------
 department           | DSD/Code Enforcement                   
 service_request_type | Labeling for Used Mattress             
 avg(num_hours_late)  | 3898.327896548571                      
-RECORD 2------------------------------------------------------
 department           | DSD/Code Enforcement                   
 service_request_type | Record Keeping of Used Mattresses      
 avg(num_hours_late)  | 3695.9337694628566                     
-RECORD 3------------------------------------------------------
 department           | DSD/Code Enforcement                   
 service_request_type | Signage Requied for Sale of Used Mattr 
 avg(num_hours_late)  | 3639.32833327999

                                                                                

#### Create a wrangle function for 311 data

In [38]:
def calls_311_wrangle():
    """ Function is ment to prep 311 call data """
    # start spark session
    spark = SparkSession.builder.getOrCreate()
    # create a scheme
    schema = StructType([
                        StructField('source_id', StringType()),
                        StructField('source_username', StringType()),
                        ])
    # read in the csv file
    source = spark.read.csv('source.csv', header=True, schema=schema)
    dept = spark.read.csv('dept.csv', header=True)
    case = spark.read.csv('case.csv', header=True)
    # Rename the columns
    case = case.withColumnRenamed('SLA_due_date', 'case_due_date')
    # ser column to boolean
    case = case.withColumn('case_closed', expr('case_closed == "YES"')
                           .withColumnRenamed('case_late', expr('case_late == "YES"'))
                            )
    # conver counsil_distric to string
    case = case.withColumn('council_district', col('council_district').cast('string'))
    # convert datetime
    fmt = 'M/d/yy H:mm'
    # convert case_closed_date and case_open_date to datetime
    case = case.withColumn('case_closed_date', to_timestamp('case_closed_date', fmt)
                           .withColumnRenamed('case_opened_date', 'case_opened_date_datetime', fmt)
                           .withColumnRenamed('case_due_date', 'case_due_date_datetime', fmt))
                           
    # lowercase
    case = case.withColumn('request_address', trim(lower(case.request_address)))
    # create new column
    case = case.withColumn('num_week_late', expr('num_days_late / 7'))
    # create new column for zipcode
    case = case.withColumn('zipcode', regexp_extract(case.request_address, r"(\d+$)", 1))
    # create new columns case_age, days_to_closed, and case_lifetime
    # case =(
    #         case.withColumn(
    #                         'case_age', datediff(current_timestamp(), 'case_opened_date')
    #                         )
    #                        .withColumn(
    #                         'days_to_closed', datediff('case_closed_date', 'case_opened_date')
    #                         )
    #                        .withcolumn(
    #                            'case_lifetime', 
    #                            when(expr("! case_closed"), col("case_age")).otherwise(
    #                                col("days_to_closed")
    #                             ),
    #                         )
    #                     )
    # join the data
    df = (case
        .join(dept, on='dept_division', how='left')
        .drop(dept.dept_division)
        .drop(dept.dept_name)
        .drop(case.dept_division)
        .withColumnRenamed('standardized_dept_name', 'department')
        .withColumn('dept_subject_to_SLA', col('dept_subject_to_SLA') == 'YES')
    )
    return case


In [2]:
df = wrangle.calls_311_wrangle()

In [3]:
df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 5                                    
 num_week_late        | -142.6441088                         
 zipcode              | 78207                                
 case_ag