# Spark Data Wrangling

In [44]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from env import user, password, host

In [2]:
def get_db_url(db):
    '''input df and output sql connection string'''
    return (f'mysql+pymysql://{user}:{password}@{host}/{db}')

## Acquire

In [3]:
#create enviroment
spark = SparkSession.builder.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/19 14:06:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/19 14:06:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### load mpg data set from pydataset

In [13]:
from pydataset import data

In [14]:
mpg = spark.createDataFrame(data('mpg'))
mpg

DataFrame[manufacturer: string, model: string, displ: double, year: bigint, cyl: bigint, trans: string, drv: string, cty: bigint, hwy: bigint, fl: string, class: string]

In [15]:
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



### write datafame to file

- `json`: for writing to a local json file(s)
- `csv`: for writing to a local csv file(s)
- `parquet`: Parquet is a very popular columnar storage format for Hadoop.
- `jdbc`: for writing to a SQL database table

#### write file to json

In [16]:
#df.write.type
mpg.write.json('data/mpg_json', mode='overwrite')

                                                                                

### Spark is splitting up the file in order to be used on distributed systems

#### write dataframe to csv

In [27]:
#df.write.format()
(
mpg.write.format('csv')
    .mode('overwrite')
    .option('header', 'True')
    .save('data/mpg_csv')
)



### read files
- spark.read.[type]

In [17]:
import os

#### read json

In [18]:
os.listdir('data/mpg_json')

