In [1]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd

This exercises use the cases, dept, and source tables from the 311_data on the Codeup MySQL server.

- Read the case, department, and source data into their own spark dataframes.

- Let's see how writing to the local disk works in spark:

    - Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json  
    - Inspect your folder structure. What do you notice?
    Comma seperated and json organizes it into a dictionary format/
   
- Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [2]:
# Generate spark object to initialize a local Spark JVM process
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/05 12:50:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/05 12:50:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/07/05 12:50:46 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/07/05 12:50:46 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [5]:
# source df
source_df = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

# cases df
case_df = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)

# dept df
dept_df = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)

                                                                                

In [9]:
# source df
source_df.write.json("source_json", mode="overwrite")
source_df.write.csv("source_csv", mode="overwrite")


In [10]:
# case df
case_df.write.json("case_json", mode="overwrite")
case_df.write.csv("case_csv", mode="overwrite")

                                                                                

In [11]:
# dept df
dept_df.write.json("dept_dept", mode="overwrite")
dept_df.write.csv("dept_csv", mode="overwrite")

In [14]:
dept_df.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

In [27]:
case_df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [52]:
# rename columns
case_df = case_df.withColumnRenamed("SLA_due_date", "case_due_date")


In [53]:
case_df.select("case_opened_date").show(5)

+-------------------+
|   case_opened_date|
+-------------------+
|2018-01-01 00:42:00|
|2018-01-01 00:46:00|
|2018-01-01 00:48:00|
|2018-01-01 01:29:00|
|2018-01-01 01:34:00|
+-------------------+
only showing top 5 rows



In [54]:
case_df = case_df.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"'))

#Select just the two columns
case_df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



In [59]:
case_df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [56]:
#Before
case_df.select('case_opened_date', 'case_closed_date', 'case_due_date').show()

+-------------------+-------------------+-------------+
|   case_opened_date|   case_closed_date|case_due_date|
+-------------------+-------------------+-------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00| 9/26/20 0:42|
|2018-01-01 00:46:00|2018-01-03 08:11:00|  1/5/18 8:30|
|2018-01-01 00:48:00|2018-01-02 07:57:00|  1/5/18 8:30|
|2018-01-01 01:29:00|2018-01-02 08:13:00| 1/17/18 8:30|
|2018-01-01 01:34:00|2018-01-01 13:29:00|  1/1/18 4:34|
|2018-01-01 06:28:00|2018-01-01 14:38:00| 1/31/18 8:30|
|2018-01-01 06:57:00|2018-01-02 15:32:00| 1/17/18 8:30|
|2018-01-01 06:58:00|2018-01-02 15:32:00| 1/17/18 8:30|
|2018-01-01 06:58:00|2018-01-02 15:32:00| 1/17/18 8:30|
|2018-01-01 06:59:00|2018-01-02 15:32:00| 1/17/18 8:30|
|2018-01-01 07:00:00|2018-01-02 15:32:00| 1/17/18 8:30|
|2018-01-01 07:02:00|2018-01-02 15:32:00| 1/17/18 8:30|
|2018-01-01 07:02:00|2018-01-02 15:33:00| 1/17/18 8:30|
|2018-01-01 07:03:00|2018-01-02 15:32:00| 1/17/18 8:30|
|2018-01-01 07:04:00|2018-01-02 15:33:00| 1/17/1

In [58]:
fmt = "M/d/yy H:mm"

case_df = (
    case_df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))

)

case_df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-05 08:30:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-17 08:30:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00|2018-01-01 04:34:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [73]:
case_df.sort('case_opened_date', ascending=False).show(1, vertical=True)

# 2018-08-08 - 14.0 days



-RECORD 0------------------------------------
 case_id              | 1014759619           
 case_opened_date     | 2018-08-08 10:38:00  
 case_closed_date     | null                 
 case_due_date        | 2018-08-22 10:38:00  
 case_late            | false                
 num_days_late        | -13.44305556         
 case_closed          | false                
 dept_division        | Code Enforcement     
 service_request_type | Bandit Signs         
 SLA_days             | 14.0                 
 case_status          | Open                 
 source_id            | CRM_Listener         
 request_address      | 1935  MILITARY DR... 
 council_district     | 3                    
 num_weeks_late       | -1.9204365085714286  
only showing top 1 row



                                                                                

In [91]:
# mask of only open cases
cse_df2 = case_df.filter(case_df.case_closed == "False")

