# Data Acquisition

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder.getOrCreate()

## Exercise 1
This exercises uses the case.csv, dept.csv, and source.csv files from the san antonio 311 call dataset.

Read the case, department, and source data into their own spark dataframes.

In [3]:
case = spark.read.csv("data/case.csv", header=True, inferSchema=True)

In [4]:
dept = spark.read.csv("data/dept.csv", header=True, inferSchema=True)

In [5]:
source = spark.read.csv("data/source.csv", sep=",", header=True, inferSchema=True)

## Exercise 2
Let's see how writing to the local disk works in spark:

Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
Inspect your folder structure. What do you notice?

In [6]:
source.write.json("source_json", mode="overwrite")

In [7]:
source.write.csv("source_csv", mode="overwrite")

The csv file does not automatically add headers to the columns like the json file does. The csv file instead seperates the columns by commas while the json file places each line into a dictionary.

## Exercise 3
Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [8]:
source

DataFrame[source_id: string, source_username: string]

The `'source_id'` is listed as a string even though they are numbers. This will be fine since we are not going to perform any arithmetic with this number.

In [9]:
dept

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [10]:
dept.groupBy('dept_subject_to_SLA').count().show()

+-------------------+-----+
|dept_subject_to_SLA|count|
+-------------------+-----+
|                YES|   31|
|                 NO|    8|
+-------------------+-----+



The column `'dept_subject_to_SLA'` is listed as a string when it is really a boolean value.

In [11]:
dept = dept.withColumn("dept_subject_to_SLA", expr('dept_subject_to_SLA == "YES"'))

In [12]:
dept.show()

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|               true|
|               Brush|Solid Waste Manag...|           Solid Waste|               true|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|               true|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|               true|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|               true|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|               true|
|Code Enforcement ...|                null|  DSD/Code Enforcement|               true|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|               true|
|Dangerous Premise...|Code Enforcement ...|

In [13]:
case

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int]

- `'case_id'` snd `'council_district'` needs to be changed into a string since we are not going to do any artithmetic.
- `'case_closed'` and `'case_late'` will need to be changed into boolean values
- `"case_opened_date"`, `"case_closed_date"`, `"case_due_date"` all need to be changed into a datetime format

In [14]:
case = case.withColumnRenamed("SLA_due_date", "case_due_date")

In [15]:
case

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, case_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int]

In [16]:
case = case.withColumn("council_district", col("council_district").cast("string")).withColumn(
    "case_id", col("case_id").cast("string"))

In [17]:
case.select("council_district", "case_id")

DataFrame[council_district: string, case_id: string]

Values are now strings and no longer integers

In [18]:
case.groupBy("case_closed", "case_late").count().show()

+-----------+---------+------+
|case_closed|case_late| count|
+-----------+---------+------+
|         NO|      YES|  6525|
|        YES|      YES| 87978|
|         NO|       NO| 11585|
|        YES|       NO|735616|
+-----------+---------+------+



In [19]:
case = case.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

In [20]:
case.select("case_closed", "case_late")

DataFrame[case_closed: boolean, case_late: boolean]

Values are now booleans and no longer strings

In [21]:
case.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows



In [22]:
fmt = "M/d/yy H:mm"
case = (
    case.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_opened_date", fmt))
)

In [23]:
case.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 00:42:00|2018-01-01 00:42:00|
|2018-01-01 00:46:00|2018-01-01 00:46:00|2018-01-01 00:46:00|
|2018-01-01 00:48:00|2018-01-01 00:48:00|2018-01-01 00:48:00|
|2018-01-01 01:29:00|2018-01-01 01:29:00|2018-01-01 01:29:00|
|2018-01-01 01:34:00|2018-01-01 01:34:00|2018-01-01 01:34:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



Now that we have the data as the appropriate types, we can make a few transformations to the data.

In [24]:
case.select("request_address").show(5)