['part-00007-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json',
 '.part-00002-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json.crc',
 'part-00002-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json',
 '.part-00001-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json.crc',
 'part-00000-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json',
 '._SUCCESS.crc',
 '.part-00006-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json.crc',
 'part-00005-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json',
 '.part-00005-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json.crc',
 'part-00004-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json',
 '.part-00003-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json.crc',
 'part-00001-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json',
 '.part-00000-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json.crc',
 '_SUCCESS',
 'part-00003-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json',
 '.part-00007-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json.crc',
 'part-00006-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json',
 '.p

In [19]:
# there are abunch of files created from this command

In [22]:
first_file = [fn for fn in os.listdir('data/mpg_json') if not fn.startswith('.')][0]

In [23]:
first_file

'part-00007-10b14cda-e351-4c81-bcc0-7c8d2d3bb627-c000.json'

In [26]:
spark.read.json(f'data/mpg_json/{first_file}')

DataFrame[class: string, cty: bigint, cyl: bigint, displ: double, drv: string, fl: string, hwy: bigint, manufacturer: string, model: string, trans: string, year: bigint]

#### read csv

In [28]:
#keep written csv headers
(
    spark.read.format("csv")
      .option("header", True)
      .load("data/mpg_csv")
).count()

                                                                                

234

### load source from 311_data in sql

In [4]:
#sql query
url = get_db_url('311_data')
query = 'select source_id, source_username from source'

In [8]:
#make pandas df

# pandas_df = pd.read_sql(query, url)
# pandas_df.head()

Unnamed: 0,dept_division,dept_name,standardized_dept_name,dept_subject_to_SLA
0,311 Call Center,Customer Service,Customer Service,YES
1,Brush,Solid Waste Management,Solid Waste,YES
2,Clean and Green,Parks and Recreation,Parks & Recreation,YES
3,Clean and Green Natural Areas,Parks and Recreation,Parks & Recreation,YES
4,Code Enforcement,Code Enforcement Services,DSD/Code Enforcement,YES


### load cases from 311_data from sql

In [6]:
#sql query
query = 'select * from cases limit 100000'

In [9]:
#pandas df
# pandas_df = pd.read_sql(query, url)
pandas_df = pd.read_csv('311.csv')

In [102]:
#spark df
df = spark.createDataFrame(pandas_df)
df.printSchema()

root
 |-- case_id: long (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: long (nullable = true)



In [60]:
df.show(3, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 SLA_due_date         | 9/26/20 0:42                          
 case_late            | NO                                    
 num_days_late        | -998.5087616                          
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

23/05/19 15:02:50 WARN TaskSetManager: Stage 36 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


## Prepare

- rename columns
- correct datatypes
- data transformation
- make new features
- join tables

### rename columns

#### change SLA_due_date to case_due_date

In [103]:
df = df.withColumnRenamed('SLA_due_date', 'case_due_date')

In [69]:
df.show(1, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 case_due_date        | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616                         
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
only showing top 1 row



23/05/19 15:03:42 WARN TaskSetManager: Stage 44 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


### correct datatypes

#### change case_closed and case_late columns into boolean values

In [63]:
#use condition to make true and false
df.select('case_closed', 'case_late').distinct().show()

23/05/19 15:02:50 WARN TaskSetManager: Stage 38 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|         NO|      YES|
|        YES|      YES|
|         NO|       NO|
|        YES|       NO|
+-----------+---------+



In [64]:
# the fields are in YES/NO format, we want them True/False

In [104]:
df = df.withColumn(
    'case_closed', 
    F.expr('case_closed') == 'YES').\
    withColumn(
    'case_late', 
    F.expr('case_late') == 'YES')

In [72]:
df.select('case_closed', 'case_late').distinct().show(1)

23/05/19 15:03:53 WARN TaskSetManager: Stage 45 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
+-----------+---------+
only showing top 1 row



#### change council_district datatype to string

In [73]:
df.show(1)

+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|case_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|    false| -998.5087616|       true|Field Operations|        Stray Animal|   999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+---------+-------------------

23/05/19 15:04:00 WARN TaskSetManager: Stage 48 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


In [105]:
#use .cast()
df = df.withColumn('council_district', F.col('council_district').cast('string'))

#### change dates to datetype

format date strings: https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html

In [81]:
df.select('case_opened_date').show(3)

23/05/19 15:08:16 WARN TaskSetManager: Stage 50 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


+----------------+
|case_opened_date|
+----------------+
|     1/1/18 0:42|
|     1/1/18 0:46|
|     1/1/18 0:48|
+----------------+
only showing top 3 rows



In [82]:
fmt = 'M/d/yy H:m'

In [84]:
#use to_timestamp
df.select(
    F.to_timestamp('case_opened_date', fmt), 
    F.to_timestamp('case_closed_date', fmt)
).show(3)

23/05/19 15:09:16 WARN TaskSetManager: Stage 51 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


+------------------------------------------+------------------------------------------+
|to_timestamp(case_opened_date, M/d/yy H:m)|to_timestamp(case_closed_date, M/d/yy H:m)|
+------------------------------------------+------------------------------------------+
|                       2018-01-01 00:42:00|                       2018-01-01 12:29:00|
|                       2018-01-01 00:46:00|                       2018-01-03 08:11:00|
|                       2018-01-01 00:48:00|                       2018-01-02 07:57:00|
+------------------------------------------+------------------------------------------+
only showing top 3 rows



In [106]:
#use to_timestamp
df = df.\
    withColumn('case_opened_date', F.to_timestamp('case_opened_date', fmt)).\
    withColumn('case_closed_date', F.to_timestamp('case_closed_date', fmt))

### data transformation

#### normalize address
- `lower`: lowercase everything
- `trim`: remove whitespace on the edges 

In [107]:
df =df.withColumn(
    'request_address',
    F.trim(F.lower('request_address'))
)

#### change num_days_late to num_weeks_late

In [94]:
df.select('num_days_late').show(3)

23/05/19 15:13:35 WARN TaskSetManager: Stage 58 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


+-------------+
|num_days_late|
+-------------+
| -998.5087616|
| -2.012604167|
| -3.022337963|
+-------------+
only showing top 3 rows



In [108]:
df = df.withColumn('num_weeks_late',
    F.expr('num_days_late / 7'))

#### change council_district to int and pad with 00s

In [112]:
df = df.withColumn('council_district',
    F.format_string('%03d', F.col('council_district').cast('int'))
)

In [113]:
df.show(3, vertical=True, truncate=False)

23/05/19 15:20:02 WARN TaskSetManager: Stage 65 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 2018-01-01 00:42:00                   
 case_closed_date     | 2018-01-01 12:29:00                   
 case_due_date        | 9/26/20 0:42                          
 case_late            | false                                 
 num_days_late        | -998.5087616                          
 case_closed          | true                                  
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  el paso st, san antonio, 78207  
 council_district     | 005                                   
 num_weeks_late       | -142.6441088                   

### new features

#### create zip code column

In [114]:
df.select('request_address').show(3)

23/05/19 15:21:16 WARN TaskSetManager: Stage 66 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+
|     request_address|
+--------------------+
|2315  el paso st,...|
|2215  goliad rd, ...|
|102  palfrey st w...|
+--------------------+
only showing top 3 rows



In [117]:
df = df.withColumn('zip_code', F.regexp_extract('request_address', r'(\d+?)$', 1))

In [118]:
df.show(2, vertical=True, truncate=False)

23/05/19 15:23:10 WARN TaskSetManager: Stage 68 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 9/26/20 0:42                         
 case_late            | false                                
 num_days_late        | -998.5087616                         
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6441088                         
 zip_cod

#### create case_lifetime column

- case_age: how long since the case first opened
- days_to_close: the number of days between days opened and days closed
- case_lifetime: if the case is open, how long since the case opened, if the case is closed, the number of days to close


In [121]:
#use datediff() to find the difference between two dates
df = df.withColumn('case_age',
    F.datediff(
        F.current_timestamp(),
        F.col('case_opened_date'))
)

In [123]:
df = df.withColumn(
    'days_to_close',
    F.datediff(
        F.col('case_closed_date'),
        F.col('case_opened_date'))
)

In [124]:
df.show(1, vertical=True)

23/05/19 15:30:06 WARN TaskSetManager: Stage 71 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zip_code             | 78207                
 case_age             | 1964                 
 days_to_close        | 0                    
only showing top 1 row



In [132]:
#create case_lifetime column
df = df.withColumn(
    'case_lifetime',
    F.when(F.col('case_closed'), F.col('days_to_close')
          ).\
    otherwise(F.col('case_age'))
)

In [133]:
df.show(1, vertical=True)

23/05/19 15:35:58 WARN TaskSetManager: Stage 72 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zip_code             | 78207                
 case_age             | 1964                 
 days_to_close        | 0                    
 case_lifetime        | 0                    
only showing top 1 row



In [135]:
#drop unnecessary columns
df = df.drop('days_to_close').drop('case_age')

In [136]:
df.show(1, vertical=True)

23/05/19 15:48:50 WARN TaskSetManager: Stage 73 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zip_code             | 78207                
 case_lifetime        | 0                    
only showing top 1 row



### join the dept table from sql to our current df

In [140]:
df.select('dept_division').show(5)

23/05/19 15:51:54 WARN TaskSetManager: Stage 75 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


+----------------+
|   dept_division|
+----------------+
|Field Operations|
|     Storm Water|
|     Storm Water|
|Code Enforcement|
|Field Operations|
+----------------+
only showing top 5 rows



In [None]:
#get dept table from sql
query = 'select * from dept'

In [137]:
url = get_db_url('311_data')
# dept = pd.read_sql(query, url)
dept = pd.read_csv('dept.csv')

In [138]:
dept = spark.createDataFrame(dept)
dept

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [139]:
dept.show(3)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|                YES|
|          Brush|Solid Waste Manag...|           Solid Waste|                YES|
|Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
+---------------+--------------------+----------------------+-------------------+
only showing top 3 rows



In [141]:
df.join(dept, 
        'dept_division', 
        'left').\
    drop('dept_division').\
    drop('dept_name')

23/05/19 15:52:55 WARN TaskSetManager: Stage 76 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+----------------+----------+-------------------+-------------------+-------------+---------+-------------+-----------+--------------------+--------+-----------+---------+--------------------+----------------+--------------+--------+-------------+--------------------+----------------------+-------------------+
|   dept_division|   case_id|   case_opened_date|   case_closed_date|case_due_date|case_late|num_days_late|case_closed|service_request_type|SLA_days|case_status|source_id|     request_address|council_district|num_weeks_late|zip_code|case_lifetime|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+----------------+----------+-------------------+-------------------+-------------+---------+-------------+-----------+--------------------+--------+-----------+---------+--------------------+----------------+--------------+--------+-------------+--------------------+----------------------+-------------------+
|Field Operations|1014127332|2018-01-01 00:42:00|2018-01-01 12:2

### train, validate, test split

- `.randomSplit` to split df

In [142]:
train, test = df.randomSplit([.8, .2])

In [143]:
train,validate,test = df.randomSplit([.6, .2, .2], seed=123)

In [145]:
df.count()

23/05/19 15:56:01 WARN TaskSetManager: Stage 85 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


100000

In [144]:
validate.count()

23/05/19 15:55:47 WARN TaskSetManager: Stage 82 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

20117

In [146]:
train.count() + validate.count() + test.count()

23/05/19 15:56:23 WARN TaskSetManager: Stage 88 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.
23/05/19 15:56:25 WARN TaskSetManager: Stage 91 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.
23/05/19 15:56:26 WARN TaskSetManager: Stage 94 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

100000