In [54]:
import pyspark
import pyspark.sql

from pyspark.sql.functions import *

In [2]:
# Reuse the active SparkSession if one exists, otherwise create one;
# the bare `spark` on the last line displays the session summary.
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark

# Acquisition

- These exercises use the case.csv, dept.csv, and source.csv files from the San Antonio 311 call dataset.

1. Read the case, department, and source data into their own spark dataframes.

In [3]:
from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema for source.csv: both columns are read as strings.
# source_id is an identifier, and case.source_id is also a string column.
schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)
schema

StructType(List(StructField(source_id,StringType,true),StructField(source_username,StringType,true)))

In [4]:
# Load the 311 case records: the header row supplies column names and Spark infers the types.
case = spark.read.options(header=True, inferSchema=True).csv('case.csv')
case.show(vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [5]:
# Inspect the inferred types: the three date columns arrive as plain strings.
case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [6]:
# Load the department lookup table (header row + inferred column types).
dept = spark.read.options(header=True, inferSchema=True).csv('dept.csv')
dept.show(vertical=True)

-RECORD 0--------------------------------------
 dept_division          | 311 Call Center      
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Brush                
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | YES                  
-RECORD 2--------------------------------------
 dept_division          | Clean and Green      
 dept_name              | Parks and Recreation 
 standardized_dept_name | Parks & Recreation   
 dept_subject_to_SLA    | YES                  
-RECORD 3--------------------------------------
 dept_division          | Clean and Green N... 
 dept_name              | Parks and Recreation 
 standardized_dept_name | Parks & Recreation   
 dept_subject_to_SLA    | YES                  
-RECORD 4-------------------------------

In [7]:
# Every dept column, including the YES/NO SLA flag, is read as a string.
dept.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [8]:
# Read source.csv with the explicit string schema built earlier instead of inferSchema.
# This skips the extra pass over the file that inference needs, and it guarantees
# source_id is a string, matching the type of case.source_id.
source = spark.read.csv('source.csv', header=True, schema=schema)
source.show(vertical=True)

-RECORD 0-------------------------------
 source_id       | 100137               
 source_username | Merlene Blodgett     
-RECORD 1-------------------------------
 source_id       | 103582               
 source_username | Carmen Cura          
-RECORD 2-------------------------------
 source_id       | 106463               
 source_username | Richard Sanchez      
-RECORD 3-------------------------------
 source_id       | 119403               
 source_username | Betty De Hoyos       
-RECORD 4-------------------------------
 source_id       | 119555               
 source_username | Socorro Quiara       
-RECORD 5-------------------------------
 source_id       | 119868               
 source_username | Michelle San Miguel  
-RECORD 6-------------------------------
 source_id       | 120752               
 source_username | Eva T. Kleiber       
-RECORD 7-------------------------------
 source_id       | 124405               
 source_username | Lori Lara            
-RECORD 8-------

In [9]:
# Both source columns are strings.
source.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



- Find the shape of all three Spark DataFrames

In [10]:
# Spark DataFrames have no .shape, so build (rows, columns) by hand; count() runs a job.
case.count(), len(case.columns)

(841704, 14)

In [11]:
# (rows, columns) of the department table.
dept.count(), len(dept.columns)

(39, 4)

In [12]:
# (rows, columns) of the source table.
source.count(), len(source.columns)

(140, 2)

## 2. Let's see how writing to the local disk works in spark:

- Write the code necessary to store the source data in both `csv` and `json` formats, saving them as `sources_csv` and `sources_json`.


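- Inspect your folder structure. What do you notice? (Spark writes each output as a directory of part files plus a `_SUCCESS` marker, not as a single file.)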

In [13]:
# Persist the source table as JSON, replacing any previous output directory.
source.write.mode('overwrite').json('data/sources_json')

In [14]:
# Persist the source table as CSV, replacing any previous output directory.
# header=True writes the column names into the files. Without it, reading the data
# back yields default column names (_c0, _c1) instead of source_id/source_username.
source.write.csv('data/sources_csv', mode='overwrite', header=True)

## 3. Inspect the data in your dataframes. 

- Are the data types appropriate? 

- Write the code necessary to cast the values to the appropriate types.

**Task:** confirm 'case_late' & 'case_closed' contain only YES and NO, and convert them to booleans

In [15]:
# Cross-tabulate the two flags to confirm they only ever hold "YES" or "NO".
case.groupby('case_late', 'case_closed').count().show()

+---------+-----------+------+
|case_late|case_closed| count|
+---------+-----------+------+
|       NO|        YES|735616|
|      YES|        YES| 87978|
|       NO|         NO| 11585|
|      YES|         NO|  6525|
+---------+-----------+------+



In [16]:
# Replace the YES/NO text flags with real booleans (anything other than "YES" becomes false).
case = (
    case
    .withColumn('case_late', col('case_late') == 'YES')
    .withColumn('case_closed', col('case_closed') == 'YES')
)

In [17]:
# Spot-check the converted flags.
case.select('case_late', 'case_closed').show(3)     

+---------+-----------+
|case_late|case_closed|
+---------+-----------+
|    false|       true|
|    false|       true|
|    false|       true|
+---------+-----------+
only showing top 3 rows



In [18]:
# Confirm case_late and case_closed are now boolean.
case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



**Task:** convert 'case_id' and 'council_district' to a string

In [19]:
# Peek at case_id values: they are identifiers, not quantities, so they will become strings.
case.groupBy('case_id').count().show(5)

+----------+-----+
|   case_id|count|
+----------+-----+
|1014127358|    1|
|1014127828|    1|
|1014128265|    1|
|1014128399|    1|
|1014128427|    1|
+----------+-----+
only showing top 5 rows



In [20]:
# Council districts are labels (0-10), so they will also be treated as strings.
case.groupBy('council_district').count().show()

+----------------+------+
|council_district| count|
+----------------+------+
|               1|119309|
|               6| 74095|
|               3|102706|
|               5|114609|
|               9| 40916|
|               4| 93778|
|               8| 42345|
|               7| 72445|
|              10| 62926|
|               2|114745|
|               0|  3830|
+----------------+------+



In [21]:
# case_id is an identifier, so store it as text.
case = case.withColumn('case_id', col('case_id').astype('string'))

In [22]:
# council_district is a label, so store it as text as well.
case = case.withColumn('council_district', col('council_district').astype('string'))

In [23]:
# Confirm case_id and council_district are now strings.
case.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



**Task:** rename the 'SLA_due_date' column to 'case_due_date'

In [24]:
# Rename so the due date follows the same case_* naming as the other date columns.
case = case.withColumnRenamed('SLA_due_date', 'case_due_date')

In [25]:
# Confirm the rename took effect.
case.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- case_due_date: string (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



**Task:** handle the 3 columns that have dates in them:

'case_opened_date', 'case_closed_date', 'case_due_date'

In [26]:
# The raw dates look like "1/1/18 0:42" (month/day/2-digit year hour:minute).
fmt = 'M/d/yy H:mm'
date_columns = ['case_opened_date', 'case_closed_date', 'case_due_date']

print('--- Before handling dates')
case.select(date_columns).show(5)

# Parse each date string into a proper timestamp, keeping the column name.
for date_col in date_columns:
    case = case.withColumn(date_col, to_timestamp(date_col, fmt))

print('--- After handling dates')
case.select(date_columns).show(5)

--- Before handling dates
+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows

--- After handling dates
+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-05 08:30:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-17 08:30:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00

### Data Transformation

- Normalize the request address field. The `trim` and `lower` functions strip any leading and trailing whitespace and lowercase everything.

In [27]:
address = col('request_address')

print('--- Before transforming the address')
case.select('request_address').show(5)

# Lowercase the address, then strip leading/trailing whitespace.
case = case.withColumn('request_address', trim(lower(address)))

print('--- After transforming the address')
case.select('request_address').show(5)

--- Before transforming the address
+--------------------+
|     request_address|
+--------------------+
|2315  EL PASO ST,...|
|2215  GOLIAD RD, ...|
|102  PALFREY ST W...|
|114  LA GARDE ST,...|
|734  CLEARVIEW DR...|
+--------------------+
only showing top 5 rows

--- After transforming the address
+--------------------+
|     request_address|
+--------------------+
|2315  el paso st,...|
|2215  goliad rd, ...|
|102  palfrey st w...|
|114  la garde st,...|
|734  clearview dr...|
+--------------------+
only showing top 5 rows



- Convert the number of days a case is late to a number of weeks

In [28]:
# Express lateness in weeks (7 days per week); withColumn supplies the column name.
case = case.withColumn('num_weeks_late', col('num_days_late') / 7)

In [29]:
# Compare days late and weeks late side by side.
case.select('num_days_late', 'num_weeks_late').show(5)

+-------------------+--------------------+
|      num_days_late|      num_weeks_late|
+-------------------+--------------------+
| -998.5087616000001|        -142.6441088|
|-2.0126041669999997|-0.28751488099999994|
|       -3.022337963|-0.43176256614285713|
|       -15.01148148| -2.1444973542857144|
|0.37216435200000003|         0.053166336|
+-------------------+--------------------+
only showing top 5 rows



- Format the 'council_district' column by adding leading 0s to it

In [30]:
# Zero-pad the district to three characters ('5' -> '005') so every label has the same width.
# council_district is currently a string, so cast it to int once for '%03d' formatting
# ('%03d' means at least 3 digits, padded with 0s).
case = case.withColumn(
    'council_district', format_string('%03d', col('council_district').cast('int')))

case.groupBy('council_district').count().show()

+----------------+------+
|council_district| count|
+----------------+------+
|             009| 40916|
|             006| 74095|
|             005|114609|
|             003|102706|
|             008| 42345|
|             001|119309|
|             010| 62926|
|             004| 93778|
|             000|  3830|
|             007| 72445|
|             002|114745|
+----------------+------+



- Create a new feature named 'zipcode'

In [31]:
# The zip code is the run of digits at the very end of the normalized address;
# regexp_extract yields an empty string when there is no such run.
case = case.withColumn('zipcode', regexp_extract(col('request_address'), r'(\d+)$', 1))

case.select('zipcode').show(5)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
+-------+
only showing top 5 rows



### Create several new, related columns:

- `case_age`: How old the case is; the difference in days between when the case was opened and the current day


- `days_to_closed`: The number of days between when the case was opened and when it was closed


- `case_lifetime`: Number of days between when the case was opened and when it was closed; if the case is still open, the number of days since the case was opened

In [32]:
# case_age:       whole days from opening until now (so it changes each time this runs).
# days_to_closed: whole days from opening until closing.
# case_lifetime:  days_to_closed for closed cases, case_age for cases still open.
is_closed = col("case_closed")

case = (
    case
    .withColumn("case_age", datediff(current_timestamp(), "case_opened_date"))
    .withColumn("days_to_closed", datediff("case_closed_date", "case_opened_date"))
    .withColumn(
        "case_lifetime",
        when(~is_closed, col("case_age")).otherwise(col("days_to_closed")),
    )
)

lifetime_columns = [
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
]

# Closed cases first, then open ones.
case.select(lifetime_columns).where(is_closed).show(5)
case.select(lifetime_columns).where(~is_closed).show(5)

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|case_age|days_to_closed|case_lifetime|
+-----------+-------------------+-------------------+--------+--------------+-------------+
|       true|2018-01-01 00:42:00|2018-01-01 12:29:00|     717|             0|            0|
|       true|2018-01-01 00:46:00|2018-01-03 08:11:00|     717|             2|            2|
|       true|2018-01-01 00:48:00|2018-01-02 07:57:00|     717|             1|            1|
|       true|2018-01-01 01:29:00|2018-01-02 08:13:00|     717|             1|            1|
|       true|2018-01-01 01:34:00|2018-01-01 13:29:00|     717|             0|            0|
+-----------+-------------------+-------------------+--------+--------------+-------------+
only showing top 5 rows

+-----------+-------------------+----------------+--------+--------------+-------------+
|case_closed|   case_opened_date|case_closed_date|case_age

### Joining Department Data

- Join the department data from `dept.csv`, which contains information about the various departments

In [33]:
# Columns the join will bring in (dept_division is the join key).
dept.columns

['dept_division', 'dept_name', 'standardized_dept_name', 'dept_subject_to_SLA']

In [34]:
# Attach department info to every case. The left join keeps cases whose dept_division
# has no match in dept; their department columns are null.
# Joining on the column *name* leaves a single dept_division column, so it and the
# long-form dept_name can both be dropped by name.
# The standardized name becomes `department`, and the SLA flag becomes a boolean.
# NOTE: re-running this cell fails, because case no longer has dept_division to join on.
case = (
    case.join(dept, 'dept_division', 'left')
    .drop('dept_division', 'dept_name')
    .withColumnRenamed('standardized_dept_name', 'department')
    .withColumn('dept_subject_to_SLA', col('dept_subject_to_SLA') == 'YES')
)

In [35]:
# Shape after the join: the row count stays the same as long as each dept_division
# matches at most one row in dept.
case.count(), len(case.columns)

(841704, 20)

In [36]:
# Spot-check one fully prepared record.
case.show(1, vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 717                  
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
only showing top 1 row



**Note:** the next few cells come from the Data Wrangling review / function-building session.

In [38]:
# Pull the first row back to the driver as a pyspark Row (first() is equivalent to head()).
row = case.select('service_request_type', 'department').first()
row

Row(service_request_type='Stray Animal', department='Animal Care Services')

In [46]:
# Row fields can be read as attributes.
row.service_request_type

'Stray Animal'

In [49]:
# `max` here is pyspark.sql.functions.max (brought in by the star import), not
# Python's builtin; head() returns the single aggregate Row.
row = case.select(max('case_closed_date')).head()
row

Row(max(case_closed_date)=datetime.datetime(2018, 8, 8, 10, 38))

In [50]:
# A Row is a tuple, so index 0 pulls out the datetime value itself.
row[0]

datetime.datetime(2018, 8, 8, 10, 38)

In [52]:
# Longest case_lifetime in days (pyspark's max, via the star import).
case.select(max('case_lifetime')).show()

+------------------+
|max(case_lifetime)|
+------------------+
|              1082|
+------------------+



**Task:** complete the 12 exercises at the end of the Data Wrangling module