+--------------------+
|     request_address|
+--------------------+
|2315  EL PASO ST,...|
|2215  GOLIAD RD, ...|
|102  PALFREY ST W...|
|114  LA GARDE ST,...|
|734  CLEARVIEW DR...|
+--------------------+
only showing top 5 rows



In [25]:
case = case.withColumn("request_address", trim(lower(case.request_address)))

In [26]:
case.select("request_address").show(5)

+--------------------+
|     request_address|
+--------------------+
|2315  el paso st,...|
|2215  goliad rd, ...|
|102  palfrey st w...|
|114  la garde st,...|
|734  clearview dr...|
+--------------------+
only showing top 5 rows



We will convert the number of days a case is late to a number of weeks.

In [27]:
case = case.withColumn(
    "num_weeks_late", expr("num_days_late / 7 AS num_weeks_late")
)

case.select("num_days_late", "num_weeks_late").show(5)

+-------------------+--------------------+
|      num_days_late|      num_weeks_late|
+-------------------+--------------------+
| -998.5087616000001|        -142.6441088|
|-2.0126041669999997|-0.28751488099999994|
|       -3.022337963|-0.43176256614285713|
|       -15.01148148| -2.1444973542857144|
|0.37216435200000003|         0.053166336|
+-------------------+--------------------+
only showing top 5 rows



Lastly, we can format the council district column a little differently. We'll add leading 0s to it.

In [28]:
case = case.withColumn("council_district", col("council_district").cast("int"))

# '%03d' means at least 3 digits, pad with 0s
#
# In order to use the format_string function the way we are, we'll need to
# convert council_district back to an integer temporarily, but the final output
# will be a string.
case = case.withColumn(
    "council_district",
    format_string("%03d", col("council_district").cast("int")),
)

In [29]:
case.select("council_district").show(5)

+----------------+
|council_district|
+----------------+
|             005|
|             003|
|             003|
|             003|
|             007|
+----------------+
only showing top 5 rows



We can create a new feature based on existing data. We will extract the zipcode from the address.

In [30]:
case = case.withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))

In [31]:
case.select("zipcode").show(5)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
+-------+
only showing top 5 rows



Next we will create several new, related columns:

- `case_age`: How old the case is; the difference in days between when the case was opened and the current day
- `days_to_closed`: The number of days between when the case was opened and when it was closed
- `case_lifetime`: Number of days between when the case was opened and when it was closed, if the case is still open, the number of days since the case was opened

In [32]:
case = (
    case.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

In [33]:
case.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("case_closed")).show(5)

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|case_age|days_to_closed|case_lifetime|
+-----------+-------------------+-------------------+--------+--------------+-------------+
|       true|2018-01-01 00:42:00|2018-01-01 00:42:00|    1065|             0|            0|
|       true|2018-01-01 00:46:00|2018-01-01 00:46:00|    1065|             0|            0|
|       true|2018-01-01 00:48:00|2018-01-01 00:48:00|    1065|             0|            0|
|       true|2018-01-01 01:29:00|2018-01-01 01:29:00|    1065|             0|            0|
|       true|2018-01-01 01:34:00|2018-01-01 01:34:00|    1065|             0|            0|
+-----------+-------------------+-------------------+--------+--------------+-------------+
only showing top 5 rows



In [34]:
case.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("! case_closed")).show(5)

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|case_age|days_to_closed|case_lifetime|
+-----------+-------------------+-------------------+--------+--------------+-------------+
|      false|2018-01-02 09:39:00|2018-01-02 09:39:00|    1064|             0|         1064|
|      false|2018-01-02 10:49:00|2018-01-02 10:49:00|    1064|             0|         1064|
|      false|2018-01-02 13:45:00|2018-01-02 13:45:00|    1064|             0|         1064|
|      false|2018-01-02 14:09:00|2018-01-02 14:09:00|    1064|             0|         1064|
|      false|2018-01-02 14:34:00|2018-01-02 14:34:00|    1064|             0|         1064|
+-----------+-------------------+-------------------+--------+--------------+-------------+
only showing top 5 rows



