<a href="https://colab.research.google.com/github/Shivayogi-A/Pyspark_programming/blob/master/Pyspark_Joins.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Spark.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

# create a spark session

from pyspark.sql import SparkSession
Spark = SparkSession.builder\
        .appName("Studentfilter")\
        .getOrCreate()


PySpark SQL joins benefit from the optimizations that DataFrames apply by default, but there are still performance issues to consider when using them. Understanding how to use PySpark joins effectively is essential for comprehensive data analysis, building data pipelines, and deriving insights from large-scale datasets.

In this PySpark SQL join notebook, you will learn the join syntax and apply the different join types to two or more DataFrames using examples.

**PySpark Join Syntax**

* The PySpark SQL join has the syntax below and is called directly on a DataFrame.

In [None]:
first_DF.join(second_DF,first_DF.column_name ==  second_DF.column_name,"JoinType")

**PySpark Join Types**


Below are the different Join Types PySpark supports.

| Join string | Equivalent SQL join |
|---|---|
| inner | INNER JOIN |
| outer, full, fullouter, full_outer | FULL OUTER JOIN |
| left, leftouter, left_outer | LEFT JOIN |
| right, rightouter, right_outer | RIGHT JOIN |
| cross | CROSS JOIN |
| anti, leftanti, left_anti | LEFT ANTI JOIN |
| semi, leftsemi, left_semi | LEFT SEMI JOIN |



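Before diving into the PySpark SQL join illustrations, let’s create the “emp” and “dept” DataFrames. The emp DataFrame contains the “emp_id” column with unique values, while the dept DataFrame contains the “dept_id” column with unique values. Additionally, the “emp_dept_id” column of “emp” refers to the “dept_id” column of the “dept” dataset. Note that “emp_dept_id” holds strings while “dept_id” holds integers; the joins below still match because Spark casts the values when comparing them.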



In [3]:
emp = [(1,"Smith",-1,"2018","10","M",3000),
       (2,"Rose",1,"2010","20","M",4000),
       (3,"Williams",1,"2010","10","M",1000),
       (4,"Jones",2,"2005","10","F",2000),
       (5,"Brown",2,"2010","40","",-1),
       (6,"Brown",2,"2010","50","",-1)
       ]

empColumns = ["emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary"]

empDF = Spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show()

dept = [("Finance",10),
        ("Marketing",20),
        ("Sales",30),
        ("IT",40)
        ]

deptColumns = ["dept_name","dept_id"]

deptDF = Spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show()

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     5|   Brown|              2|       2010|         40|      |    -1|
|     6|   Brown|              2|       2010|         50|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+

**How does a join work?**

PySpark’s join operation combines data from two or more Datasets based on a common column or key. It is a fundamental operation in PySpark and is similar to SQL joins.

**Common Key**: To join two or more datasets, you need a common key, that is, a column on which to join. This key is used to match rows across the datasets.

**Partitioning**: PySpark Datasets are distributed and partitioned across multiple nodes in a cluster. Ideally, data with the same join key should be located in the same partition. If the Datasets are not already partitioned on the join key, PySpark may perform a shuffle operation to redistribute the data, ensuring that rows with the same join key are on the same node. Shuffling can be an expensive operation, especially for large Datasets.

**Join Type Specification**: You specify the type of join (inner, full, left, and so on) through the “how” parameter of the .join() method. This parameter determines which rows are included in or excluded from the resulting Dataset.

**Join Execution**: PySpark performs the join by comparing the values in the common key column between the Datasets.

**Inner Join**: Returns only the rows with matching keys in both DataFrames.


**Left Join**: Returns all rows from the left DataFrame and matching rows from the right DataFrame.


**Right Join**: Returns all rows from the right DataFrame and matching rows from the left DataFrame.


**Full Outer Join**: Returns all rows from both DataFrames, including matching and non-matching rows.


**Left Semi Join**: Returns all rows from the left DataFrame where there is a match in the right DataFrame.


**Left Anti Join**: Returns all rows from the left DataFrame where there is no match in the right DataFrame.
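
The definitions above can be tried out without a cluster. The following is a minimal plain-Python sketch of the row-selection rules only; it ignores partitioning and shuffling, and the names `toy_emp`, `toy_dept`, and `toy_join` are made up for this illustration and are not part of PySpark.

```python
# Simplified rows that mirror the join keys used in this notebook.
toy_emp = [(1, 10), (2, 20), (3, 10), (4, 10), (5, 40), (6, 50)]  # (emp_id, emp_dept_id)
toy_dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40)]  # (dept_name, dept_id)


def toy_join(left, right, how):
    """Illustrate which rows each join type keeps; None stands in for null."""
    # Index the right side by key, as a hash join would.
    right_by_key = {}
    for name, key in right:
        right_by_key.setdefault(key, []).append(name)
    left_keys = {key for _, key in left}

    out = []
    for emp_id, key in left:
        matches = right_by_key.get(key, [])
        if how == "left_semi":
            if matches:                      # keep left row once, left columns only
                out.append((emp_id,))
        elif how == "left_anti":
            if not matches:                  # keep left rows that have no match
                out.append((emp_id,))
        else:
            for name in matches:             # one output row per matching pair
                out.append((emp_id, name))
            if not matches and how in ("left", "outer"):
                out.append((emp_id, None))   # unmatched left row, null right side
    if how in ("right", "outer"):
        for name, key in right:
            if key not in left_keys:
                out.append((None, name))     # unmatched right row, null left side
    return out


for how in ("inner", "left", "right", "outer", "left_semi", "left_anti"):
    print(how, toy_join(toy_emp, toy_dept, how))
```

Employee 6 (key 50) and the Sales department (key 30) are the two unmatched rows, so they are the ones that appear or disappear as the join type changes.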




 **PySpark Inner Join DataFrame**

The default join in PySpark is the inner join, commonly used to retrieve data from two or more DataFrames based on a shared key. An inner join combines two DataFrames on the provided key (common column) and returns only the rows where a match is found. Rows with a non-matching key are dropped from both DataFrames.

In [7]:
in_df = empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "inner")
in_df.show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



This example drops the row with “emp_dept_id” 50 from “emp” and the row with “dept_id” 30 from “dept”, because neither has a match in the other dataset. The output above is the result of the join statement.

 **PySpark Left Outer Join**

The left join, a.k.a. left outer join, returns all rows from the left dataset regardless of whether a match is found in the right dataset. Where the join expression doesn’t match, it assigns null to the right-side columns of that record; records from the right dataset with no match are dropped.

In [9]:
left_df = empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "left")
left_df.show(truncate = False)

# Another way to write the same expression:
# empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "leftouter") \
#     .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

In our dataset, the record with “emp_dept_id” 50 does not have a corresponding entry in the “dept” dataset, resulting in null values in the “dept” columns (dept_name & dept_id). Additionally, the entry with “dept_id” 30 from the “dept” dataset is omitted from the results.

**Right Outer Join**

The right join, a.k.a. right outer join, is the opposite of the left join: it returns all rows from the right dataset regardless of whether a match is found in the left dataset. Where the join expression doesn’t match, it assigns null to the left-side columns of that record; records from the left dataset with no match are dropped.

In [10]:
right_df = empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "right")
right_df.show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In our example, the record with “dept_id” 30 in the right dataset “dept” has no corresponding record in the left dataset “emp”, so the “emp” columns of that row are null. Additionally, the record with “emp_dept_id” 50 is dropped because no match was found in the right dataset. The output above is the result of this join expression.

**Full Outer Join**

The outer join, a.k.a. full or fullouter join, combines the results of the left and right outer joins: it keeps all rows from both DataFrames and fills the columns of the other side with nulls where there is no match. This join type is useful when you want to preserve all the information from both datasets, regardless of whether there’s a match on the key.

In [12]:
full_outer_df = empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "outer")
full_outer_df.show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



This code snippet performs a full outer join between two PySpark DataFrames, empDF and deptDF, based on the condition that emp_dept_id from empDF is equal to dept_id from deptDF.

In our “emp” dataset, the “emp_dept_id” with a value of 50 does not have a corresponding record in the “dept” dataset, resulting in null values in the “dept” columns. Similarly, the “dept_id” 30 does not have a record in the “emp” dataset, hence you observe null values in the “emp” columns.

**Left Semi Join**\
A Left Semi Join in PySpark returns only the rows from the left DataFrame (the first DataFrame mentioned in the join operation) where there is a match with the right DataFrame (the second DataFrame). It does not include any columns from the right DataFrame in the resulting DataFrame. This join type is useful when you only want to filter rows from the left DataFrame based on whether they have a matching key in the right DataFrame.

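A left semi join can also be emulated by taking an inner join and selecting only the columns of the left dataset, provided the join key is unique in the right dataset; otherwise the inner join repeats each left row once per match.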

In [15]:
left_semi_df = empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "left_semi")
left_semi_df.show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+



**Left Anti Join**\
A Left Anti Join in PySpark returns only the rows from the left DataFrame (the first DataFrame mentioned in the join operation) where there is no match with the right DataFrame (the second DataFrame). It excludes any rows from the left DataFrame that have a corresponding key in the right DataFrame. This join type is useful when you want to filter out rows from the left DataFrame that have matching keys in the right DataFrame.

In [16]:
left_anti_df = empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left_anti")
left_anti_df.show(truncate = False)

+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+



**Using SQL Expression**\
Alternatively, you can use a SQL query to join DataFrames/tables in PySpark. To do so, first create temporary views with createOrReplaceTempView(), then call sql() on the Spark session (named Spark in this notebook) to execute the join query.

In [26]:
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# show() returns None, so keep the DataFrame and display it separately.
joinDF = Spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(truncate=False)

joinDF2 = Spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+-

**PySpark SQL Join on multiple DataFrames**\
When you need to join more than two tables, you can either use a SQL expression after creating temporary views on the DataFrames, or chain joins by joining the result of one join operation with another DataFrame. For example:

In [28]:
# Syntax for chaining joins across multiple DataFrames (df1, df2, df3 and their id columns are placeholders)
df1.join(df2,df1.id1 == df2.id2,"inner") \
   .join(df3,df1.id1 == df3.id3,"inner")

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+



**Frequently asked questions on PySpark Joins**\
**What is the default join in PySpark?**\
In PySpark, the default join type of the .join() method is “inner”. If you don’t specify a join type through the “how” parameter, an inner join is performed; pass a different value to “how” to change it.

**Is join expensive in PySpark?**\
Yes, a join in PySpark can be expensive because of the data shuffling (a wide transformation) that happens between partitions in a cluster. The cost depends on the data size, data skew, cluster configuration, the join type, the partitioning, and whether a broadcast join can be used.

**Can we join on multiple columns in PySpark?**\
Yes, we can join on multiple columns. Joining on multiple columns means the rows must match on every key. It can be achieved by passing a list of column names as the join condition to the .join() method, or by combining several equality conditions with &.

**How do I drop duplicate columns after joining PySpark?**\
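distinct() and dropDuplicates() remove duplicate rows, not duplicate columns: distinct() compares all columns, while dropDuplicates() can compare a selected subset of one or more columns. To avoid duplicate columns after a join, pass the join key as a column name (or a list of names) so that it appears only once in the result, or remove the redundant column with drop().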

**What is the difference between the inner join and the left join?**\
The key difference is that an inner join includes only the rows with matching keys in both Datasets, while a left join includes all the rows from the left Dataset and matches them with rows from the right Dataset where there’s a match. Non-matching rows in the left Dataset in a left join are included with null values in the columns from the right Dataset.

**What is the difference between left join and left outer join?**\
Both terms refer to the same type of join operation, and they can be used interchangeably. The “OUTER” keyword is optional when specifying a “LEFT JOIN.”