# Joins in `pyspark`

Performed with `df_left.join(df_right, how=type_str)`

In [1]:
from pyspark.sql import SparkSession
from more_pyspark import to_pandas
spark = SparkSession.builder.appName('Ops').getOrCreate()
deptk = spark.read.csv("./data/department.csv",  header=True, inferSchema=True)
deptk.collect() >> to_pandas

Unnamed: 0,DeptID,DeptName
0,31,Sales
1,33,Engineering
2,34,Clerical
3,35,Marketing


In [2]:
emplk = spark.read.csv("./data/employee.csv",  header=True, inferSchema=True)
emplk.collect() >> to_pandas

Unnamed: 0,DeptID,LastName
0,31.0,Rafferty
1,33.0,Jones
2,33.0,Heisenberg
3,34.0,Robinson
4,34.0,Smith
5,,Williams


#### Inner join

In [3]:
(emplk.join(deptk, emplk.DeptID == deptk.DeptID, how='inner')
 .collect()) >> to_pandas

Unnamed: 0,DeptID,LastName,DeptName
0,31,Rafferty,Sales
1,33,Jones,Engineering
2,33,Heisenberg,Engineering
3,34,Robinson,Clerical
4,34,Smith,Clerical


#### Left join

In [4]:
(emplk.join(deptk, emplk.DeptID == deptk.DeptID, how='left')
 .collect()) >> to_pandas

Unnamed: 0,DeptID,LastName,DeptName
0,31.0,Rafferty,Sales
1,33.0,Jones,Engineering
2,33.0,Heisenberg,Engineering
3,34.0,Robinson,Clerical
4,34.0,Smith,Clerical
5,,Williams,


#### Right join

In [5]:
deptk

DataFrame[DeptID: int, DeptName: string]

In [6]:
(emplk.join(deptk, emplk.DeptID == deptk.DeptID, how='right')
 .collect()) >> to_pandas

Unnamed: 0,DeptID,LastName,DeptName
0,31,Rafferty,Sales
1,33,Heisenberg,Engineering
2,33,Jones,Engineering
3,34,Smith,Clerical
4,34,Robinson,Clerical
5,35,,Marketing


#### Outer join

In [7]:
(emplk.join(deptk, emplk.DeptID == deptk.DeptID, how='outer')
 .collect()) >> to_pandas

Unnamed: 0,DeptID,LastName,DeptName
0,,Williams,
1,35.0,,Marketing
2,34.0,Robinson,Clerical
3,34.0,Smith,Clerical
4,31.0,Rafferty,Sales
5,33.0,Jones,Engineering
6,33.0,Heisenberg,Engineering


## <font color="red"> Exercise 2 </font>

Determine all the players that have hit more than 100 home runs in a season.  The final table should include the players proper name, as well as the team name.  

**Hint:** You will need join the files listed below.  To get credit for this exercise, use the join `pyspark` join methods presented above.

In [8]:
files = ("./data/baseball/core/Batting.csv", 
              "./data/baseball/core/People.csv",
              "./data/baseball/core/Teams.csv")

In [9]:
from pyspark.sql import SparkSession
from more_pyspark import get_spark_types, to_pandas
from functoolz import pipeable
from dfply import *

In [10]:
spark = SparkSession.builder.appName('Ops').getOrCreate()

In [11]:
# Your code here
batting, people, teams = [spark.read.csv(f, header=True, inferSchema=True) for f in files]


In [12]:
battingData = batting.collect() >> to_pandas >> select(X.yearID, X.playerID, X.HR)

In [13]:
TeamData = teams.collect() >> to_pandas >> select(X.name, X.yearID)

In [14]:
playerData = people.collect() >> to_pandas >> select(X.nameGiven, X.playerID)

In [15]:
finalTable = (battingData 
                >> inner_join(TeamData, by = 'yearID') 
                >> inner_join(playerData, by = 'playerID')
                >> filter_by(X.HR> 50))
                

In [16]:
finalTable


Unnamed: 0,yearID,playerID,HR,name,nameGiven
296648,1920,ruthba01,54,Boston Red Sox,George Herman
296649,1920,ruthba01,54,Brooklyn Robins,George Herman
296650,1920,ruthba01,54,Boston Braves,George Herman
296651,1920,ruthba01,54,Chicago White Sox,George Herman
296652,1920,ruthba01,54,Chicago Cubs,George Herman
296653,1920,ruthba01,54,Cincinnati Reds,George Herman
296654,1920,ruthba01,54,Cleveland Indians,George Herman
296655,1920,ruthba01,54,Detroit Tigers,George Herman
296656,1920,ruthba01,54,New York Giants,George Herman
296657,1920,ruthba01,54,New York Yankees,George Herman


## Up Next

Stuff