# Joins

## Prerrequisites

Install Spark and Java in VM

In [1]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark 3.5.0
!wget -q https://apache.osuosl.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

In [2]:
ls -l # check the .tgz is there

total 391016
drwxr-xr-x 1 root root      4096 Jan 11 17:02 [0m[01;34msample_data[0m/
-rw-r--r-- 1 root root 400395283 Sep  9 02:10 spark-3.5.0-bin-hadoop3.tgz


In [3]:
# unzip it
!tar xf spark-3.5.0-bin-hadoop3.tgz

In [4]:
!pip install -q findspark

In [5]:
!pip install py4j

# For maps
!pip install folium
!pip install plotly



Define the environment

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] pyspark-shell"

Start Spark Session

---

In [7]:
import findspark
findspark.init("spark-3.5.0-bin-hadoop3")# SPARK_HOME

from pyspark.sql import SparkSession

# create the session
spark = SparkSession \
        .builder \
        .appName("Joins") \
        .master("local[*]") \
        .getOrCreate()

spark.version

'3.5.0'

In [8]:
spark

In [9]:
# Import sql functions
from pyspark.sql.functions import *

Download datasets

In [10]:
!mkdir -p dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/guitars.json -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/guitarPlayers.json -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/bands.json -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/employees.csv -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/salaries.csv -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/deptmanagers.csv -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/titles.csv -P /dataset
!ls /dataset

bands.json	  employees.csv       guitars.json  titles.csv
deptmanagers.csv  guitarPlayers.json  salaries.csv


Read JSONs

In [11]:
guitarsDF = spark.read \
    .option("inferSchema", "true") \
    .json("/dataset/guitars.json")

guitaristsDF = spark.read \
    .option("inferSchema", "true") \
    .json("/dataset/guitarPlayers.json")

bandsDF = spark.read \
    .option("inferSchema", "true") \
    .json("/dataset/bands.json")

In [12]:
guitarsDF.show(3)
guitaristsDF.show(3)
bandsDF.show(3)


+--------------------+---+------+------------+
|          guitarType| id|  make|       model|
+--------------------+---+------+------------+
|Electric double-n...|  0|Gibson|    EDS-1275|
|            Electric|  5|Fender|Stratocaster|
|            Electric|  1|Gibson|          SG|
+--------------------+---+------+------------+
only showing top 3 rows

+----+-------+---+------------+
|band|guitars| id|        name|
+----+-------+---+------------+
|   0|    [0]|  0|  Jimmy Page|
|   1|    [1]|  1| Angus Young|
|   2| [1, 5]|  2|Eric Clapton|
+----+-------+---+------------+
only showing top 3 rows

+-----------+---+------------+----+
|   hometown| id|        name|year|
+-----------+---+------------+----+
|     Sydney|  1|       AC/DC|1973|
|     London|  0|Led Zeppelin|1968|
|Los Angeles|  3|   Metallica|1981|
+-----------+---+------------+----+
only showing top 3 rows



## Examples

Inner Join =  everything from BOTH DFs for which there is a row in the right DF satisfying the condition


In [13]:
joinCondition = guitaristsDF.band == bandsDF.id
guitaristsBandsDF = guitaristsDF.join(bandsDF, joinCondition, "inner")
guitaristsBandsDF.show(3)

+----+-------+---+------------+-----------+---+------------+----+
|band|guitars| id|        name|   hometown| id|        name|year|
+----+-------+---+------------+-----------+---+------------+----+
|   1|    [1]|  1| Angus Young|     Sydney|  1|       AC/DC|1973|
|   0|    [0]|  0|  Jimmy Page|     London|  0|Led Zeppelin|1968|
|   3|    [3]|  3|Kirk Hammett|Los Angeles|  3|   Metallica|1981|
+----+-------+---+------------+-----------+---+------------+----+



In [14]:
# things to bear in mind
# guitaristsBandsDF.select("id", "band").show(3) # will crash because there are two columns named "id"

In [15]:
# you can rename the column
bandsDFMod = bandsDF.withColumnRenamed("id", "bandId") \
    .withColumnRenamed("name", "bandName")
guitaristsBandsDF2 = guitaristsDF.join(bandsDFMod,
    guitaristsDF.band == bandsDFMod.bandId, "inner").orderBy("name")
guitaristsBandsDF2.show(3)

# there are other options but it is the best practice if it is possible

+----+-------+---+------------+-----------+------+------------+----+
|band|guitars| id|        name|   hometown|bandId|    bandName|year|
+----+-------+---+------------+-----------+------+------------+----+
|   1|    [1]|  1| Angus Young|     Sydney|     1|       AC/DC|1973|
|   0|    [0]|  0|  Jimmy Page|     London|     0|Led Zeppelin|1968|
|   3|    [3]|  3|Kirk Hammett|Los Angeles|     3|   Metallica|1981|
+----+-------+---+------------+-----------+------+------------+----+



Left Outer = everything in the inner join + all the rows in the LEFT DF, with nulls in where the data is missing

In [16]:
joinCondition2 = guitaristsDF.band == bandsDFMod.bandId
guitaristsDF.join(bandsDFMod, joinCondition2, "left_outer").orderBy("name").show(3)
# see nulls on Eric Calpton

+----+-------+---+------------+--------+------+------------+----+
|band|guitars| id|        name|hometown|bandId|    bandName|year|
+----+-------+---+------------+--------+------+------------+----+
|   1|    [1]|  1| Angus Young|  Sydney|     1|       AC/DC|1973|
|   2| [1, 5]|  2|Eric Clapton|    NULL|  NULL|        NULL|NULL|
|   0|    [0]|  0|  Jimmy Page|  London|     0|Led Zeppelin|1968|
+----+-------+---+------------+--------+------+------------+----+
only showing top 3 rows



  Right Outer = everything in the inner join + all the rows in the RIGHT DF, with nulls in where the data is missing


In [17]:
guitaristsDF.join(bandsDFMod, joinCondition2, "right_outer").orderBy("name").show(3)
# see nulls for The Beatles

+----+-------+----+-----------+---------+------+------------+----+
|band|guitars|  id|       name| hometown|bandId|    bandName|year|
+----+-------+----+-----------+---------+------+------------+----+
|NULL|   NULL|NULL|       NULL|Liverpool|     4| The Beatles|1960|
|   1|    [1]|   1|Angus Young|   Sydney|     1|       AC/DC|1973|
|   0|    [0]|   0| Jimmy Page|   London|     0|Led Zeppelin|1968|
+----+-------+----+-----------+---------+------+------------+----+
only showing top 3 rows



 Outer Join = everything in the inner join + all the rows in BOTH DFs, with nulls in where the data is missing


In [18]:
guitaristsDF.join(bandsDFMod, joinCondition2, "outer").orderBy("name").show(3)
# see nulls for The Beatles and Eric Clapton

+----+-------+----+------------+---------+------+-----------+----+
|band|guitars|  id|        name| hometown|bandId|   bandName|year|
+----+-------+----+------------+---------+------+-----------+----+
|NULL|   NULL|NULL|        NULL|Liverpool|     4|The Beatles|1960|
|   1|    [1]|   1| Angus Young|   Sydney|     1|      AC/DC|1973|
|   2| [1, 5]|   2|Eric Clapton|     NULL|  NULL|       NULL|NULL|
+----+-------+----+------------+---------+------+-----------+----+
only showing top 3 rows



  Semi-joins = everything in the LEFT DF for which there is a row in the right DF satisfying the condition


In [19]:
guitaristsDF.join(bandsDFMod, joinCondition2, "left_semi").orderBy("name").show(3)


+----+-------+---+------------+
|band|guitars| id|        name|
+----+-------+---+------------+
|   1|    [1]|  1| Angus Young|
|   0|    [0]|  0|  Jimmy Page|
|   3|    [3]|  3|Kirk Hammett|
+----+-------+---+------------+



Anti-joins = everything in the LEFT DF for which there is NO row in the right DF satisfying the condition


In [20]:
guitaristsDF.join(bandsDFMod, joinCondition2, "anti").orderBy("name").show(3)

+----+-------+---+------------+
|band|guitars| id|        name|
+----+-------+---+------------+
|   2| [1, 5]|  2|Eric Clapton|
+----+-------+---+------------+



Cross-join = takes every instance in the LEFT DF and create a new one for every intance in the right DF (not very common)

In [21]:
guitaristsDF.crossJoin(bandsDFMod).show(20)

+----+-------+---+------------+-----------+------+------------+----+
|band|guitars| id|        name|   hometown|bandId|    bandName|year|
+----+-------+---+------------+-----------+------+------------+----+
|   0|    [0]|  0|  Jimmy Page|     Sydney|     1|       AC/DC|1973|
|   1|    [1]|  1| Angus Young|     Sydney|     1|       AC/DC|1973|
|   2| [1, 5]|  2|Eric Clapton|     Sydney|     1|       AC/DC|1973|
|   3|    [3]|  3|Kirk Hammett|     Sydney|     1|       AC/DC|1973|
|   0|    [0]|  0|  Jimmy Page|     London|     0|Led Zeppelin|1968|
|   1|    [1]|  1| Angus Young|     London|     0|Led Zeppelin|1968|
|   2| [1, 5]|  2|Eric Clapton|     London|     0|Led Zeppelin|1968|
|   3|    [3]|  3|Kirk Hammett|     London|     0|Led Zeppelin|1968|
|   0|    [0]|  0|  Jimmy Page|Los Angeles|     3|   Metallica|1981|
|   1|    [1]|  1| Angus Young|Los Angeles|     3|   Metallica|1981|
|   2| [1, 5]|  2|Eric Clapton|Los Angeles|     3|   Metallica|1981|
|   3|    [3]|  3|Kirk Hammett|Los

*Union Join is just an Union (note it only works if both DFs have the same schema)

## Exercises
1. Read employees.csv, deptmanagers.csv, salaries.csv and titles.csv to a DFs.
2. Show all employees and their max salary (there can be different salaries registered for the same employee)
3. Show all employees who were never managers. Check it by getting all the managers and checking that they are not in the table
4. Find the job titles of the best paid 10 employees in the company (note that there can be different titles registered for the same employee)

Exercise 1

In [22]:
!ls /dataset

employeesDF = spark.read.option("header", "true").csv("/dataset/employees.csv")
employeesDF.show(15)

deptManagersDF = spark.read.option("header", "true").csv("/dataset/deptmanagers.csv")
deptManagersDF.show(15)

salariesDF = spark.read.option("header", "true").csv("/dataset/salaries.csv")
salariesDF.show(15)

titlesDF = spark.read.option("header", "true").csv("/dataset/titles.csv")
titlesDF.show(15)

employeesDF.printSchema()

deptManagersDF.printSchema()

salariesDF.printSchema()

titlesDF.printSchema()

bands.json	  employees.csv       guitars.json  titles.csv
deptmanagers.csv  guitarPlayers.json  salaries.csv
+------+----------+----------+-----------+------+----------+
|emp_no|birth_date|first_name|  last_name|gender| hire_date|
+------+----------+----------+-----------+------+----------+
| 10010|1963-06-01| Duangkaew|   Piveteau|     F|1989-08-24|
| 10020|1952-12-24|    Mayuko|    Warwick|     M|1991-01-26|
| 10030|1958-07-14|     Elvis|    Demeyer|     M|1994-02-17|
| 10040|1959-09-13|     Weiyi|    Meriste|     F|1993-02-14|
| 10050|1958-05-21|   Yinghua|     Dredge|     M|1990-12-25|
| 10060|1961-10-15|  Breannda|Billingsley|     M|1987-11-02|
| 10070|1955-08-20|    Reuven| Garigliano|     M|1985-10-14|
| 10080|1957-12-03|    Premal|       Baek|     M|1985-11-19|
| 10090|1961-05-30|    Kendra|    Hofting|     M|1986-03-14|
| 10100|1953-04-21|  Hironobu|  Haraldson|     F|1987-09-21|
| 10110|1957-03-07|    Xuejia|     Ullian|     F|1986-08-22|
| 10120|1960-03-26|    Armond| Fairtl

Exercise 2

In [23]:
joinCondition = employeesDF.emp_no == salariesDF.emp_no
employeesSalariesDF = employeesDF.join(salariesDF, joinCondition, "inner")
employeesSalariesDF.show(3)

employeesSalariesDF.groupBy(employeesSalariesDF.first_name, employeesSalariesDF.last_name).agg(max(employeesSalariesDF.salary).alias("Salario máximo de cada director")).show()

+------+----------+----------+---------+------+----------+------+------+----------+----------+
|emp_no|birth_date|first_name|last_name|gender| hire_date|emp_no|salary| from_date|   to_date|
+------+----------+----------+---------+------+----------+------+------+----------+----------+
| 10010|1963-06-01| Duangkaew| Piveteau|     F|1989-08-24| 10010| 72488|1996-11-24|1997-11-24|
| 10010|1963-06-01| Duangkaew| Piveteau|     F|1989-08-24| 10010| 74347|1997-11-24|1998-11-24|
| 10010|1963-06-01| Duangkaew| Piveteau|     F|1989-08-24| 10010| 75405|1998-11-24|1999-11-24|
+------+----------+----------+---------+------+----------+------+------+----------+----------+
only showing top 3 rows

+----------+-----------+-------------------------------+
|first_name|  last_name|Salario máximo de cada director|
+----------+-----------+-------------------------------+
|     Aamer|       Bahl|                          66474|
|     Aamer|  Bharadwaj|                          64254|
|     Aamer|    Cardazo| 

Exercise 3

In [24]:
joinCondition = employeesDF.emp_no == deptManagersDF.emp_no
employeesManagersDF = employeesDF.join(deptManagersDF, joinCondition, "left_anti")

employeesManagersDF.filter(employeesManagersDF.emp_no == 110420).show(15)

+------+----------+----------+---------+------+----------+
|emp_no|birth_date|first_name|last_name|gender| hire_date|
+------+----------+----------+---------+------+----------+
+------+----------+----------+---------+------+----------+



Exercise 4

In [67]:

employeesSalariesDF = employeesDF.join(salariesDF, "emp_no", "inner")

# Agrupar y obtener el salario máximo por empleado
employeesSalariesDFGrouped = employeesSalariesDF.groupBy(
    employeesSalariesDF.emp_no,
    employeesSalariesDF.first_name,
    employeesSalariesDF.last_name
).agg(
    F.max(employeesSalariesDF.salary).alias("Salario máximo")
).orderBy(desc("Salario máximo"))

# Unir con el DataFrame de títulos
employeesSalariesTitlesDF = employeesSalariesDFGrouped.join(
    titlesDF, "emp_no", "inner"
)

# Usar ventana para obtener el título correspondiente a la fecha máxima
windowSpec = Window.partitionBy(employeesSalariesTitlesDF["emp_no"]).orderBy(
    employeesSalariesTitlesDF["from_date"].desc()
)

# Seleccionar el título correspondiente a la fecha máxima
employeesSalariesTitlesDF = employeesSalariesTitlesDF.withColumn(
    "row_num", F.row_number().over(windowSpec)
).filter("row_num == 1").drop("row_num")

# Seleccionar las columnas deseadas y mostrar los primeros 10 resultados
resultDF = employeesSalariesTitlesDF.select(
    employeesSalariesTitlesDF["emp_no"],
    employeesSalariesTitlesDF["first_name"],
    employeesSalariesTitlesDF["last_name"],
    employeesSalariesTitlesDF["title"],
    employeesSalariesTitlesDF["Salario máximo"]
).orderBy(desc("Salario máximo"))

resultDF.show(10)


+------+----------+-----------+----------------+--------------+
|emp_no|first_name|  last_name|           title|Salario máximo|
+------+----------+-----------+----------------+--------------+
|283410|      Mads|   Henseler| Senior Engineer|         99999|
|299970|    Yuping| Pargaonkar|    Senior Staff|         99999|
|248210|   Odoardo|  Muchinsky|    Senior Staff|         99998|
|254170|      Suvo|     Kakkar|    Senior Staff|         99998|
|463940|     Jinya|      Kroha|    Senior Staff|         99997|
|434780|      Duro|     Zaiane|    Senior Staff|         99996|
|489450|     Aloke|   Schieder|Technique Leader|         99993|
| 42160|   Bezalel|    Dalphin|    Senior Staff|         99991|
|295950|  Godehard|  Braunmuhl|    Senior Staff|         99986|
|248620|  Tetsurou|Mayerwieser|    Senior Staff|         99984|
+------+----------+-----------+----------------+--------------+
only showing top 10 rows

