# Spark Data Wrangling

In [1]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import * 

from env import username, password, host

## Acquire

In [3]:
#create enviroment
spark = SparkSession.builder.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/19 14:09:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/19 14:09:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### load mpg data set from pydataset

In [15]:
from pydataset import data

In [16]:
mpg = spark.createDataFrame(data('mpg'))
mpg

DataFrame[manufacturer: string, model: string, displ: double, year: bigint, cyl: bigint, trans: string, drv: string, cty: bigint, hwy: bigint, fl: string, class: string]

In [17]:
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



### write datafame to file

- `json`: for writing to a local json file(s)
- `csv`: for writing to a local csv file(s)
- `parquet`: Parquet is a very popular columnar storage format for Hadoop.
- `jdbc`: for writing to a SQL database table

#### write file to json

In [18]:
#df.write.type
mpg.write.json('data/mpg_json', mode='overwrite')

                                                                                

In [19]:
import os

In [23]:
[fn for fn in os.listdir('data/mpg_json') if not fn.startswith('.')]

['part-00006-6d4ea11d-5c5b-4576-b097-3a1ae9d4d01d-c000.json',
 'part-00003-6d4ea11d-5c5b-4576-b097-3a1ae9d4d01d-c000.json',
 'part-00001-6d4ea11d-5c5b-4576-b097-3a1ae9d4d01d-c000.json',
 'part-00004-6d4ea11d-5c5b-4576-b097-3a1ae9d4d01d-c000.json',
 'part-00005-6d4ea11d-5c5b-4576-b097-3a1ae9d4d01d-c000.json',
 'part-00000-6d4ea11d-5c5b-4576-b097-3a1ae9d4d01d-c000.json',
 '_SUCCESS',
 'part-00002-6d4ea11d-5c5b-4576-b097-3a1ae9d4d01d-c000.json',
 'part-00007-6d4ea11d-5c5b-4576-b097-3a1ae9d4d01d-c000.json']

In [25]:
first_file = [fn for fn in os.listdir('data/mpg_json') if not fn.startswith('.')][0]

In [32]:
first_file

'part-00006-6d4ea11d-5c5b-4576-b097-3a1ae9d4d01d-c000.json'

In [33]:
len(first_file)

57

In [28]:
spark.read.json(f'data/mpg_json/{first_file}').show()

+-------+---+---+-----+---+---+---+------------+------------+----------+----+
|  class|cty|cyl|displ|drv| fl|hwy|manufacturer|       model|     trans|year|
+-------+---+---+-----+---+---+---+------------+------------+----------+----+
|    suv| 16|  4|  2.7|  4|  r| 20|      toyota| 4runner 4wd|  auto(l4)|1999|
|    suv| 15|  6|  3.4|  4|  r| 19|      toyota| 4runner 4wd|  auto(l4)|1999|
|    suv| 15|  6|  3.4|  4|  r| 17|      toyota| 4runner 4wd|manual(m5)|1999|
|    suv| 16|  6|  4.0|  4|  r| 20|      toyota| 4runner 4wd|  auto(l5)|2008|
|    suv| 14|  8|  4.7|  4|  r| 17|      toyota| 4runner 4wd|  auto(l5)|2008|
|midsize| 21|  4|  2.2|  f|  r| 29|      toyota|       camry|manual(m5)|1999|
|midsize| 21|  4|  2.2|  f|  r| 27|      toyota|       camry|  auto(l4)|1999|
|midsize| 21|  4|  2.4|  f|  r| 31|      toyota|       camry|manual(m5)|2008|
|midsize| 21|  4|  2.4|  f|  r| 31|      toyota|       camry|  auto(l5)|2008|
|midsize| 18|  6|  3.0|  f|  r| 26|      toyota|       camry|  a

In [29]:
spark.read.json(f'data/mpg_json/{first_file}').count()

29

In [30]:
spark.read.json('data/mpg_json').count()

234

#### write dataframe to csv

In [34]:
#df.write.format()
(
mpg.write.format('csv')
    .mode('overwrite')
    .option('header', 'True')
    .save('data/mpg_csv')
)

[Stage 14:>                                                         (0 + 8) / 8]                                                                                

### read files
- spark.read.[type]

#### read json

In [36]:
spark.read.json(f'data/mpg_json/{first_file}').count()

29

#### read csv

In [35]:
#keep written csv headers
(
    spark.read.format("csv")
      .option("header", True)
      .load("data/mpg_csv")
).count()



234

### load source from 311_data in sql

In [8]:
#sql query
url = get_db_url('311_data')
query = 'select source_id, source_username from source'

In [13]:
df = pd.read_csv('311 - 311.csv')

In [37]:
df.head()

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7


### load cases from 311_data from sql

In [None]:
#sql query
query = 'select * from cases limit 100000'

In [None]:
#pandas df
# pandas_df = pd.read_sql(query, url)

In [None]:
#spark df
# df = spark.createDataFrame(pandas_df)
# df

In [39]:
df = spark.createDataFrame(df)

In [40]:
df.show(3, vertical=True, truncate=False)

23/05/19 14:49:07 WARN TaskSetManager: Stage 23 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 SLA_due_date         | 9/26/20 0:42                          
 case_late            | NO                                    
 num_days_late        | -998.5087616                          
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

## Prepare

- rename columns
- correct datatypes
- data transformation
- make new features
- join tables

### rename columns

#### change SLA_due_date to case_due_date

In [45]:
df = df.withColumnRenamed('SLA_due-date', 'case_due_date')

In [46]:
df.show(3, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 SLA_due_date         | 9/26/20 0:42                          
 case_late            | NO                                    
 num_days_late        | -998.5087616                          
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

23/05/19 14:52:38 WARN TaskSetManager: Stage 27 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


In [47]:
df.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'SLA_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district']

### correct datatypes

#### change close_closed and case_late columns into boolean values

In [None]:
#use condition to make true and false


#### change council_district datatype to string

In [None]:
#use .cast()


#### change dates to datetype

format date strings: https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html

In [None]:
#use to_timestamp


### data transformation

#### normalize address
- `lower`: lowercase everything
- `trim`: remove whitespace on the edges 

#### change num_days_late to num_weeks_late

#### change council_district to int and pad with 00s

### new features

#### create zip code column

#### create case_lifetime column

- case_age: how long since the case first opened
- days_to_close: the number of days between days opened and days closed
- case_lifetime: if the case is open, how long since the case opened, if the case is closed, the number of days to close


In [None]:
#use datediff() to find the difference between two dates


In [None]:
#create case_lifetime column


In [None]:
#drop unnecessary columns


### join the dept table from sql to our current df

In [None]:
df.select('dept_division').show(5)

In [None]:
#get dept table from sql
query = 'select * from dept'

In [None]:
url = get_db_url('311_data')
dept = pd.read_sql(query, url)

In [None]:
dept = spark.createDataFrame(dept)
dept

### train, validate, test split

- `.randomSplit` to split df