# API Review

## Create the spark session

In [None]:
import pyspark

# Get the active SparkSession, or create one if none exists yet.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

## Create Dataframes

In [None]:
import pandas as pd
import numpy as np

np.random.seed(123)

# 20 sequential ids in column "n", each randomly assigned to group a, b, or c.
pd_df = pd.DataFrame(
    {
        "n": np.arange(20),
        "group": np.random.choice(["a", "b", "c"], size=20),
    }
)

pd_df

Convert to a spark dataframe

In [None]:
# Convert the pandas DataFrame into a Spark DataFrame.
df = spark.createDataFrame(pd_df)

# A Spark DataFrame doesn't print its rows by itself; call .show() to see them.
# .show(2) prints only the first 2 rows.
df.show(2)

In [None]:
# Summary statistics (count, mean, stddev, min, max) for each column.
df.describe().show()

In [None]:
from pydataset import data

# Load pydataset's "mpg" sample as pandas, then hand it to Spark.
mpg_pandas = data("mpg")
mpg = spark.createDataFrame(mpg_pandas)
mpg.show(2)

## Create Columns

This returns a column object:

In [None]:

# Attribute access on a DataFrame returns a Column object (an expression), not data.
mpg.hwy

A column object on its own holds no data. To see its values, we pass it to .select() and then call .show(). .select() also accepts several column objects at once.

In [None]:
# select 3 columns (passed as Column objects) and show the first 2 rows
mpg.select(mpg.hwy, mpg.cty, mpg.model).show(2)

In [None]:
# Select hwy next to a derived column, hwy + 1, and show both for the first 2 rows.
# Column arithmetic builds a new expression that is applied to every row.

mpg.select(mpg.hwy, mpg.hwy + 1).show(2)

In [None]:
# select hwy and rename the output column to highway_mileage with .alias()
mpg.select(mpg.hwy.alias("highway_mileage")).show(2)

In [None]:
# Column expressions can be stored in variables and reused later.
# col1: hwy, renamed to highway_mileage
col1 = mpg.hwy.alias("highway_mileage")

# col2: hwy divided by 2, named highway_mileage_halved
col2 = (mpg.hwy/2).alias("highway_mileage_halved")

# select both stored expressions and show the first row
mpg.select(col1, col2).show(1)

In [None]:
from pyspark.sql.functions import col, expr

In [None]:
# col() builds a Column from a name, without needing a reference to a DataFrame.
col("hwy")

In [None]:
# An expression built only from col(), so it can be defined before choosing a DataFrame.
avg_col = (col("hwy") + col("cty")) / 2

# col(), attribute access, and a stored expression can be mixed in one select.
mpg.select(
    col("hwy").alias("highway_mileage"),
    mpg.cty.alias("city_mileage"),
    avg_col.alias("avg_mileage")
).show(2)

Another way to do what we did above, using expr() ...

In [None]:
# expr() parses a SQL expression string into a Column.
mpg.select(
    expr("hwy"),  # the same as `col`
    expr("hwy + 1"),  # an arithmetic expression
    expr("hwy AS highway_mileage"),  # using an alias
    expr("hwy + 1 AS highway_incremented"),  # a combination of the above
).show(5)


Bringing together all the different ways to accomplish the same task: select a column and alias it.

In [None]:
# Four equivalent ways to select hwy and name it "highway".
# The result has four identically named "highway" columns. That is fine for
# display, but referring to "highway" by name afterwards would be ambiguous.
mpg.select(
    mpg.hwy.alias("highway"),
    col("hwy").alias("highway"),
    expr("hwy").alias("highway"),
    expr("hwy AS highway"),
).show(5)

## Spark SQL

In [None]:
# register mpg as a temporary view named "mpg" so spark.sql() can query it by name
mpg.createOrReplaceTempView("mpg")

In [None]:
# Query the "mpg" view with SQL: hwy, cty, and their average as "avg".
query = """
    SELECT hwy, cty, (hwy + cty) / 2 as avg
    FROM mpg
    """
spark.sql(query).show(2)

## Type Casting

In [None]:
# List of (column name, type name) pairs.
mpg.dtypes

In [None]:
# Print the schema as a tree: column names, types, and nullability.
mpg.printSchema()

In [None]:
# Cast hwy to string and print the schema to confirm the new type.
mpg.select(mpg.hwy.cast("string")).printSchema()


In [None]:
# shows null because the model text can't be converted to int.
# NOTE: this relies on non-ANSI casting; with spark.sql.ansi.enabled=true
# the invalid cast raises an error instead of returning null.
mpg.select(mpg.model, mpg.model.cast("int")).show(2)