We can now join the department data onto the case data using the `dept_division` column.

In [35]:
case = (
    case
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
    #  left join on source_id
    .join(source, 'source_id', 'left')
)
# # Train Test Split
# train, test = df.randomSplit([.8, .2], seed=123)
# train, validate, test = df.randomSplit([.7, .15, .15], seed=123)

In [36]:
case.show(2, vertical=True)

-RECORD 0------------------------------------
 source_id            | svcCRMLS             
 dept_division        | Field Operations     
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2018-01-01 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 1065                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true      

## Question 1
How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?


In case we want to query our dataframe with Spark SQL:

In [37]:
case.createOrReplaceTempView('case')

In [38]:
spark.sql(
'''
SELECT DATEDIFF(current_timestamp, case_due_date) AS days_past_due
FROM case
WHERE NOT case_closed
ORDER BY days_past_due DESC
LIMIT 15
'''
).show()

+-------------+
|days_past_due|
+-------------+
|         1430|
|         1430|
|         1429|
|         1428|
|         1427|
|         1427|
|         1427|
|         1427|
|         1427|
|         1427|
|         1427|
|         1427|
|         1427|
|         1427|
|         1427|
+-------------+



## Question 2
How many Stray Animal cases are there?

In [39]:
case.filter(col("service_request_type") == "Stray Animal").count()

27361

## Question 3
How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [40]:
(case
     .filter(col("dept_division") == "Field Operations")
     .filter(col("service_request_type") != "Officer Standby")
     .count()
)

116295

## Question 4
Convert the council_district column to a string column.

In [41]:
case.withColumn("council_district", col("council_district").cast("string"))
case.select("council_district")

DataFrame[council_district: string]

## Question 5
Extract the year from the case_closed_date column.

In [42]:
case = case.withColumn("case_closed_year", year("case_closed_date"))

In [43]:
case.select("case_closed_date", "case_closed_year").show()

+-------------------+----------------+
|   case_closed_date|case_closed_year|
+-------------------+----------------+
|2018-01-01 00:42:00|            2018|
|2018-01-01 00:46:00|            2018|
|2018-01-01 00:48:00|            2018|
|2018-01-01 01:29:00|            2018|
|2018-01-01 01:34:00|            2018|
|2018-01-01 06:28:00|            2018|
|2018-01-01 06:57:00|            2018|
|2018-01-01 06:58:00|            2018|
|2018-01-01 06:58:00|            2018|
|2018-01-01 06:59:00|            2018|
|2018-01-01 07:00:00|            2018|
|2018-01-01 07:02:00|            2018|
|2018-01-01 07:02:00|            2018|
|2018-01-01 07:03:00|            2018|
|2018-01-01 07:04:00|            2018|
|2018-01-01 07:04:00|            2018|
|2018-01-01 07:05:00|            2018|
|2018-01-01 07:06:00|            2018|
|2018-01-01 07:06:00|            2018|
|2018-01-01 07:07:00|            2018|
+-------------------+----------------+
only showing top 20 rows



In [44]:
case.select("case_closed_date", "case_closed_year").tail(5)

[Row(case_closed_date=datetime.datetime(2017, 12, 31, 22, 23), case_closed_year=2017),
 Row(case_closed_date=datetime.datetime(2017, 12, 31, 22, 26), case_closed_year=2017),
 Row(case_closed_date=datetime.datetime(2017, 12, 31, 22, 27), case_closed_year=2017),
 Row(case_closed_date=datetime.datetime(2017, 12, 31, 22, 41), case_closed_year=2017),
 Row(case_closed_date=datetime.datetime(2017, 12, 31, 22, 44), case_closed_year=2017)]

## Question 6
Convert num_days_late from days to hours in new columns num_hours_late.

In [45]:
case = case.withColumn(
    "num_hours_late", expr("num_days_late * 24 AS num_hours_late")
)

In [46]:
case.select("num_days_late", "num_hours_late").show(5)

