# Data Wrangling

In this lesson, we will acquire and prepare the data we will use in the rest of this module.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

## Reading Data

Spark lets us read data from a variety of data sources using what it calls a `DataFrameReader`. The `read` property of our `spark` session gives us one. We can set various options on it and then load data from a source.

In [3]:
# df = spark.read.csv("data/source.csv", sep=",", header=True, inferSchema=True)

In [None]:
# The above code can also be written as:

(
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("data/source.csv")
)

## Data Schemas

Spark includes the concept of a *data schema*, which is a way to specify the types of our data ahead of time. Doing so lets us be sure about the structure of our data, and it can significantly speed up loading. Inferring the schema requires an extra pass over the data, which can be costly for large datasets.

We'll import several things from the `pyspark.sql.types` module:

* `StringType`
* `DoubleType`
* `IntegerType`
* `LongType`
* `ShortType`
* `TimestampType`
* `FloatType`
* `DateType`

Each of the above types goes inside a `StructField` (instantiated, e.g. `StringType()`), a list of `StructField`s is wrapped in a `StructType`, and the resulting object represents our data schema.

In [4]:
# from pyspark.sql.types import StructType, StructField, StringType

# schema = StructType(
#     [
#         StructField("source_id", StringType()),
#         StructField("source_username", StringType()),
#     ]
# )

# spark.read.csv("data/source.csv", header=True, schema=schema)

## Writing Data

A Spark DataFrame can be written out using its `.write` property, which returns a `DataFrameWriter`. Note that Spark writes a *directory* containing one or more part files at the path you give, rather than a single file. Several common output formats are:

* `csv`: for writing to CSV files
* `parquet`: Parquet is a very popular columnar storage format for Hadoop
* `json`: for writing to JSON files (one JSON object per line)
* `jdbc`: for writing to a SQL database table

In [5]:
# # for demo purposes
# from pydataset import data

# mpg = spark.createDataFrame(data("mpg"))

# mpg.write.json("data/mpg_json", mode="overwrite")

# # like much else in spark, there's multiple ways we could do this:
# (
#     mpg.write.format("csv")
#     .mode("overwrite")
#     .option("header", "true")
#     .save("data/mpg_csv")
# )

## Data Preparation

For the rest of this lesson, we'll take a look at the `case` data from the San Antonio 311 calls dataset.

In [6]:
df = spark.read.csv("data/case.csv", header=True, inferSchema=True)
df.show(2, vertical=True)

### Rename Columns

We'll rename the `SLA_due_date` column to `case_due_date` so that it matches the other date columns (`case_opened_date` and `case_closed_date`).

In [None]:
df = df.withColumnRenamed("SLA_due_date", "case_due_date")

### Correct Data Types

Two columns, `case_closed` and `case_late`, store yes/no values. Currently Spark thinks they are strings; let's turn them into booleans by comparing each value to `"YES"`.

In [None]:
df = df.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

df.select("case_closed", "case_late").show(5)

The `council_district` column looks like an integer, but it is just a unique identifier for each district. We aren't going to perform arithmetic with it, so we will turn it into a string.

In [None]:
df = df.withColumn("council_district", col("council_district").cast("string"))

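Now we will handle the 3 columns that have dates in them. We'll use Spark's `to_timestamp` function for this.

For this to work properly, we need to give `to_timestamp` the date format. The format codes are different from the date functionality we've worked with in pandas because Spark uses Java-style datetime patterns (Java's `SimpleDateFormat` in Spark 2.x; Spark 3.0 and later use Spark's own datetime pattern specification, which uses the same letters for the pattern here). For example, `M` means month, while `mm` means minutes.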

In [None]:
print("--- Before handling dates")
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

fmt = "M/d/yy H:mm"
df = (
    df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
)

print("--- After")
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)
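The pattern letters are easy to mix up, especially `M` (month) and `mm` (minutes). As a sketch that needs only the standard library, here is the same pattern written with the `strptime` directives used by Python and pandas, applied to a hypothetical sample value shaped like the case dates:

```python
from datetime import datetime

# Spark pattern "M/d/yy H:mm" written with Python's strptime directives:
#   M  -> %m  (month, 1 or 2 digits)
#   d  -> %d  (day of month)
#   yy -> %y  (two-digit year)
#   H  -> %H  (hour of day, 0-23)
#   mm -> %M  (minutes; lowercase m is minutes in Spark, uppercase M is month)
py_fmt = "%m/%d/%y %H:%M"

# Hypothetical sample value; strptime accepts the non-zero-padded month, day, and hour
parsed = datetime.strptime("1/1/18 0:42", py_fmt)
print(parsed)
```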

### Data Transformations

Now that we have everything stored as the correct data type, we will make a few transformations to the data.

We'll begin by normalizing the request address field. The `trim` function strips any leading or trailing spaces, and `lower` converts everything to lowercase.

In [None]:
print("--- Before")
df.select("request_address").show(5)

df = df.withColumn("request_address", trim(lower(df.request_address)))

print("--- After")
df.select("request_address").show(5)

Next, we will convert the number of days a case is late into a number of weeks:

In [None]:
# dividing by 7 gives fractional weeks
df = df.withColumn("num_weeks_late", col("num_days_late") / 7)

df.select("num_days_late", "num_weeks_late").show(5)

Lastly, we can format the council district column a little differently by padding it with leading zeros:

In [None]:
# '%03d' means at least 3 digits, padded with leading 0s.
#
# format_string needs a number for %d, so we temporarily cast
# council_district (currently a string) back to an integer; the
# output of format_string is a string.
df = df.withColumn(
    "council_district",
    format_string("%03d", col("council_district").cast("int")),
)

df.select("council_district").show(5)
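`format_string` formats its arguments printf-style, so Python's `%` operator follows the same rule for `%03d` and gives a quick way to preview the result. The district numbers here are hypothetical:

```python
# Hypothetical district numbers, as they look after the cast to int
districts = [1, 7, 10]

# %03d: an integer at least 3 characters wide, left-padded with zeros
padded = ["%03d" % d for d in districts]
print(padded)
```

Values with three or more digits are left unchanged, since `3` is a minimum width rather than a fixed one.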

### New Features

Let's now create some new features based on our existing data.

We will first extract the zipcode from the address:

In [None]:
df = df.withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))

df.select("zipcode").show(5)

Here we have defined the zipcode as the sequence of digits at the very end of the address string.
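`regexp_extract` returns the whole match for group `0` and an empty string when the pattern does not match, so its behavior is easy to preview with Python's `re` module. This is a sketch; the helper `extract_zipcode` and the addresses below are hypothetical. The second address shows a limitation of the pattern: any trailing digits are captured, whether or not they are a zipcode.

```python
import re

# The same pattern passed to regexp_extract: one or more digits at the end
ZIP_PATTERN = re.compile(r"\d+$")


def extract_zipcode(address: str) -> str:
    # Mirror regexp_extract(..., 0): the whole match, or "" when nothing matches
    match = ZIP_PATTERN.search(address)
    return match.group(0) if match else ""


print(extract_zipcode("2315 el paso st, san antonio, 78207"))  # 78207
print(extract_zipcode("bandera rd and loop 410"))  # 410, not a zipcode
print(extract_zipcode("no trailing digits"))  # empty string
```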

Next we will create several new, related columns:

* `case_age`: How old the case is: the number of days between when the case was opened and the current date
* `days_to_closed`: The number of days between when the case was opened and when it was closed
* `case_lifetime`: The number of days between when the case was opened and when it was closed or, if the case is still open, the number of days since it was opened

In [None]:
df = (
    df.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("case_closed")).show(5)

df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("! case_closed")).show(5)
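To make the three definitions concrete, here is a plain-Python sketch of the same logic for a single case. Like Spark's `datediff`, it compares calendar dates only and ignores the time of day. It takes "today" as a parameter instead of calling `current_timestamp()` so the results are reproducible. The helper `case_days` and the sample dates are hypothetical.

```python
from datetime import date, datetime
from typing import Optional, Tuple


def case_days(
    opened: datetime,
    closed: Optional[datetime],
    is_closed: bool,
    today: date,
) -> Tuple[int, Optional[int], Optional[int]]:
    # case_age: days from the opening date to today
    case_age = (today - opened.date()).days
    # days_to_closed: days from opening to closing (None if there is no close date)
    days_to_closed = (closed.date() - opened.date()).days if closed else None
    # case_lifetime: age for open cases, time-to-close for closed ones
    case_lifetime = case_age if not is_closed else days_to_closed
    return case_age, days_to_closed, case_lifetime


today = date(2018, 2, 1)
# A closed case: opened Jan 1, closed Jan 3
print(case_days(datetime(2018, 1, 1, 8, 0), datetime(2018, 1, 3, 17, 30), True, today))
# An open case: opened Jan 20, no close date yet
print(case_days(datetime(2018, 1, 20, 9, 0), None, False, today))
```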

### Joining Department Data

We also have access to another dataset, `dept.csv`, that contains more information about the various departments.

In [None]:
dept = spark.read.csv("data/dept.csv", header=True, inferSchema=True)
dept.show(5)

It might be useful to include this data, so we can join it to our case dataframe using the `dept_division` column.

In [None]:
df = (
    df
    # left join on dept_division, keeping every case even if its division has no match
    .join(dept, "dept_division", "left")
    # keep only the standardized department name (it has far fewer unique
    # values) and drop the other department identifiers
    .drop("dept_division", "dept_name")
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

df.show(2, vertical=True)

## Train Test Split

In [None]:
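# an 80/20 train/test split; passing a seed makes the split reproducible
train, test = df.randomSplit([0.8, 0.2], seed=123)

# or, alternatively, a 60/20/20 train/validate/test split
train, validate, test = df.randomSplit([0.6, 0.2, 0.2], seed=123)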
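`randomSplit` does not produce splits of exactly the requested sizes. Roughly speaking, Spark normalizes the weights so they sum to 1, turns them into cumulative ranges, and assigns each row to a split by drawing a random number for it, so the sizes only approximate the weights. The single-machine sketch below imitates that idea with Python's `random` module. It is a simplified illustration, not Spark's implementation, and `random_split` is a hypothetical helper.

```python
import random
from itertools import accumulate
from math import fsum
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def random_split(
    rows: Sequence[T], weights: Sequence[float], seed: Optional[int] = None
) -> List[List[T]]:
    # fsum rather than sum: `sum` may be shadowed by `from pyspark.sql.functions import *`
    total = fsum(weights)
    # Normalized cumulative upper bounds, e.g. [0.6, 0.8, 1.0]
    bounds = list(accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
        else:
            # Only reachable if float rounding leaves the last bound just below x
            splits[-1].append(row)
    return splits


# demo_* names avoid overwriting the Spark DataFrames created above
demo_train, demo_validate, demo_test = random_split(range(10_000), [0.6, 0.2, 0.2], seed=42)
print(len(demo_train), len(demo_validate), len(demo_test))
```

The printed sizes land close to, but usually not exactly at, 6000/2000/2000, and rerunning with the same seed reproduces the same split.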

# Class Discussion