## Built in Functions

In [None]:
# avg and mean are aliases of each other 
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean
# from pyspark.sql.functions import *

In [None]:
# Two equivalent ways to compute the mean of hwy (sum / count, and avg()),
# plus the min and max.
# The parentheses around the division matter: .alias() binds tighter than /,
# so without them the alias would be attached to count() alone and the
# quotient column would not be named average_1.
mpg.select(
    (sum(mpg.hwy) / count(mpg.hwy)).alias("average_1"),
    avg(mpg.hwy).alias("average_2"),
    min(mpg.hwy),
    max(mpg.hwy),
).show()

In [None]:
# Concatenate manufacturer and model (no separator is inserted between them).
mpg.select(concat(mpg.manufacturer, mpg.model)).show(5)


The function for literal values is lit. It wraps a Python value (here, a string) in a column so it can be combined with other columns.

In [None]:
from pyspark.sql.functions import lit


In [None]:
# lit() wraps the Python string in a Column so concat() can combine it with cyl.
mpg.select(concat(mpg.cyl, lit(" cylinders"))).show(5)

More String Manipulation

In [None]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In [None]:
# A few San Antonio street addresses to practice regex functions on.
addresses = [
    "600 Navarro St ste 600, San Antonio, TX 78205",
    "3130 Broadway St, San Antonio, TX 78209",
    "303 Pearl Pkwy, San Antonio, TX 78215",
    "1255 SW Loop 410, San Antonio, TX 78227",
]
textdf = spark.createDataFrame(pd.DataFrame({"address": addresses}))

# truncate=False prints the full strings instead of cutting them short.
textdf.show(truncate=False)

Using regexp_extract: pull out the text matched by a capture group (chosen by its index) and create a new column from it.

In [None]:
textdf.select(
    "address",
    # Group 1 of ^(\d+): the leading run of digits (the street number).
    regexp_extract("address", r"^(\d+)", 1).alias("street_no"),
    # Group 1: the text after the number and one whitespace char, lazily
    # matched up to the first comma (the street name).
    regexp_extract("address", r"^\d+\s([\w\s]+?),", 1).alias("street"),
).show(truncate=False)

regexp_replace lets us make substitutions based on a regular expression.

In [None]:
# Remove everything up to and including the first comma (plus any following
# whitespace), leaving "City, ST zip".
textdf.select(
    "address",
    regexp_replace("address", r"^.*?,\s*", "").alias("city_state_zip"),
).show(truncate=False)


## Filtering with .filter and .where

In [None]:
# .filter and .where are interchangeable. "class" is a Python keyword, so
# mpg.class is a syntax error and the column is accessed as mpg["class"].
mpg.filter(mpg.cyl == 4).where(mpg["class"] == "subcompact").show()

## Conditionals with When and Otherwise

In [None]:
from pyspark.sql.functions import when

In [None]:
# Bucket engine displacement into size labels. Conditions are checked in
# order, so the first matching when() wins; otherwise() covers the rest.
mpg.select(
    mpg.displ,
    (
        when(mpg.displ < 2, "small")
        .when(mpg.displ < 3, "medium")
        .otherwise("large")
        .alias("engine_size")
    ),
).show(10)

## Sorting & Ordering 

In [None]:
# Sort by hwy ascending (the default) and show the first 8 rows.
mpg.sort(mpg.hwy).show(8)

In [None]:
from pyspark.sql.functions import asc, desc

In [None]:
# Three equivalent descending sorts. The first two results are built but
# discarded; only the last one is shown.
mpg.sort(mpg.hwy.desc())
# is the same as
mpg.sort(col("hwy").desc())
# is the same as
mpg.sort(desc("hwy")).show(5)

In [None]:
# Multi-key sort: class descending, then cyl ascending, then hwy descending.
mpg.sort(desc("class"), mpg.cyl.asc(), col("hwy").desc()).show()

## Grouping & Aggregating

In [None]:
# Three equivalent ways to group by cyl. groupBy returns a GroupedData object;
# nothing is computed until an aggregation is applied.
mpg.groupBy(mpg.cyl)
mpg.groupBy(col("cyl"))
mpg.groupBy("cyl")

In [None]:
# Average city and highway mileage for each cylinder count.
mpg.groupBy(mpg.cyl).agg(avg(mpg.cty), avg(mpg.hwy)).show()

In [None]:
# Average city and highway mileage for each (cyl, class) combination.
mpg.groupBy("cyl", "class").agg(avg(mpg.cty), avg(mpg.hwy)).show()

Rollup will do the same aggregations, but also include overall totals. 

