# Spark Data Wrangling

In [None]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import * 

from env import user, password, host

In [None]:
def get_db_url(db):
    '''Return the mysql+pymysql connection string for the database named `db`.

    The user, password and host parts come from the module-level names
    imported from `env`; only the database name varies per call.
    '''
    credentials = f'{user}:{password}'
    location = f'{host}/{db}'
    return f'mysql+pymysql://{credentials}@{location}'

## Acquire

In [None]:
#create environment: reuse the active SparkSession, or start one if none exists
spark = SparkSession.builder.getOrCreate()
# bare name as the last line so the notebook displays the session
spark

### load mpg data set from pydataset

In [None]:
from pydataset import data

In [None]:
# data('mpg') is handed straight to Spark; presumably a pandas DataFrame -- TODO confirm
mpg = spark.createDataFrame(data('mpg'))
# bare name as the last line so the notebook displays the frame's summary
mpg

In [None]:
# print the first 5 rows of the Spark frame
mpg.show(5)

### write dataframe to file

- `json`: for writing to a local json file(s)
- `csv`: for writing to a local csv file(s)
- `parquet`: Parquet is a very popular columnar storage format for Hadoop.
- `jdbc`: for writing to a SQL database table

#### write file to json

In [None]:
#df.write.type
# Write the mpg frame as JSON. Spark creates a directory of part files at this
# path; mode="overwrite" lets the cell be re-run without a "path already
# exists" error.
mpg.write.json("data/mpg_json", mode="overwrite")

#### write dataframe to csv

In [None]:
#df.write.format()
# Write the mpg frame to "data/mpg_csv", the path the read-csv cell loads.
# header=True keeps the column names in the files so they survive the round
# trip; overwrite mode makes the cell safe to re-run.
(
    mpg.write.format("csv")
      .mode("overwrite")
      .option("header", True)
      .save("data/mpg_csv")
)


### read files
- spark.read.[type]

#### read json

#### read csv

In [None]:
# header=True: take column names from the first row of the written csv files,
# then count the rows that were loaded
spark.read.csv("data/mpg_csv", header=True).count()

### load source from 311_data in sql

In [None]:
#sql query
# connection string for the 311_data database
url = get_db_url('311_data')
# pull only the two listed columns from the source table
query = 'select source_id, source_username from source'

In [None]:
#make pandas df: run the query against the connection URL
pandas_df = pd.read_sql(query, url)
# preview the first rows in the notebook
pandas_df.head()

### load cases from 311_data from sql

In [None]:
#sql query: rebinds `query`; the limit caps the pull at 100,000 rows
query = 'select * from cases limit 100000'

In [None]:
#pandas df
# Run the `cases` query. Without this read, `pandas_df` still holds the rows
# from the earlier `source` query, so the Spark frame built from it would lack
# the case columns (e.g. dept_division) that later cells select.
pandas_df = pd.read_sql(query, url)

In [None]:
#spark df: build a Spark DataFrame from the pandas frame
df = spark.createDataFrame(pandas_df)
# bare name as the last line so the notebook displays the frame's summary
df

In [None]:
# show 3 rows, one field per line (vertical), without truncating long values
df.show(3, vertical=True, truncate=False)

## Prepare

- rename columns
- correct datatypes
- data transformation
- make new features
- join tables

### rename columns

#### change SLA_due_date to case_due_date

### correct datatypes

#### change case_closed and case_late columns into boolean values

In [None]:
#use condition to make true and false


#### change council_district datatype to string

In [None]:
#use .cast()


#### change dates to datetype

format date strings: https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html

In [None]:
#use to_timestamp


### data transformation

#### normalize address
- `lower`: lowercase everything
- `trim`: remove whitespace on the edges 

#### change num_days_late to num_weeks_late

#### change council_district to int and pad with 00s

### new features

#### create zip code column

#### create case_lifetime column

- case_age: how long since the case first opened
- days_to_close: the number of days between the date the case was opened and the date it was closed
- case_lifetime: if the case is open, how long since the case opened, if the case is closed, the number of days to close


In [None]:
#use datediff() to find the difference between two dates


In [None]:
#create case_lifetime column


In [None]:
#drop unnecessary columns


### join the dept table from sql to our current df

In [None]:
# peek at the first 5 values of the dept_division column
df.select('dept_division').show(5)

In [None]:
#get dept table from sql: rebinds `query` to select every column of dept
query = 'select * from dept'

In [None]:
# connection string for the 311_data database
url = get_db_url('311_data')
# read the dept query result into a pandas frame
dept = pd.read_sql(query, url)

In [None]:
# rebind `dept` from the pandas frame to its Spark DataFrame equivalent
dept = spark.createDataFrame(dept)
# bare name as the last line so the notebook displays the frame's summary
dept

### train, validate, test split

- `.randomSplit` to split df