# Spark Data Wrangling

In [1]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import * 

from env import user, pwd, host

In [2]:
def get_db_url(db):
    '''Build the MySQL connection URL (pymysql driver) for the database `db`.

    db: name of the database, placed after the host in the URL.
    The `user`, `pwd` and `host` values come from the names imported
    from `env` at the top of the notebook.
    Returns the URL as a string.
    '''
    credentials = f'{user}:{pwd}'
    location = f'{host}/{db}'
    return f'mysql+pymysql://{credentials}@{location}'

## Acquire

In [3]:
# create the Spark environment: reuse the active SparkSession or start a new one
spark = SparkSession.builder.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/19 14:53:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### load mpg data set from pydataset

In [4]:
from pydataset import data

In [5]:
# build a Spark DataFrame from the pydataset 'mpg' sample;
# the bare name on the last line displays its column names and types
mpg = spark.createDataFrame(data('mpg'))
mpg

DataFrame[manufacturer: string, model: string, displ: double, year: bigint, cyl: bigint, trans: string, drv: string, cty: bigint, hwy: bigint, fl: string, class: string]

In [6]:
# print the first 5 rows as a text table
mpg.show(5)

[Stage 0:>                                                          (0 + 1) / 1]

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



                                                                                

### write dataframe to file

- `json`: for writing to local JSON file(s)
- `csv`: for writing to local CSV file(s)
- `parquet`: Parquet is a very popular columnar storage format for Hadoop.
- `jdbc`: for writing to a SQL database table

#### write file to json

In [7]:
# pattern: df.write.<type>(path, ...)
# the path becomes a directory of part files; mode='overwrite' replaces any existing output there
mpg.write.json('data/mpg_json',mode='overwrite')

                                                                                

In [8]:
import os

In [9]:
# the output directory holds the part-*.json data files plus hidden .crc files and a _SUCCESS marker
os.listdir('data/mpg_json/')

['part-00005-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json',
 '.part-00006-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json.crc',
 'part-00000-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json',
 '.part-00005-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json.crc',
 'part-00002-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json',
 '._SUCCESS.crc',
 '.part-00002-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json.crc',
 'part-00007-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json',
 '.part-00001-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json.crc',
 'part-00006-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json',
 '.part-00007-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json.crc',
 'part-00003-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json',
 '.part-00004-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json.crc',
 '_SUCCESS',
 'part-00001-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json',
 '.part-00003-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json.crc',
 'part-00004-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json',
 '.p

In [10]:
# Pick one data file from the output directory. Requiring the '.json' suffix
# (in addition to skipping hidden entries) keeps the '_SUCCESS' marker, which
# is also a non-hidden entry, from being selected when os.listdir happens to
# return it first.
first_file = [
    fn for fn in os.listdir('data/mpg_json/')
    if fn.endswith('.json') and not fn.startswith('.')
][0]

In [11]:
first_file

'part-00005-61804ae3-1ce4-4925-b047-5a9521172f6e-c000.json'

In [12]:
# a single part file holds only a fraction of the rows, not the whole dataset
spark.read.json(f'data/mpg_json/{first_file}').count()

29

In [13]:
# pointing the reader at the directory loads every part file at once,
# so the count covers the whole dataset
spark.read.json('data/mpg_json/').count()

234

#### write dataframe to csv

In [14]:
# generic writer pattern: df.write.format(<type>) ... .save(path)
(
mpg.write.format('csv')
    .mode('overwrite')
    .option('header',True)  # write the column names as the first line of each file
    .save('data/mpg_csv')

)

### read files
- spark.read.[type]

#### read json

#### read csv

In [15]:
# header=True: treat the first line of each csv file as column names, not as a data row
(
    spark.read.format("csv")
      .option("header", True)
      .load("data/mpg_csv")
).count()

234

In [16]:
# header=True keeps the written column names; no schema or inferSchema option is
# given, so every column comes back as string (see the schema displayed below)
(
    spark.read.format("csv")
      .option("header", True)
      .load("data/mpg_csv")
)

DataFrame[manufacturer: string, model: string, displ: string, year: string, cyl: string, trans: string, drv: string, cty: string, hwy: string, fl: string, class: string]

### load source from 311_data in sql

In [17]:
# #sql query
# url = get_db_url('311_data')
# query = 'select source_id, source_username from source'

In [18]:
# #make pandas df
# pandas_df = pd.read_sql(query, url)
# pandas_df.head()

In [19]:
# local CSV used in place of the commented-out pd.read_sql queries
# NOTE(review): the columns shown further down look like the cases table — confirm what 311.csv contains
pandas_df = pd.read_csv('311.csv')

### load cases from 311_data in sql

In [20]:
# #sql query
# query = 'select * from cases limit 100000'

In [21]:
#pandas df
# pandas_df = pd.read_sql(query, url)

In [22]:
#spark df
df = spark.createDataFrame(pandas_df)
df

DataFrame[case_id: bigint, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: bigint]

In [23]:
# vertical=True prints one field per line for each record; truncate=False keeps full cell values
df.show(3, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 SLA_due_date         | 9/26/20 0:42                          
 case_late            | NO                                    
 num_days_late        | -998.5087616                          
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

23/05/19 14:54:09 WARN TaskSetManager: Stage 16 contains a task of very large size (1346 KiB). The maximum recommended task size is 1000 KiB.


## Prepare

- rename columns
- correct datatypes
- data transformation
- make new features
- join tables

### rename columns

#### change SLA_due_date to case_due_date

In [None]:
# withColumnRenamed returns a new DataFrame, so rebind df to it;
# it is a no-op when the old column name is absent, so re-running this cell is harmless
df = df.withColumnRenamed('SLA_due_date','case_due_date')

### correct datatypes

In [None]:
# peek at the raw values before converting: both columns are strings ('YES'/'NO' in the record shown earlier)
df.select('case_closed','case_late').show(1)

#### change case_closed and case_late columns into boolean values

In [None]:
#use condition to make true and false


#### change council_district datatype to string

In [None]:
#use .cast()


#### change dates to datetype

format date strings: https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html

In [None]:
#use to_timestamp


### data transformation

#### normalize address
- `lower`: lowercase everything
- `trim`: remove whitespace on the edges 

#### change num_days_late to num_weeks_late

#### change council_district to int and pad with leading zeros

### new features

#### create zip code column

#### create case_lifetime column

- case_age: how long it has been since the case was first opened
- days_to_close: the number of days between the date the case was opened and the date it was closed
- case_lifetime: if the case is open, how long it has been since the case opened; if the case is closed, the number of days it took to close


In [None]:
#use datediff() to find the difference between two dates


In [None]:
#create case_lifetime column


In [None]:
#drop unnecessary columns


### join the dept table from sql to our current df

In [None]:
# preview dept_division — presumably the key for joining to the dept table; confirm against dept's columns
df.select('dept_division').show(5)

In [None]:
#get dept table from sql
query = 'select * from dept'

In [None]:
# unlike the earlier cells (which fall back to a local CSV), this one needs a live
# database connection using the credentials imported from env
url = get_db_url('311_data')
dept = pd.read_sql(query, url)  # pandas DataFrame holding the query result

In [None]:
# NOTE(review): this rebinds `dept` from the pandas DataFrame to a Spark DataFrame,
# so re-running this cell on its own passes a Spark DataFrame back into
# createDataFrame (expected to fail — re-run the pd.read_sql cell first)
dept = spark.createDataFrame(dept)
dept

### train, validate, test split

- `.randomSplit` to split df