In [None]:
# Row count per cyl; rollup adds a grand-total row in which cyl is null.
mpg.rollup("cyl").count().sort("cyl").show()

Here the null value in cyl indicates the total count.

In [None]:
# Average hwy per cyl; rollup adds a row (cyl is null) with the overall average.
mpg.rollup("cyl").agg(expr("avg(hwy)")).sort("cyl").show()

Here the null value in cyl marks the overall average highway mileage across all rows.

In [None]:
# Mean hwy per (cyl, class), plus per-cyl subtotals (class is null) and a
# grand total (both cyl and class are null).
mpg.rollup("cyl", "class").mean("hwy").sort(col("cyl"), col("class")).show()


## Crosstables & Pivot Tables

Crosstab is a simple way to get counts. 

In [None]:
# Count table: one row per class, one column per cyl value, cells are counts.
mpg.crosstab("class", "cyl").show()

We can use pivot to compute different aggregations than count. 

In [None]:
# One row per class, one column per cyl value; each cell holds the mean hwy.
(
    mpg.groupBy("class")
    .pivot("cyl")
    .mean("hwy")
    .show()
)


## Missing Values

In [None]:
# A small frame with missing values (np.nan) in both columns, including one
# row where x and y are both missing.
df = spark.createDataFrame(
    pd.DataFrame(
        {"x": [1, 2, np.nan, 4, 5, np.nan], "y": [np.nan, 0, 0, 3, 1, np.nan]}
    )
)
df.show()

In [None]:
# Drop every row that has a missing value in any column.
df.na.drop().show()

In [None]:
# Replace missing values in every numeric column with 0.
df.na.fill(0).show()

In [None]:
# Replace missing values with 0 in column x only; y is left untouched.
df.na.fill(0, subset="x").show()

In [None]:
# Drop rows where y is missing; rows missing only x are kept.
df.na.drop(subset="y").show()

## Transformations of Dataframes

In [None]:
# Print the plan Spark will use to produce mpg.
mpg.explain()


The plan above has only a single step.

This one below shows another step after "Scan ExistingRDD", a "Project" that contains the names of the columns we are looking for.

In [None]:
# Selecting columns adds a Project step on top of the scan.
mpg.select(mpg.cyl, mpg.hwy).explain()


Now we do a more advanced select calculation, but this is still just a single step.

In [None]:
# Average of city and highway mileage. (cyl is a cylinder count, not a
# mileage, so averaging it with hwy would make "avg_mpg" meaningless.)
# Like the simpler select, this compiles to a single projection step.
mpg.select(((mpg.cty + mpg.hwy) / 2).alias("avg_mpg")).explain()


Notice that our filter below is also a single step.



In [None]:
# Filtering adds a single Filter step to the plan.
mpg.filter(mpg.cyl == 6).explain()

In [None]:
# The same projection and filter written in opposite orders. Compare the two
# printed plans to see whether Spark's optimizer arranges them the same way.
mpg.select("cyl", "hwy").filter(expr("cyl = 6")).explain()
mpg.filter(expr("cyl = 6")).select("cyl", "hwy").explain()


## More DF Manipulations

For these examples, we'll be working with a dataset of weather observations in Seattle.



In [None]:
from vega_datasets import data

# Load the Seattle weather sample as pandas, turn the date column into
# strings, then convert the frame to a Spark DataFrame.
seattle_pd = data.seattle_weather().assign(
    date=lambda frame: frame.date.astype(str)
)
weather = spark.createDataFrame(seattle_pd)
weather.show(6)

In [None]:
# Report the shape of the Spark DataFrame: row count and column count.
n_rows = weather.count()
n_cols = len(weather.columns)
print(f"{n_rows} rows {n_cols} columns")


In [None]:
# get the date range of the dataset.
# min/max here are the pyspark.sql.functions versions imported earlier, not the
# Python builtins. "date" holds strings at this point; this assumes ISO
# YYYY-MM-DD format so that string order is chronological (verify).
min_date, max_date = weather.select(min("date"), max("date")).first()
min_date, max_date

In [None]:
# compute temp average: the midpoint of each day's min and max temperature.
# The division has to happen inside ROUND. Rounding the sum first
# (ROUND(temp_min + temp_max) / 2) would snap every average to a multiple
# of 0.5. Assuming temperatures are recorded to one decimal place, rounding
# to 2 decimals keeps the exact midpoint and only trims floating-point noise.
weather = weather.withColumn(
    "temp_avg", expr("ROUND((temp_min + temp_max) / 2, 2)")
).drop("temp_max", "temp_min")

weather.show(6)