+-------------------+-------------------+
|      num_days_late|     num_hours_late|
+-------------------+-------------------+
| -998.5087616000001|     -23964.2102784|
|-2.0126041669999997|-48.302500007999996|
|       -3.022337963|      -72.536111112|
|       -15.01148148|      -360.27555552|
|0.37216435200000003|  8.931944448000001|
+-------------------+-------------------+
only showing top 5 rows



In [47]:
#case = case.withColumn("num_hours_late", (col("num_days_late") * 24))

## Question 7
Join the case data with the source and department data.

Done in the exercises above

## Question 8
Are there any cases that do not have a request source?

In [48]:
case.filter(col('source_id').isNull()).show(vertical=True)

(0 rows)



## Question 9
What are the top 10 service request types in terms of number of requests?

In [49]:
(case
    .groupBy("service_request_type")
    .count()
    .sort(col("count").desc())
    .show(10)
)

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|89210|
|Overgrown Yard/Trash|66403|
|        Bandit Signs|32968|
|        Damaged Cart|31163|
|Front Or Side Yar...|28920|
|        Stray Animal|27361|
|Aggressive Animal...|25492|
|Cart Exchange Req...|22608|
|Junk Vehicle On P...|21649|
|     Pot Hole Repair|20827|
+--------------------+-----+
only showing top 10 rows



## Question 10
What are the top 10 service request types in terms of average days late?

In [50]:
(case
    .where("case_late")
    .groupBy("service_request_type")
    .mean("num_days_late")
    .sort(col("avg(num_days_late)").desc())
    .show(10)
)

+--------------------+------------------+
|service_request_type|avg(num_days_late)|
+--------------------+------------------+
|Zoning: Recycle Yard|210.89201994318182|
|  Zoning: Junk Yards|200.20517608494276|
|Structure/Housing...|190.20707698509804|
|Donation Containe...|171.09115313942618|
|Storage of Used M...|163.96812829714287|
|Labeling for Used...|162.43032902285717|
|Record Keeping of...|153.99724039428568|
|Signage Requied f...|151.63868055333333|
|Traffic Signal Gr...|137.64583330000002|
|License Requied U...|128.79828704142858|
+--------------------+------------------+
only showing top 10 rows



In [54]:
# What are the top 10 service request types in terms of average days late?
# - just the late cases
# - for the late cases:
#   - what is the average number of days late by request type?
(case
    .where('case_late') # just the rows where case_late == true
    .groupBy('service_request_type')
    .agg(mean('num_days_late').alias('n_days_late'), count('*').alias('n_cases'))
    .sort(desc('n_days_late'))
    .show(10, truncate=False)
)

+--------------------------------------+------------------+-------+
|service_request_type                  |n_days_late       |n_cases|
+--------------------------------------+------------------+-------+
|Zoning: Recycle Yard                  |210.89201994318182|132    |
|Zoning: Junk Yards                    |200.20517608494276|262    |
|Structure/Housing Maintenance         |190.20707698509804|51     |
|Donation Container Enforcement        |171.09115313942618|122    |
|Storage of Used Mattress              |163.96812829714287|7      |
|Labeling for Used Mattress            |162.43032902285717|7      |
|Record Keeping of Used Mattresses     |153.99724039428568|7      |
|Signage Requied for Sale of Used Mattr|151.63868055333333|12     |
|Traffic Signal Graffiti               |137.64583330000002|4      |
|License Requied Used Mattress Sales   |128.79828704142858|7      |
+--------------------------------------+------------------+-------+
only showing top 10 rows



## Question 11
Does number of days late depend on department?

In [61]:
# (case
#     .groupBy("department")
#     .mean("num_days_late")
#     .sort(col("avg(num_days_late)").desc())
#     .show()
# )

In [58]:

# Does number of days late depend on department?
(case
    .filter('case_late')
    .groupby('department')
    .agg(mean('num_days_late').alias('days_late'), count('num_days_late').alias('n_cases_late'))
    .sort('days_late')
    .withColumn('days_late', round(col('days_late'), 1))
    .show(truncate=False)
)

