# Overview of PySpark data management

In this notebook, we will illustrate how various data verbs are implemented in `pyspark`.

## `polars` $\approx$ `pyspark`

You will see a lot of similarities between `polars` and `pyspark`:

1. Lazy evaluation and column expressions,
2. Parallel processing out-of-the-box,
3. Dot-chained queries, and
4. Data verbs related to `SQL` and/or `dplyr`.

## Data verbs in `pyspark`

In this lecture, we will look at how the common data verbs are implemented in `pyspark`.  Luckily, the implementation is similar to `polars`, so it should be a relatively pain-free transition.

### Overview of Basic Data Verbs in `polars` and `pyspark`

Verb/Function | `polars` | `pyspark` |
--------------|----------|-----------|
Column expr.  | `pl.col('name') ...` | `col('name') ...`|
SELECT | `.select(...)` | `.select(...)` |
FILTER | `.filter(...)` | `.where(...)` |
MUTATE | `.with_columns(...)` | `.withColumn(...)` |
GROUPBY | `.group_by(...)` | `.groupBy(...)`|
AGGREGATE | `.agg(...)` | `.agg(...)` |
JOIN | `l_tbl.join(r_tbl,...)` | `l_tbl.join(r_tbl,...)`|
UNION | `pl.concat` or SQL | `t1.union(t2)` | 
STACK COLUMNS | `.unpivot(...)` | `.unpivot(...)`|
UNSTACK COLUMNS | `.pivot(...)` | `.groupBy(...).pivot(...).<aggfunc>(...)`|

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean

spark = SparkSession.builder.appName('Ops').getOrCreate()
heroes = spark.read.csv('./data/heroes_information.csv', inferSchema=True, header=True)

heroes.limit(5).toPandas()

## Selecting Columns

The first verb, `select`,

* picks out the *columns* to keep
* is at the core of `SQL` statements

In [None]:
from pyspark.sql.functions import col

(select_query :=
 heroes
 .select(heroes.name,      # Column via dataframe.name
         col('Gender'),    # Column expression (lazy)
         'Weight')         # String
).limit(5).toPandas()      # <-- outside the saved query

In [None]:
select_query  # <-- lazy query

## Filtering Rows

The next verb, `filter` 

* filters the *rows*
* is related to the `SQL` `WHERE` clause
* `pyspark`: Use the `where` method (`filter` is an equivalent alias)

In [None]:
col('Gender') == 'Male' # <-- Lazy column expression

In [None]:
(heroes
 .where(col('Gender') == 'Male')
).limit(5).toPandas()

## Chaining Data Verbs

* Processing df $\rightarrow$ chaining data verbs
* Accomplished through dot-chains

In [None]:
(heroes
 .where(col('Gender') == 'Male')
 .select('name', 
         'Gender', 
         'Weight')
).limit(5).toPandas()

## Constructing New Columns

The third verb, `mutate` 

* Creates new columns
* Changes existing columns
* `pyspark`: Use the `withColumn` method

### Example 3 - Converting Weight to kilograms

In [None]:
(heroes
 .select('name', 
         'Gender', 
         'Weight')
 .withColumn('Weight_kg', col('Weight')/2.2046)
).limit(5).toPandas()

## Referencing a new column

Use the `col` function with the label from `withColumn`.

In [None]:
(new_col_result := 
 heroes
 .select('name', 
         'Gender', 
         'Weight')
 .withColumn('Weight_kg', col('Weight')/2.2046)
 .where(col('Weight_kg') < 100)  # <-- one reason we need lazy expressions
).limit(5).toPandas()

## Simple and Grouped Aggregation

In [None]:
(pitching :=  
 spark.read.csv('./data/Pitching.csv', inferSchema=True, header=True)
).limit(5).toPandas()

### Simple Aggregation

A **simple aggregation** collapses all rows into one row.

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/simple_aggregation.png?raw=1" width=800>

In [None]:
# Note: these imports shadow Python's built-in `max` and `min` for the rest of the notebook
from pyspark.sql.functions import mean, std, max, min

(pitching
  .agg(mean('ERA').alias('mean_era'),
       std('ERA').alias('sd_era'),
       max('W').alias('max_wins'),
       min('W').alias('min_wins'))
).toPandas()

### Group and Aggregate

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/group_and_aggregate.png?raw=1" width=800>

In [None]:
from pyspark.sql.functions import when, col

(eras := 
 pitching
 .where((col('yearID') >= 1900) & (col('yearID') < 1940)) 
 .withColumn('era', (when(col('yearID') < 1920, "dead ball") 
                     .otherwise("after dead ball" ) 
                    )
            )
 .groupby('era')
 .agg(mean('R').alias('mean_runs'))
).toPandas()

### Grouping by more than one category

* `groupBy` accepts multiple columns
* Creates one group for each combination of values that occurs in the data

In [None]:
from pyspark.sql.functions import sum

(pitching
 .select('yearID', 'teamID', 'W')
 .where(col('yearID') >= 1946)
 .groupby('yearID', 'teamID')
 .agg(sum('W').alias('total_wins'))
 .where(col('total_wins') >= 100)
 .sort(col('yearID').asc(), col('total_wins').desc())
).toPandas()

## Joins in `pyspark`

Performed with `df_left.join(df_right, on=condition, how=type_str)`

In [None]:
(dept := 
 spark.read.csv("./data/department.csv",  header=True, inferSchema=True)
).toPandas()

In [None]:
(empl := 
 spark.read.csv("./data/employee.csv",  header=True, inferSchema=True)
).toPandas()

#### Inner join

In [None]:
(empl.join(dept, empl.DeptID == dept.DeptID, how='inner')
).toPandas()

#### Left join

In [None]:
(empl.join(dept, empl.DeptID == dept.DeptID, how='left')
).toPandas()

#### Right join

In [None]:
(empl.join(dept, empl.DeptID == dept.DeptID, how='right')
).toPandas()

#### Outer join

In [None]:
(empl.join(dept, empl.DeptID == dept.DeptID, how='outer')
).toPandas()

## Joining on multiple keys

Next, we will look at table joins that require matching multiple keys.

### Example -- Total Hits and Runs Allowed in 2010

To illustrate joining on multiple keys, let's

1. Compute the totals for H and R in 2010 for each team from the `Pitching` table.
2. Join in the team name and park from the `Teams` table.

This is a good example because team information can change over the years, so we need to match on both `teamID` and `yearID`.

#### Step 1. Read and process the pitching table

In [None]:
(pitching := 
 spark.read.csv("./data/Pitching.csv", header=True, inferSchema=True)
).limit(5).toPandas()

In [None]:
(teams := 
 spark.read.csv("./data/Teams.csv", header=True, inferSchema=True)
).limit(5).toPandas()

In [None]:
(pitching_totals_2010 := 
 pitching
 .select('teamID', 'yearID', 'R', 'H')
 .where(col('yearID') == 2010)            # keep only the 2010 season
 .groupBy('teamID', 'yearID')             # one group per team-season
 .agg(sum('R').alias('Total Runs'),       # `sum` is the pyspark function imported earlier
      sum('H').alias('Total Hits'))
).limit(5).toPandas()

#### Step 2. Read and process the teams table

In [None]:
(team_name_and_park := 
 teams
 .select('yearID', 'teamID', col('name').alias('Team Name'), 'park')
).limit(5).toPandas()

#### Step 3. Perform a left-join.

Since we want to keep all rows in the totals table, and only add the team information when available, we will perform a left join on the totals table.

Notice that the `on` argument (the second argument to `join`) is now a `list` of column expressions, one for each matching rule.

In [None]:
(pitching_totals_2010
 .join(team_name_and_park,
       on = [pitching_totals_2010.yearID == team_name_and_park.yearID,
             pitching_totals_2010.teamID == team_name_and_park.teamID],
       how='left')
).limit(5).toPandas()

## Concatenating Tables with Set-Like Operations in `pyspark`

Now let's look at combining tables with `union`, `intersect`, and `exceptAll` in `pyspark`.

In [None]:
(sales_apr := 
 spark.read.csv("./data/auto_sales_apr.csv",  header=True, inferSchema=True)
).toPandas()

In [None]:
(sales_may := 
 spark.read.csv("./data/auto_sales_may.csv",  header=True, inferSchema=True)
).toPandas()

#### UNION ALL and UNION DISTINCT

In [None]:
(combined_sales :=
 sales_apr
 .union(sales_may)
).toPandas()

In [None]:
(sales_apr
 .union(sales_may)
 .distinct()
).toPandas()

#### Including information from the file name

In [None]:
from pyspark.sql.functions import lit

(combined_sales :=
 sales_apr
 .drop('ID')
 .withColumn('Month', lit('Apr'))  # <-- use `lit` to create a literal (constant) column (similar to pl.lit in polars)
 .union(sales_may
        .drop('ID')
        .withColumn('Month', lit('May'))
       )
).toPandas()


#### INTERSECTION

In [None]:
(sales_apr
 .intersect(sales_may)
).toPandas()

#### DIFFERENCE

In [None]:
(sales_apr
 .exceptAll(sales_may)
).toPandas()

## Reshaping tables

#### Stacking columns with `unpivot`

In [None]:
(combined_sales
 .unpivot(ids = ['Salesperson', 'Month'],
          values = ['Compact','Sedan','SUV','Truck'],
          variableColumnName='Type',
          valueColumnName='Sales'
         )
).toPandas()

#### Unstacking columns with GROUPBY + PIVOT + SUMMARY METHOD

In [None]:
(combined_sales
 .unpivot(ids = ['Salesperson', 'Month'],
          values = ['Compact','Sedan','SUV','Truck'],
          variableColumnName='Type',
          valueColumnName='Sales'
         )
 .groupBy('Salesperson')
 .pivot('Type')
 .sum('Sales')
).toPandas()

## Review of Basic Data Verbs in `polars` and `pyspark`

Verb/Function | `polars` | `pyspark` |
--------------|----------|-----------|
Column expr.  | `pl.col('name') ...` | `col('name') ...`|
SELECT | `.select(...)` | `.select(...)` |
FILTER | `.filter(...)` | `.where(...)` |
MUTATE | `.with_columns(...)` | `.withColumn(...)` |
GROUPBY | `.group_by(...)` | `.groupBy(...)`|
AGGREGATE | `.agg(...)` | `.agg(...)` |
JOIN | `l_tbl.join(r_tbl,...)` | `l_tbl.join(r_tbl,...)`|
UNION | `pl.concat` or SQL | `t1.union(t2)` | 
STACK COLUMNS | `.unpivot(...)` | `.unpivot(...)`|
UNSTACK COLUMNS | `.pivot(...)` | `.groupBy(...).pivot(...).<aggfunc>(...)`|

## <font color="red"> Exercise 4.3 </font>

Determine all the players who have hit more than 40 home runs in a season in the modern era (i.e., since 1946).  The final table should include each player's proper name as well as the team name.

**Tasks.**

1. Select and filter where possible,
2. Be sure to aggregate across the stints to compute total HR for each player-year,
3. Remove any keys after joining proper names, and
4. Sort the results by year (outer; ascending) and total HR (inner; descending)

**Hint:** You will need to join the files listed below.  To get credit for this exercise, use the `pyspark` join methods presented above.

In [None]:
(teams := 
 spark.read.csv("./data/Teams.csv",  header=True, inferSchema=True)
).toPandas()

In [None]:
(batting := 
 spark.read.csv("./data/batting.csv",  header=True, inferSchema=True)
).toPandas().columns

In [None]:
(people :=
 spark.read.csv('./data/People.csv', header = True, inferSchema=True)
).toPandas().columns

In [None]:
(batting_teams :=
 batting.join(teams,
              on=[batting.yearID == teams.yearID,    # match the season ...
                  batting.teamID == teams.teamID],   # ... and the team
              how='inner')
).limit(5).toPandas().columns

In [88]:
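(forty_homers :=
 batting
  .select('playerID','yearID', 'HR', 'teamID')
  .where(col('yearID') > 1946)               # seasons after 1946 (1946 itself is excluded)
  .groupBy('playerID', 'yearID')             # one group per player-season, combining stints
  .agg(sum('HR').alias('total_HR'))
  .where(col('total_HR') > 40)
  .join(people, batting.playerID == people.playerID, "inner")  # attach proper names
  .drop('playerID')                          # remove the join key
  .join(teams,
       on = batting.yearID == teams.yearID,  # matches on year only, so each player row pairs with every team that season
       how='inner')
  .select(batting.yearID, 'nameFirst', 'nameLast','total_HR')
  .distinct()                                # collapses the duplicates created by the year-only join
  .sort(col('yearID').asc(), col('total_HR').desc())
).toPandas()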

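Row | yearID | nameFirst | nameLast | total_HR |
----|--------|-----------|----------|----------|
0 | 1947 | Ralph | Kiner | 51 |
1 | 1947 | Johnny | Mize | 51 |
2 | 1949 | Ralph | Kiner | 54 |
3 | 1949 | Ted | Williams | 43 |
4 | 1950 | Ralph | Kiner | 47 |
... | ... | ... | ... | ... |
255 | 2019 | Christian | Yelich | 44 |
256 | 2019 | Alex | Bregman | 41 |
257 | 2019 | Nelson | Cruz | 41 |
258 | 2019 | Nolan | Arenado | 41 |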