Calculate total rainfall for each month (summed across all years in the dataset).

In [None]:
from pyspark.sql.functions import month, year, quarter


In [None]:
# Total precipitation per calendar month, summed across all years.
(
    weather.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

Let's now take a look at the average temperature for each type of weather in December 2013:

In [None]:
# Keep only December 2013, using one combined predicate, then average
# temp_avg per weather type.
dec_2013 = (month("date") == 12) & (year("date") == 2013)
(
    weather.filter(dec_2013)
    .groupBy("weather")
    .agg(mean("temp_avg"))
    .show()
)

Let's now find out how many days had freezing temperatures in each month of 2013.

In [None]:
# Flag is 1 when the day's average temperature is at or below 0, else 0.
# Summing the flag per month counts the freezing days.
is_freezing = (col("temp_avg") <= 0).cast("int")

(
    weather.filter(year("date") == 2013)
    .withColumn("freezing_temps", is_freezing)
    .withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("freezing_temps").alias("no_of_days_with_freezing_temps"))
    .sort("month")
    .show()
)

One last example, let's calculate the average temperature for each quarter of each year:



In [None]:
# Average temp_avg for each (year, quarter), in chronological order.
(
    weather.withColumn("quarter", quarter("date"))
    .withColumn("year", year("date"))
    .groupBy("year", "quarter")
    .agg(mean("temp_avg").alias("temp_avg"))
    .sort("year", "quarter")
    .show()
)



We could use a pivot table instead: 

In [None]:
# Same averages reshaped: one row per quarter, one column per year,
# rounded to 2 decimals.
(
    weather.withColumn("quarter", quarter("date"))
    .withColumn("year", year("date"))
    .groupBy("quarter")
    .pivot("year")
    .agg(expr("ROUND(MEAN(temp_avg), 2) AS temp_avg"))
    .sort("quarter")
    .show()
)



## Joins

We'll start by creating some data that we can join together:



In [None]:
# Sample data for the join examples. role_id contains np.nan, so users 5
# (jane) and 6 (mike) have no role, and role 4 (commenter) has no users.
users = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4, 5, 6],
            "name": ["bob", "joe", "sally", "adam", "jane", "mike"],
            "role_id": [1, 2, 3, 3, np.nan, np.nan],
        }
    )
)
roles = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "name": ["admin", "author", "reviewer", "commenter"],
        }
    )
)
print("--- users ---")
users.show()
print("--- roles ---")
roles.show()

To join two dataframes together, we'll need to call the .join method on one of them and supply the other as an argument. In addition, we'll need to supply the condition on which we are joining. In our case, we are joining where the role_id column on the users table is equal to the id column on the roles table.

In [None]:
# Inner join (the default): keep only users whose role_id matches a roles.id.
users.join(roles, on=users.role_id == roles.id).show()

By default, Spark performs an inner join, meaning only records that have a match in the other dataframe are kept. We can also specify a left or a right join, which keeps all of the records from the left or right side, respectively, even if those records don't have a match in the other dataframe.

In [None]:
# Left join: keep every user; role columns are null where no role matches.
users.join(roles, on=users.role_id == roles.id, how="left").show()

In [None]:
# Right join: keep every role; user columns are null where no user matches.
users.join(roles, on=users.role_id == roles.id, how="right").show()


Notice that the examples above have duplicate id (and name) columns. There are several ways we could deal with this:

- alias each dataframe and explicitly select columns after joining (this could also be implemented with Spark SQL)
- rename duplicated columns before joining
- drop duplicated columns after the join (e.g. `.drop(roles.id)`)



# Wrangling

In this lesson, we will acquire and prepare the data we will use in the rest of this module.

- Acquiring Data
- Data Prep
- Train Test Split

In [None]:
from pyspark.sql import SparkSession
# NOTE(review): this wildcard import brings in pyspark's sum, min, max, etc.,
# which shadow the Python builtins of the same name from here on. Explicit
# imports (or `import pyspark.sql.functions as F`) would avoid that.
from pyspark.sql.functions import *

# Reuse the active SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

## Acquisition

Spark lets us read data in from a variety of data sources using what it calls a DataFrameReader. We can access the read property of our spark object and then set various options and read from a data source.

### Using Data Schemas

In [None]:
# Read a comma-separated file with a header row, letting Spark infer each
# column's type from the data.
df = (
    spark.read.format("csv")
    .option("sep", ",")
    .option("header", True)
    .option("inferSchema", True)
    .load("data/source.csv")
)


### Writing Data

## Data Prep

### Column Renaming

### Data Types

### Data Transformations

### New Features

### Joining New Dataset

## Data Splitting