
## Table of Contents

* [Parallel Join Strategies](#parallel-join)
    * [Broadcast Hash Join](#bhj)
    * [Sort Merge Join](#smj)
* [Parallel Join](#other-joins)
    * [Inner Join](#inner)
    * [Left Join](#left)
    * [Full Outer Join](#full_outer)
    * [Left Semi Join](#left_semi)
    * [Left Anti Join](#left_anti)

## Import Spark classes and create Spark Context

In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

master = "local[*]"
app_name = "Parallel Join"

spark_conf = SparkConf().setMaster(master).setAppName(app_name)
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('ERROR')


## Parallel Join Strategies

In [2]:
# Set up the datasets
import random
random.seed(0)

# Lists of (id, value) tuples: table A is small (10 rows), table B is larger (990 rows)
tableA = [(i, 'A' + str(i)) for i in range(100, 110)]
tableB = [(i, 'B' + str(i)) for i in range(10, 1000)]

# Shuffle the lists so that the rows are not ordered by id
random.shuffle(tableA)
random.shuffle(tableB)

# Convert each list of tuples to a DataFrame
df_A = spark.createDataFrame(tableA, ["id", "valueA"])
df_A.show()
df_B = spark.createDataFrame(tableB, ["id", "valueB"])
df_B.show()

+---+------+
| id|valueA|
+---+------+
|107|  A107|
|108|  A108|
|101|  A101|
|105|  A105|
|103|  A103|
|104|  A104|
|102|  A102|
|100|  A100|
|109|  A109|
|106|  A106|
+---+------+

+---+------+
| id|valueB|
+---+------+
|450|  B450|
|633|  B633|
|170|  B170|
|561|  B561|
|247|  B247|
|400|  B400|
|590|  B590|
|290|  B290|
|134|  B134|
|739|  B739|
|991|  B991|
|504|  B504|
|241|  B241|
|311|  B311|
|964|  B964|
|669|  B669|
|928|  B928|
|831|  B831|
|270|  B270|
|119|  B119|
+---+------+
only showing top 20 rows



### 1. Broadcast Hash Join
In this type of join, one dataset (the smaller one) is broadcast (sent in full) to every executor. This avoids shuffling the larger dataset, and skipping the shuffle usually makes the join faster.

<i>We need to use the broadcast function inside the join to broadcast the table</i>

In [3]:
from pyspark.sql.functions import broadcast

# Use broadcast function to specify the use of BroadcastHashJoin algorithm
df_joined_broadcast = df_B.join(broadcast(df_A),df_A.id==df_B.id,how='inner')
df_joined_broadcast.show()

+---+------+---+------+
| id|valueB| id|valueA|
+---+------+---+------+
|108|  B108|108|  A108|
|104|  B104|104|  A104|
|100|  B100|100|  A100|
|102|  B102|102|  A102|
|106|  B106|106|  A106|
|109|  B109|109|  A109|
|101|  B101|101|  A101|
|103|  B103|103|  A103|
|105|  B105|105|  A105|
|107|  B107|107|  A107|
+---+------+---+------+



#### Explanation Query Plan with Broadcast Hash Join
The order of execution goes from top to bottom. The steps are:
1. Scan dataframe B (left side)
  - Filter id not null in dataframe B
2. Scan dataframe A (right side)
  - Filter id not null in dataframe A
3. Broadcast dataframe A: Send a full copy of dataframe A to each executor
4. BroadcastHashJoin: Join each partition of dataframe B with the broadcast copy of dataframe A
5. Project: Select the attributes from both dataframes (df_B: id, valueB and df_A: id, valueA)
6. Collect all the results to the driver
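
The build-and-probe idea behind these steps can be sketched in plain Python. This is only an illustration under simplifying assumptions, not Spark's implementation: the function name `broadcast_hash_join`, the list-of-lists "partitions" and the toy rows are all made up for this example.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows):
    """Inner equi-join on the first field of each (key, value) row.

    large_partitions: the large table, already split into partitions.
    small_rows: the small table, assumed to be copied in full to every partition.
    """
    # Build phase: hash the small (broadcast) table once by join key.
    hash_table = defaultdict(list)
    for key, value in small_rows:
        hash_table[key].append(value)

    # Probe phase: every partition of the large table is processed on its own,
    # so no row of the large table has to move to another partition.
    joined_partitions = []
    for partition in large_partitions:
        joined = [
            (key, large_value, small_value)
            for key, large_value in partition
            for small_value in hash_table.get(key, [])
        ]
        joined_partitions.append(joined)
    return joined_partitions


# Hypothetical toy data: a large table in two partitions and a small table.
large_table_partitions = [
    [(10, "B10"), (100, "B100")],
    [(101, "B101"), (500, "B500")],
]
small_table = [(100, "A100"), (101, "A101"), (102, "A102")]

result = broadcast_hash_join(large_table_partitions, small_table)
print(result)
```

Each partition of the large table produces its matches locally, which is why the shuffle of the large side can be skipped.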

### 2. Sort Merge Join
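In this join approach, both datasets are first shuffled so that rows with the same join key land in the same partition. Each partition is then sorted by the join key, and finally the sorted rows are merged. This is the <strong>default</strong> join algorithm Spark uses for equi-joins when neither side is small enough to be broadcast.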

In [4]:
df_joined_sortmerge = df_A.join(df_B,df_A.id==df_B.id,how='inner')
df_joined_sortmerge.show()

+---+------+---+------+
| id|valueA| id|valueB|
+---+------+---+------+
|107|  A107|107|  B107|
|108|  A108|108|  B108|
|101|  A101|101|  B101|
|103|  A103|103|  B103|
|105|  A105|105|  B105|
|104|  A104|104|  B104|
|102|  A102|102|  B102|
|100|  A100|100|  B100|
|106|  A106|106|  B106|
|109|  A109|109|  B109|
+---+------+---+------+



#### Explanation Query Plan with Sort Merge Join
The order of execution goes from top to bottom. The steps are:
1. Scan dataframe A (left side)
  - Filter id not null in dataframe A
2. Scan dataframe B (right side)
  - Filter id not null in dataframe B
3. Exchange dataframe A: Partition dataframe A with hash partitioning
4. Exchange dataframe B: Partition dataframe B with hash partitioning
5. Sort dataframe A: Sort data within each partition
6. Sort dataframe B: Sort data within each partition
7. Perform Sort Merge Join between both dataframes
8. Project: Select the attributes from both dataframes (df_A: id, valueA and df_B: id, valueB)
9. Collect all the results to the driver
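
The exchange, sort and merge steps above can be sketched in plain Python. This is a simplified illustration, not Spark's implementation: the helper names (`hash_partition`, `merge_join`, `sort_merge_join`), the partition count and the toy rows are assumptions made for this example.

```python
def hash_partition(rows, num_partitions):
    """Exchange step: route each (key, value) row to a partition by key hash."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in rows:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


def merge_join(left_sorted, right_sorted):
    """Merge step: inner join of two lists that are already sorted by key."""
    out, i, j = [], 0, 0
    while i < len(left_sorted) and j < len(right_sorted):
        left_key, right_key = left_sorted[i][0], right_sorted[j][0]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Pair the current left row with the whole run of equal right keys.
            k = j
            while k < len(right_sorted) and right_sorted[k][0] == left_key:
                out.append((left_key, left_sorted[i][1], right_sorted[k][1]))
                k += 1
            # Advance only the left side, so a duplicate left key sees the same run.
            i += 1
    return out


def sort_merge_join(left_rows, right_rows, num_partitions=4):
    # Both sides use the same hash function, so equal keys meet in the same partition.
    left_parts = hash_partition(left_rows, num_partitions)
    right_parts = hash_partition(right_rows, num_partitions)
    joined = []
    for left_part, right_part in zip(left_parts, right_parts):
        joined.extend(merge_join(sorted(left_part), sorted(right_part)))
    return joined


# Hypothetical toy data in the spirit of tables A and B.
table_a = [(102, "A102"), (100, "A100"), (101, "A101")]
table_b = [(500, "B500"), (101, "B101"), (10, "B10"), (100, "B100"), (102, "B102")]

result = sort_merge_join(table_a, table_b)
print(sorted(result))
```

Unlike the broadcast approach, both tables are repartitioned here, which is the cost that the Exchange steps represent.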

## Parallel Join

Now we will implement multiple join operations and visualise the parallelism embedded in Spark to perform these kinds of queries. The join queries that we will perform are:
1. Inner Join
1. Left Join
1. Full Outer Join
1. Left Semi Join
1. Left Anti Join


In [5]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Dictionary Schema
schema_dictionary = StructType([
    StructField("Country", StringType(), True),
    StructField("Code", StringType(), True),
    StructField("Population", StringType(), True),  # Kept as a string here; cast to LongType before doing arithmetic
    StructField("GDP per Capita", StringType(), True),  # Kept as a string here; cast to a numeric type (e.g. DoubleType) before doing calculations
])

# Summer Schema
schema_summer = StructType([
    StructField("Year", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Sport", StringType(), True),
    StructField("Discipline", StringType(), True),
    StructField("Athlete", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Event", StringType(), True),
    StructField("Medal", StringType(), True),
])

# Winter Schema
schema_winter = StructType([
    StructField("Year", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Sport", StringType(), True),
    StructField("Discipline", StringType(), True),
    StructField("Athlete", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Event", StringType(), True),
    StructField("Medal", StringType(), True),
])

data_dictionary = [
    ("United States", "USA", "331002651", "59939"),
    ("China", "CHN", "1439323776", "10416"),
    ("Japan", "JPN", "126476461", "39434"),
]

# Sample Data for Summer Olympics
data_summer = [
    ("2020", "Tokyo", "Swimming", "100m Freestyle", "John Doe", "USA", "M", "100m Freestyle", "Gold"),
    ("2020", "Tokyo", "Athletics", "Marathon", "Jane Doe", "KEN", "F", "Marathon", "Silver"),
]

# Sample Data for Winter Olympics
data_winter = [
    ("2018", "Pyeongchang", "Skiing", "Downhill", "Alex Smith", "USA", "M", "Downhill", "Bronze"),
    ("2018", "Pyeongchang", "Skating", "Figure Skating", "Emily White", "CAN", "F", "Singles", "Gold"),
]

# Create DataFrames
df_dictionary = spark.createDataFrame(data=data_dictionary, schema=schema_dictionary)
df_summer = spark.createDataFrame(data=data_summer, schema=schema_summer)
df_winter = spark.createDataFrame(data=data_winter, schema=schema_winter)

# Repartition the Summer and Winter DataFrames into 4 partitions each
df_summer = df_summer.repartition(4)
df_winter = df_winter.repartition(4)

df_dictionary.createOrReplaceTempView("sql_dictionary")
df_summer.createOrReplaceTempView("sql_summer")
df_winter.createOrReplaceTempView("sql_winter")


In [6]:
## Verify the number of partitions of each dataframe
## printSchema() shows the column names and types of each dataframe
print("DICTIONARY INFO:")
print(f"Number of partitions: {df_dictionary.rdd.getNumPartitions()}")
df_dictionary.printSchema()

print("SUMMER INFO:")
print(f"Number of partitions: {df_summer.rdd.getNumPartitions()}")
df_summer.printSchema()

print("WINTER INFO:")
print(f"Number of partitions: {df_winter.rdd.getNumPartitions()}")
df_winter.printSchema()

DICTIONARY INFO:
Number of partitions: 8
root
 |-- Country: string (nullable = true)
 |-- Code: string (nullable = true)
 |-- Population: string (nullable = true)
 |-- GDP per Capita: string (nullable = true)

SUMMER INFO:
Number of partitions: 4
root
 |-- Year: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Discipline: string (nullable = true)
 |-- Athlete: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)

WINTER INFO:
Number of partitions: 4
root
 |-- Year: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Discipline: string (nullable = true)
 |-- Athlete: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)



## 1. Inner Join
This join operation returns only the rows that have matching values in both dataframes.
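
Before running the join in Spark, the semantics can be checked on plain Python tuples. The sketch below uses reduced, hypothetical copies of the rows in this notebook, `(Country, Code)` and `(Athlete, Country)`, and a nested loop chosen for readability only; it is not how Spark executes an equi-join.

```python
# Reduced copies of the notebook rows: (Country, Code) and (Athlete, Country).
dictionary = [("United States", "USA"), ("China", "CHN"), ("Japan", "JPN")]
summer = [("John Doe", "USA"), ("Jane Doe", "KEN")]

# Inner join: one combined row per (left, right) pair whose keys match.
inner = [
    d + s
    for d in dictionary
    for s in summer
    if d[1] == s[1]  # dictionary.Code == summer.Country
]
print(len(inner))
print(inner)
```

Only USA appears on both sides, so a single combined row is produced.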

In [7]:
#### Join summer and dictionary using Dataframes
df_dict_inner_summ = df_dictionary.join(df_summer,df_dictionary.Code==df_summer.Country,how='inner')
print(df_dict_inner_summ.count())
df_dict_inner_summ.show()

## Join summer and dictionary using SQL
sql_dict_inner_summ = spark.sql('''
  SELECT d.*,w.*
  FROM sql_dictionary d JOIN sql_summer w
  ON d.Code=w.Country
''')
print(sql_dict_inner_summ.count())
sql_dict_inner_summ.show()

1
+-------------+----+----------+--------------+----+-----+--------+--------------+--------+-------+------+--------------+-----+
|      Country|Code|Population|GDP per Capita|Year| City|   Sport|    Discipline| Athlete|Country|Gender|         Event|Medal|
+-------------+----+----------+--------------+----+-----+--------+--------------+--------+-------+------+--------------+-----+
|United States| USA| 331002651|         59939|2020|Tokyo|Swimming|100m Freestyle|John Doe|    USA|     M|100m Freestyle| Gold|
+-------------+----+----------+--------------+----+-----+--------+--------------+--------+-------+------+--------------+-----+

1
+-------------+----+----------+--------------+----+-----+--------+--------------+--------+-------+------+--------------+-----+
|      Country|Code|Population|GDP per Capita|Year| City|   Sport|    Discipline| Athlete|Country|Gender|         Event|Medal|
+-------------+----+----------+--------------+----+-----+--------+--------------+--------+-------+------+-

## 2. Left Join
This join operation returns all records from the left dataframe and the matched records from the right dataframe. Left rows without a match get NULL in the columns that come from the right dataframe.
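
As a plain-Python sketch of this behaviour (an illustration with reduced, hypothetical rows, not Spark code), every left row is kept and `None` stands in for NULL when no right row matches:

```python
# Reduced copies of the notebook rows: (Country, Code) and (Athlete, Country).
dictionary = [("United States", "USA"), ("China", "CHN"), ("Japan", "JPN")]
summer = [("John Doe", "USA"), ("Jane Doe", "KEN")]

# Placeholder for the right-hand columns of an unmatched left row.
NULL_SUMMER = (None, None)

left = []
for d in dictionary:
    matches = [s for s in summer if s[1] == d[1]]
    if matches:
        # One output row per matching right row.
        left.extend(d + s for s in matches)
    else:
        # Keep the left row and fill the right-hand columns with None.
        left.append(d + NULL_SUMMER)

print(len(left))
for row in left:
    print(row)
```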

In [8]:
from pyspark.sql.functions import col

#### Join summer and dictionary using Dataframes
df_dict_left_summ = df_dictionary.join(df_summer,df_dictionary.Code==df_summer.Country,how='left')
# Uncomment to keep only the dictionary rows that have no match in summer:
# df_dict_left_summ = df_dict_left_summ.filter(col('Discipline').isNull())
print(df_dict_left_summ.count())
df_dict_left_summ.show()

## Join summer and dictionary using SQL
sql_dict_left_summ = spark.sql('''
  SELECT d.*,w.*
  FROM sql_dictionary d LEFT JOIN sql_summer w
  ON d.Code=w.Country
''')
print(sql_dict_left_summ.count())
sql_dict_left_summ.show()

3
+-------------+----+----------+--------------+----+-----+--------+--------------+--------+-------+------+--------------+-----+
|      Country|Code|Population|GDP per Capita|Year| City|   Sport|    Discipline| Athlete|Country|Gender|         Event|Medal|
+-------------+----+----------+--------------+----+-----+--------+--------------+--------+-------+------+--------------+-----+
|        China| CHN|1439323776|         10416|NULL| NULL|    NULL|          NULL|    NULL|   NULL|  NULL|          NULL| NULL|
|        Japan| JPN| 126476461|         39434|NULL| NULL|    NULL|          NULL|    NULL|   NULL|  NULL|          NULL| NULL|
|United States| USA| 331002651|         59939|2020|Tokyo|Swimming|100m Freestyle|John Doe|    USA|     M|100m Freestyle| Gold|
+-------------+----+----------+--------------+----+-----+--------+--------------+--------+-------+------+--------------+-----+

3
+-------------+----+----------+--------------+----+-----+--------+--------------+--------+-------+------+-

## 3. Full Outer Join
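This join operation returns all rows from both the left and the right dataframes. Rows that match are combined, and rows without a match on the other side are kept with NULL in the missing columns.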
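
A minimal plain-Python sketch of these semantics, using reduced, hypothetical rows and `None` in place of NULL (an illustration only, not Spark's algorithm):

```python
# Reduced copies of the notebook rows: (Country, Code) and (Athlete, Country).
dictionary = [("United States", "USA"), ("China", "CHN"), ("Japan", "JPN")]
summer = [("John Doe", "USA"), ("Jane Doe", "KEN")]

outer = []
matched_right = set()  # indexes of summer rows that found a partner

# Pass 1: every left row, combined with its matches or padded with None.
for d in dictionary:
    hit = False
    for idx, s in enumerate(summer):
        if d[1] == s[1]:
            outer.append(d + s)
            matched_right.add(idx)
            hit = True
    if not hit:
        outer.append(d + (None, None))

# Pass 2: right rows that never matched, padded with None on the left.
for idx, s in enumerate(summer):
    if idx not in matched_right:
        outer.append((None, None) + s)

print(len(outer))
for row in outer:
    print(row)
```

The row count is the inner-join matches plus the unmatched rows of each side, here 1 + 2 + 1.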

In [9]:
#### Join summer and dictionary using Dataframes
df_dict_outer_summ = df_dictionary.join(df_summer,df_dictionary.Code==df_summer.Country,how='outer')
print(df_dict_outer_summ.count())
df_dict_outer_summ.show()

## Join summer and dictionary using SQL
sql_dict_outer_summ = spark.sql('''
  SELECT d.*,w.*
  FROM sql_dictionary d FULL OUTER JOIN sql_summer w
  ON d.Code=w.Country
''')
print(sql_dict_outer_summ.count())
sql_dict_outer_summ.show()

4
+-------------+----+----------+--------------+----+-----+---------+--------------+--------+-------+------+--------------+------+
|      Country|Code|Population|GDP per Capita|Year| City|    Sport|    Discipline| Athlete|Country|Gender|         Event| Medal|
+-------------+----+----------+--------------+----+-----+---------+--------------+--------+-------+------+--------------+------+
|        China| CHN|1439323776|         10416|NULL| NULL|     NULL|          NULL|    NULL|   NULL|  NULL|          NULL|  NULL|
|        Japan| JPN| 126476461|         39434|NULL| NULL|     NULL|          NULL|    NULL|   NULL|  NULL|          NULL|  NULL|
|         NULL|NULL|      NULL|          NULL|2020|Tokyo|Athletics|      Marathon|Jane Doe|    KEN|     F|      Marathon|Silver|
|United States| USA| 331002651|         59939|2020|Tokyo| Swimming|100m Freestyle|John Doe|    USA|     M|100m Freestyle|  Gold|
+-------------+----+----------+--------------+----+-----+---------+--------------+--------+-------+------+--------------+------+

## 4. Left Semi Join
This join operation is like an inner join, but only the columns of the left dataframe are returned. Each left row appears at most once, even if it matches several rows in the right dataframe.
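
The difference from an inner join shows up when a left row has several matches. The plain-Python sketch below adds a hypothetical second USA row ("Mary Roe", not part of the notebook data) to make that visible; it illustrates the semantics only.

```python
# Reduced copies of the notebook rows: (Country, Code) and (Athlete, Country).
dictionary = [("United States", "USA"), ("China", "CHN"), ("Japan", "JPN")]
# "Mary Roe" is a hypothetical extra row so that USA matches twice.
summer = [("John Doe", "USA"), ("Mary Roe", "USA"), ("Jane Doe", "KEN")]

# Left semi join: keep a left row if at least one right row matches.
summer_codes = {country for _, country in summer}
semi = [d for d in dictionary if d[1] in summer_codes]

# Inner join for comparison: one output row per matching pair.
inner = [d + s for d in dictionary for s in summer if d[1] == s[1]]

print(semi)
print(len(inner))
```

The semi join returns the United States row once, while the inner join returns it once per matching athlete.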

In [10]:
#### Join summer and dictionary using Dataframes
df_dict_semi_summ = df_dictionary.join(df_summer,df_dictionary.Code==df_summer.Country,how='left_semi')
print(df_dict_semi_summ.count())
df_dict_semi_summ.show()

## Join summer and dictionary using SQL
sql_dict_semi_summ = spark.sql('''
  SELECT d.*
  FROM sql_dictionary d LEFT SEMI JOIN sql_summer w
  ON d.Code=w.Country
''')
print(sql_dict_semi_summ.count())
sql_dict_semi_summ.show()

1
+-------------+----+----------+--------------+
|      Country|Code|Population|GDP per Capita|
+-------------+----+----------+--------------+
|United States| USA| 331002651|         59939|
+-------------+----+----------+--------------+

1
+-------------+----+----------+--------------+
|      Country|Code|Population|GDP per Capita|
+-------------+----+----------+--------------+
|United States| USA| 331002651|         59939|
+-------------+----+----------+--------------+



## 5. Left Anti Join
This join operation is the opposite of the left semi join: it returns the rows of the left dataframe that have no match in the right dataframe, and only the left dataframe columns are selected.
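
In plain Python the anti join is the complement of the semi join over the left table. The sketch below uses reduced, hypothetical rows and is meant only to illustrate the semantics:

```python
# Reduced copies of the notebook rows: (Country, Code) and (Athlete, Country).
dictionary = [("United States", "USA"), ("China", "CHN"), ("Japan", "JPN")]
summer = [("John Doe", "USA"), ("Jane Doe", "KEN")]

summer_codes = {country for _, country in summer}

# Left anti join: keep a left row only if no right row matches.
anti = [d for d in dictionary if d[1] not in summer_codes]

# Left semi join, for comparison: keep a left row if some right row matches.
semi = [d for d in dictionary if d[1] in summer_codes]

print(anti)
# Together, semi and anti cover every left row exactly once.
print(sorted(semi + anti) == sorted(dictionary))
```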

In [11]:
#### Join summer and dictionary using Dataframes
df_dict_anti_summ = df_dictionary.join(df_summer,df_dictionary.Code==df_summer.Country,how='left_anti')
print(df_dict_anti_summ.count())
df_dict_anti_summ.show()

## Join summer and dictionary using SQL
sql_dict_anti_summ = spark.sql('''
  SELECT d.*
  FROM sql_dictionary d LEFT ANTI JOIN sql_summer w
  ON d.Code=w.Country
''')
print(sql_dict_anti_summ.count())
sql_dict_anti_summ.show()


2
+-------+----+----------+--------------+
|Country|Code|Population|GDP per Capita|
+-------+----+----------+--------------+
|  China| CHN|1439323776|         10416|
|  Japan| JPN| 126476461|         39434|
+-------+----+----------+--------------+

2
+-------+----+----------+--------------+
|Country|Code|Population|GDP per Capita|
+-------+----+----------+--------------+
|  China| CHN|1439323776|         10416|
|  Japan| JPN| 126476461|         39434|
+-------+----+----------+--------------+