In [93]:
# min and max dates
cse_df2.select(
    min(cse_df2.case_opened_date), max(cse_df2.case_opened_date)
).show()

+---------------------+---------------------+
|min(case_opened_date)|max(case_opened_date)|
+---------------------+---------------------+
|  2017-01-01 13:48:00|  2018-08-08 10:38:00|
+---------------------+---------------------+



2. How many Stray Animal cases are there?

In [98]:
case_df.count()

841704

3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)? 3013

In [128]:
case_df.filter(case_df.dept_division == "Field Operations").where(case_df.service_request_type == "Officer Standby").count()



3013

4. Convert the council_district column to a string column.

In [130]:
# cast to appropriate dtype - council district
case_df = case_df.withColumn("council_district", col("council_district").cast("string"))

5. Extract the year from the case_closed_date column.

In [139]:
case_df.withColumn(
    "year",
    year("case_closed_date")
).show(vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 num_weeks_late       | -142.6441088         
 year                 | 2018                 
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05

6. Convert num_days_late from days to hours in new columns num_hours_late.

In [152]:
case_df.withColumn(
    "num_hours_late",
    round(case_df.num_days_late) * -24).show(vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 num_weeks_late       | -142.6441088         
 num_hours_late       | 23976.0              
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05

7. Join the case data with the source and department data.

In [177]:
# join case and dept
df = (
    case_df
    # left join on dept_division
    .join(dept_df, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept_df.dept_division)
    .drop(dept_df.dept_name)
    .drop(case_df.dept_division)
    .withColumnRenamed("standarized_dept_name", "department")
    # convert last col to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA")=="YES")
)

df.show(2, vertical=True)

-RECORD 0--------------------------------------
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 case_due_date          | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5087616000001   
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  EL PASO ST,... 
 council_district       | 5                    
 num_weeks_late         | -142.6441088         
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | true                 
-RECORD 1--------------------------------------
 case_id                | 1014127333           
 case_opened_date       | 2018-01-01 00:46:00  
 case_closed_date       | 2018-01-03 08:

                                                                                

In [178]:
# join df and source
df = (
    df
    # left join on dept_division
    .join(source_df, "source_id", "right")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(source_df.source_id)
    .drop(df.source_id)
    .withColumnRenamed("source_username", "username")
)

df.show( vertical=True)



-RECORD 0--------------------------------------
 case_id                | 1014128056           
 case_opened_date       | 2018-01-02 08:21:00  
 case_closed_date       | 2018-01-10 08:39:00  
 case_due_date          | 2018-01-05 08:30:00  
 case_late              | true                 
 num_days_late          | 5.00681713           
 case_closed            | true                 
 service_request_type   | Solid Waste Fees ... 
 SLA_days               | 3.00619213           
 case_status            | Closed               
 request_address        | 3214  STONEY FORK... 
 council_district       | 10                   
 num_weeks_late         | 0.71525959           
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | true                 
 username               | Michelle Urrutia     
-RECORD 1--------------------------------------
 case_id                | 1014128366           
 case_opened_date       | 2018-01-02 09:36:00  
 case_closed_date       | 2018-01-03 13:

                                                                                

8. Are there any cases that do not have a request source? No?

In [185]:
case_df.select(case_df.request_address).orderBy(case_df.request_address.desc_nulls_first()).collect()

                                                                                

[Row(request_address='ZURICH and AMSTERDAM'),
 Row(request_address='ZULEMA and SW 42ND ST'),
 Row(request_address='ZULEMA and 41ST ST SW'),
 Row(request_address='ZULEMA and 41ST ST SW'),
 Row(request_address='ZULEMA and 41ST ST SW'),
 Row(request_address='ZULEMA and 41ST ST SW'),
 Row(request_address='ZULEMA and 41ST ST SW'),
 Row(request_address='ZULEMA and 41ST ST SW'),
 Row(request_address='ZULEMA and 41ST ST SW'),
 Row(request_address='ZEBULON DR and MARY TODD DR'),
 Row(request_address='ZEBULON DR and MARY TODD DR'),
 Row(request_address='ZEBULON DR and MARY TODD DR'),
 Row(request_address='ZEBULON DR and MARY TODD DR'),
 Row(request_address='ZARZAMORA S and SAN FERNANDO'),
 Row(request_address='ZARZAMORA S and BARRETT PLACE'),
 Row(request_address='ZARZAMORA N and TEXAS AVE'),
 Row(request_address='ZARZAMORA N and MISTLETOE W'),
 Row(request_address='ZARZAMORA N and COMMERCE ST W'),
 Row(request_address='ZANGS DR and STEMMONS'),
 Row(request_address='ZANGS DR and STEMMONS'),
 Row

9. What are the top 10 service request types in terms of number of requests?

In [190]:
df.groupBy(df.service_request_type).agg(count(df.service_request_type)).sort("count(service_request_type)", ascending=False).show(10)



+--------------------+---------------------------+
|service_request_type|count(service_request_type)|
+--------------------+---------------------------+
|           No Pickup|                      89210|
|Overgrown Yard/Trash|                      66403|
|        Bandit Signs|                      32968|
|        Damaged Cart|                      31163|
|Front Or Side Yar...|                      28920|
|        Stray Animal|                      27361|
|Aggressive Animal...|                      25492|
|Cart Exchange Req...|                      22608|
|Junk Vehicle On P...|                      21649|
|     Pot Hole Repair|                      20827|
+--------------------+---------------------------+
only showing top 10 rows



                                                                                

10. What are the top 10 service request types in terms of average days late?

In [217]:
df.groupBy(df.service_request_type).agg(mean(df.num_days_late), count(df.num_days_late)).sort("avg(num_days_late)", ascending=False).show(10)

[Stage 410:>                                                        (0 + 8) / 8]

+--------------------+------------------+--------------------+
|service_request_type|avg(num_days_late)|count(num_days_late)|
+--------------------+------------------+--------------------+
|  Zoning: Junk Yards|175.95636210420932|                 296|
|Labeling for Used...|162.43032902285717|                   7|
|Record Keeping of...|153.99724039428568|                   7|
|Signage Requied f...|151.63868055333333|                  12|
|Storage of Used M...|142.11255641500003|                   8|
|Zoning: Recycle Yard| 135.9285161247979|                 198|
|Donation Containe...|131.75610506358709|                 155|
|License Requied U...|128.79828704142858|                   7|
|Traffic Signal Gr...|101.79846062200002|                   5|
|           Complaint| 72.87050230311698|                2420|
+--------------------+------------------+--------------------+
only showing top 10 rows



                                                                                

11. Does number of days late depend on department? Yes

In [225]:
df.groupBy(df.standardized_dept_name).agg(count(df.num_days_late), mean(df.num_days_late)).sort("avg(num_days_late)", ascending=False).show(10)

+----------------------+--------------------+-------------------+
|standardized_dept_name|count(num_days_late)| avg(num_days_late)|
+----------------------+--------------------+-------------------+
|      Customer Service|                2889|  59.73709149630082|
|           Solid Waste|              286287|-2.2000575136721685|
|          Metro Health|                5313| -4.911766979607001|
|    Parks & Recreation|               19964| -5.251521960055156|
|  Trans & Cap Impro...|               97841|-20.612837354052726|
|  DSD/Code Enforcement|              323579| -38.36938892614369|
|  Animal Care Services|              119362| -226.5178394055035|
|          City Council|                   0|               null|
+----------------------+--------------------+-------------------+



12. How do number of days late depend on department and request type? certain deptarments and requests can take longer

In [226]:
df.groupBy(df.standardized_dept_name, df.service_request_type).agg(mean(df.num_days_late)).sort("avg(num_days_late)", ascending=False).show(50)

[Stage 480:>                                                        (0 + 8) / 8]

+----------------------+--------------------+------------------+
|standardized_dept_name|service_request_type|avg(num_days_late)|
+----------------------+--------------------+------------------+
|  DSD/Code Enforcement|  Zoning: Junk Yards|175.95636210420932|
|  DSD/Code Enforcement|Labeling for Used...|162.43032902285717|
|  DSD/Code Enforcement|Record Keeping of...|153.99724039428568|
|  DSD/Code Enforcement|Signage Requied f...|151.63868055333333|
|  DSD/Code Enforcement|Storage of Used M...|142.11255641500003|
|  DSD/Code Enforcement|Zoning: Recycle Yard| 135.9285161247979|
|  DSD/Code Enforcement|Donation Containe...|131.75610506358709|
|  DSD/Code Enforcement|License Requied U...|128.79828704142858|
|  Trans & Cap Impro...|Traffic Signal Gr...|101.79846062200002|
|      Customer Service|           Complaint| 72.87050230311696|
|  DSD/Code Enforcement|             Vendors|   66.548098985078|
|    Parks & Recreation|Reservation Assis...|       66.03116319|
|  DSD/Code Enforcement| 

                                                                                