# Spark Data Wrangling

In [2]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import * 

In [3]:
def get_db_url(db):
    '''input df and output sql connection string'''
    return (f'mysql+pymysql://{user}:{password}@{host}/{db}')

## Acquire

In [4]:
#create enviroment
spark = SparkSession.builder.getOrCreate()
spark

### load mpg data set from pydataset

In [5]:
from pydataset import data

In [6]:
mpg = spark.createDataFrame(data('mpg'))
mpg

DataFrame[manufacturer: string, model: string, displ: double, year: bigint, cyl: bigint, trans: string, drv: string, cty: bigint, hwy: bigint, fl: string, class: string]

In [7]:
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



### write datafame to file

- `json`: for writing to a local json file(s)
- `csv`: for writing to a local csv file(s)
- `parquet`: Parquet is a very popular columnar storage format for Hadoop.
- `jdbc`: for writing to a SQL database table

#### write file to json

In [None]:
#df.write.type
mpg.write.json('data/mpg_json', mode='overwrite')

In [10]:
import os

#### write dataframe to csv

In [None]:
#df.write.format()
(
mpg.write.format('csv')
    .mode('overwrite')
    .option('header', 'True')
    .save

)

### read files
- spark.read.[type]

#### read json

#### read csv

In [None]:
#keep written csv headers
(
    spark.read.format("csv")
      .option("header", True)
      .load("data/mpg_csv")
).count()

### load source from 311_data in sql

In [None]:
#sql query
#url = get_db_url('311_data')
#query = 'select source_id, source_username from source'

In [12]:
#make pandas df
#pandas_df = pd.read_sql(query, url)
pandas_df = pd.read_csv('311.csv')
pandas_df.head()

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7


In [13]:
sources = spark.createDataFrame(pandas_df)

In [14]:
sources.printSchema()

root
 |-- case_id: long (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: long (nullable = true)



### load cases from 311_data from sql

In [None]:
#sql query
db = '311_data'
query = 'select * from cases limit 100000'

In [None]:
#pandas df
# pandas_df = pd.read_sql(query, url)

In [15]:
#spark df
df = spark.createDataFrame(pandas_df)
df

DataFrame[case_id: bigint, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: bigint]

In [16]:
df.show(3, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 SLA_due_date         | 9/26/20 0:42                          
 case_late            | NO                                    
 num_days_late        | -998.5087616                          
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

## Prepare

- rename columns
- correct datatypes
- data transformation
- make new features
- join tables

### rename columns

#### change SLA_due_date to case_due_date

In [17]:
df.withColumnRenamed('SLA_due_date','case_due_date').show(1,vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616         
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



### correct datatypes

#### change close_closed and case_late columns into boolean values

In [18]:
#use condition to make true and false
df.select('case_closed','case_late').show(1)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|        YES|       NO|
+-----------+---------+
only showing top 1 row



In [20]:
df.groupby('case_closed').count().show()

+-----------+-----+
|case_closed|count|
+-----------+-----+
|        YES|98547|
|         NO| 1453|
+-----------+-----+



In [19]:
df.select('case_closed').distinct().show()

+-----------+
|case_closed|
+-----------+
|        YES|
|         NO|
+-----------+



In [22]:
from pyspark.sql import functions as F

In [24]:
#use condition to make true and false
df.withColumn(
    'case_closed',
    df.case_closed == 'YES'
    #F.expr('case_closed == "YES"')
).withColumn(
)


.select('case_closed').show(1)

+-----------+
|case_closed|
+-----------+
|       true|
+-----------+
only showing top 1 row



#### change council_district datatype to string

In [None]:
#use .cast()
df.withColumn('council_district',
             F.col)

#### change dates to datetype

format date strings: https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html

In [None]:
#use to_timestamp
df.select('case_opened_date',
          'case_closed_date')
    F.to_timestamp('case_opened_date'),
    F.to_timestamp('case_closed_date')

### data transformation

#### normalize address
- `lower`: lowercase everything
- `trim`: remove whitespace on the edges 

#### change num_days_late to num_weeks_late

#### change council_district to int and pad with 00s

### new features

#### create zip code column

#### create case_lifetime column

- case_age: how long since the case first opened
- days_to_close: the number of days between days opened and days closed
- case_lifetime: if the case is open, how long since the case opened, if the case is closed, the number of days to close


In [None]:
#use datediff() to find the difference between two dates


In [None]:
#create case_lifetime column


In [None]:
#drop unnecessary columns


### join the dept table from sql to our current df

In [None]:
df.select('dept_division').show(5)

In [None]:
#get dept table from sql
query = 'select * from dept'

In [None]:
url = get_db_url('311_data')
dept = pd.read_sql(query, url)

In [None]:
dept = spark.createDataFrame(dept)
dept

### train, validate, test split

- `.randomSplit` to split df