+------------------------+---------+------------+
|department              |days_late|n_cases_late|
+------------------------+---------+------------+
|Metro Health            |6.5      |854         |
|Solid Waste             |7.1      |33729       |
|Trans & Cap Improvements|10.7     |5529        |
|Parks & Recreation      |22.4     |3810        |
|Animal Care Services    |23.4     |23751       |
|DSD/Code Enforcement    |49.5     |26439       |
|Customer Service        |88.2     |2035        |
+------------------------+---------+------------+



In [60]:
case.groupby('department').count().show(truncate=False)

+------------------------+------+
|department              |count |
+------------------------+------+
|Solid Waste             |286287|
|Animal Care Services    |119362|
|Trans & Cap Improvements|97841 |
|Parks & Recreation      |19964 |
|Customer Service        |2889  |
|Metro Health            |5313  |
|City Council            |34    |
|DSD/Code Enforcement    |323579|
+------------------------+------+



## Question 12
How do number of days late depend on department and request type?

In [65]:
# (case
#     .groupBy("department", "service_request_type")
#     .mean("num_days_late")
#     .sort(col("avg(num_days_late)").desc())
#     .show()
# )

In [64]:
(case
    .filter("case_closed")
    .filter("case_late")
    .groupby("department", "service_request_type")
    .agg(avg("num_days_late").alias("days_late"), count("*").alias("n_cases"))
    .withColumn("days_late", round(col("days_late"), 1))
    .sort(desc("days_late"))
    .show(40, truncate=False)
)

+--------------------+--------------------------------------------+---------+-------+
|department          |service_request_type                        |days_late|n_cases|
+--------------------+--------------------------------------------+---------+-------+
|DSD/Code Enforcement|Zoning: Recycle Yard                        |273.6    |75     |
|DSD/Code Enforcement|Zoning: Junk Yards                          |251.9    |146    |
|DSD/Code Enforcement|Donation Container Enforcement              |201.7    |82     |
|DSD/Code Enforcement|Structure/Housing Maintenance               |182.4    |30     |
|DSD/Code Enforcement|Graffiti: Private Property (Corridors)      |175.1    |3      |
|DSD/Code Enforcement|Storage of Used Mattress                    |164.0    |7      |
|DSD/Code Enforcement|Labeling for Used Mattress                  |162.4    |7      |
|DSD/Code Enforcement|Record Keeping of Used Mattresses           |154.0    |7      |
|DSD/Code Enforcement|Signage Requied for Sale of Used

## Bonus: 
You might have noticed that the latest date in the dataset is fairly far off from the present day. To account for this, replace any occurances of the current time with the maximum date from the dataset.

Quick Recap: getting data from spark dataframes:
        
- .show(n): prints the first n rows. Doesn't produce a value that can be used later
- .first: gives us the first row object
- .head(n): gives us a list of the first n row objects
- .collect(): turns all the rows into a list of row objects be careful here

In [75]:
case.select(max('case_opened_date'), max('case_closed_date')).collect()

[Row(max(case_opened_date)=datetime.datetime(2018, 8, 8, 10, 38), max(case_closed_date)=datetime.datetime(2018, 8, 8, 10, 38))]

In [76]:
max_date = case.select(max('case_opened_date'), max('case_closed_date')).first()[0]
max_date

datetime.datetime(2018, 8, 8, 10, 38)

In [77]:
max_date = max_date.strftime('%Y-%m-%d %H:%M:%S')
max_date

'2018-08-08 10:38:00'

In [78]:
case = (
    case.withColumn('case_age', datediff(lit(max_date), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .drop('case_age', 'days_to_closed')
)

### Sidebar: Python Code Formatting

indentation conveys nesting

autoformatting tools remove the discussion

In [None]:
df = (df.withColumn('case_age', datediff(lit(max_date), 'case_opened_date')).withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date')).withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age'))).drop('case_age', 'days_to_closed'))