<a href="https://colab.research.google.com/github/Matteo-Artuso/pyspark_exemple/blob/main/4_spark_data_joining.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 40 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 56.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=0141c44efdf3e0f5ad4b46a891369bbc48d0340b61db52eb201cb064aaebd5b8
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [None]:
# Import the basic spark library
from pyspark.sql import SparkSession

# Create an entry point to the PySpark Application
spark = SparkSession.builder \
      .master("local") \
      .appName("MyFirstSparkApplication") \
      .getOrCreate()

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType

#Createe the schema using StructField(Name, Type, Nullable)
schema = StructType([ \
    StructField("Pizza Name", StringType(), True), \
    StructField("Price", FloatType(), True), \
    StructField("Ingredients_ID", StringType(), True) \
])

df_data = [("Margherita", 5.95, "IG_1"),
        ("Calzone", 7.95, "IG_2"),
        ("Diavola", 5.95, "IG_3"),
        ("Prosciutto", 7.95, "IG_4"),
        ("Speck & Brie", 7.95, "IG_7"),
        ("Tonno & Cipolle", 7.95, "IG_8")]

df = spark.createDataFrame(data = df_data, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- Pizza Name: string (nullable = true)
 |-- Price: float (nullable = true)
 |-- Ingredients_ID: string (nullable = true)

+---------------+-----+--------------+
|Pizza Name     |Price|Ingredients_ID|
+---------------+-----+--------------+
|Margherita     |5.95 |IG_1          |
|Calzone        |7.95 |IG_2          |
|Diavola        |5.95 |IG_3          |
|Prosciutto     |7.95 |IG_4          |
|Speck & Brie   |7.95 |IG_7          |
|Tonno & Cipolle|7.95 |IG_8          |
+---------------+-----+--------------+



In [None]:
ingredient_schema = StructType([ \
    StructField("Ingredients_ID", StringType(), True), \
    StructField("Ingredients", ArrayType(StringType()), True) \
])

ingredient_df_data = [("IG_1", ["Tomato Sauce", "Mozzarella Cheese", "Basil"]),
                    ("IG_2", ["Tomato Sauce", "Mozzarella Cheese", "Prosciutto Cotto"]),
                    ("IG_3", ["Tomato Sauce", "Mozzarella Cheese", "Spicy Salame"]),
                    ("IG_4", ["Tomato Sauce", "Mozzarella Cheese", "Prosciutto Cotto"]),
                    ("IG_5", ["Tomato Sauce", "Mozzarella Cheese", "Speck", "Brie"]),
                    ("IG_6", ["Tomato Sauce", "Mozzarella Cheese", "Tuna", "Onions"])]

ingredient_df = spark.createDataFrame(data = ingredient_df_data, schema = ingredient_schema)
ingredient_df.printSchema()
ingredient_df.show(truncate=False)

root
 |-- Ingredients_ID: string (nullable = true)
 |-- Ingredients: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------+---------------------------------------------------+
|Ingredients_ID|Ingredients                                        |
+--------------+---------------------------------------------------+
|IG_1          |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|IG_2          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|IG_3          |[Tomato Sauce, Mozzarella Cheese, Spicy Salame]    |
|IG_4          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|IG_5          |[Tomato Sauce, Mozzarella Cheese, Speck, Brie]     |
|IG_6          |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions]    |
+--------------+---------------------------------------------------+



<h4>Join Operations</h4>

In [None]:
# Inner join - returns the tuples that matched in both tables
df.join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "inner") \
     .show(truncate=False)

+----------+-----+--------------+--------------+---------------------------------------------------+
|Pizza Name|Price|Ingredients_ID|Ingredients_ID|Ingredients                                        |
+----------+-----+--------------+--------------+---------------------------------------------------+
|Margherita|5.95 |IG_1          |IG_1          |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|Calzone   |7.95 |IG_2          |IG_2          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Diavola   |5.95 |IG_3          |IG_3          |[Tomato Sauce, Mozzarella Cheese, Spicy Salame]    |
|Prosciutto|7.95 |IG_4          |IG_4          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
+----------+-----+--------------+--------------+---------------------------------------------------+



In [None]:
# Outer join - returns all the tuples from both tables, if no matches are found, the tuples are returned with null values
df.join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "outer") \
     .show(truncate=False)

# Outer join
df.join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "full") \
     .show(truncate=False)

# Outer join
df.join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "fullouter") \
     .show(truncate=False)

+---------------+-----+--------------+--------------+---------------------------------------------------+
|Pizza Name     |Price|Ingredients_ID|Ingredients_ID|Ingredients                                        |
+---------------+-----+--------------+--------------+---------------------------------------------------+
|Margherita     |5.95 |IG_1          |IG_1          |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|Calzone        |7.95 |IG_2          |IG_2          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Diavola        |5.95 |IG_3          |IG_3          |[Tomato Sauce, Mozzarella Cheese, Spicy Salame]    |
|Prosciutto     |7.95 |IG_4          |IG_4          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|null           |null |null          |IG_5          |[Tomato Sauce, Mozzarella Cheese, Speck, Brie]     |
|null           |null |null          |IG_6          |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions]    |
|Speck & Brie   |7.95 |IG_7          |null    

In [None]:
# Left join
df.join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "left") \
     .show(truncate=False)

# Left Outer join
df.join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "leftouter") \
     .show(truncate=False)

+---------------+-----+--------------+--------------+---------------------------------------------------+
|Pizza Name     |Price|Ingredients_ID|Ingredients_ID|Ingredients                                        |
+---------------+-----+--------------+--------------+---------------------------------------------------+
|Margherita     |5.95 |IG_1          |IG_1          |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|Prosciutto     |7.95 |IG_4          |IG_4          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Calzone        |7.95 |IG_2          |IG_2          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Diavola        |5.95 |IG_3          |IG_3          |[Tomato Sauce, Mozzarella Cheese, Spicy Salame]    |
|Tonno & Cipolle|7.95 |IG_8          |null          |null                                               |
|Speck & Brie   |7.95 |IG_7          |null          |null                                               |
+---------------+-----+--------------+--------

In [None]:
# Right join
df.join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "right") \
     .show(truncate=False)

# Right Outer join
df.join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "rightouter") \
     .show(truncate=False)

+----------+-----+--------------+--------------+---------------------------------------------------+
|Pizza Name|Price|Ingredients_ID|Ingredients_ID|Ingredients                                        |
+----------+-----+--------------+--------------+---------------------------------------------------+
|Margherita|5.95 |IG_1          |IG_1          |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|Prosciutto|7.95 |IG_4          |IG_4          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|null      |null |null          |IG_5          |[Tomato Sauce, Mozzarella Cheese, Speck, Brie]     |
|Calzone   |7.95 |IG_2          |IG_2          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Diavola   |5.95 |IG_3          |IG_3          |[Tomato Sauce, Mozzarella Cheese, Spicy Salame]    |
|null      |null |null          |IG_6          |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions]    |
+----------+-----+--------------+--------------+-------------------------------------------

In [None]:
# Left Semi join (i.e., left inner join)
df.join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "leftsemi") \
     .show(truncate=False)

+----------+-----+--------------+
|Pizza Name|Price|Ingredients_ID|
+----------+-----+--------------+
|Margherita|5.95 |IG_1          |
|Calzone   |7.95 |IG_2          |
|Diavola   |5.95 |IG_3          |
|Prosciutto|7.95 |IG_4          |
+----------+-----+--------------+



In [None]:
# Left Anti join - returns all the tuples without a match in the other table
df.join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "leftanti") \
     .show(truncate=False)

+---------------+-----+--------------+
|Pizza Name     |Price|Ingredients_ID|
+---------------+-----+--------------+
|Tonno & Cipolle|7.95 |IG_8          |
|Speck & Brie   |7.95 |IG_7          |
+---------------+-----+--------------+



In [None]:
# Self join
df.join(df, df.Ingredients_ID == df.Ingredients_ID, "inner") \
     .show(truncate=False)

+---------------+-----+--------------+---------------+-----+--------------+
|Pizza Name     |Price|Ingredients_ID|Pizza Name     |Price|Ingredients_ID|
+---------------+-----+--------------+---------------+-----+--------------+
|Margherita     |5.95 |IG_1          |Margherita     |5.95 |IG_1          |
|Calzone        |7.95 |IG_2          |Calzone        |7.95 |IG_2          |
|Diavola        |5.95 |IG_3          |Diavola        |5.95 |IG_3          |
|Prosciutto     |7.95 |IG_4          |Prosciutto     |7.95 |IG_4          |
|Speck & Brie   |7.95 |IG_7          |Speck & Brie   |7.95 |IG_7          |
|Tonno & Cipolle|7.95 |IG_8          |Tonno & Cipolle|7.95 |IG_8          |
+---------------+-----+--------------+---------------+-----+--------------+



In [None]:
# It is also possible to concatenate multiple joins one after another
df.join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "inner") \
  .drop(ingredient_df.Ingredients_ID) \
  .join(ingredient_df, df.Ingredients_ID == ingredient_df.Ingredients_ID, "right") \
  .show(truncate=False)

+----------+-----+--------------+---------------------------------------------------+--------------+---------------------------------------------------+
|Pizza Name|Price|Ingredients_ID|Ingredients                                        |Ingredients_ID|Ingredients                                        |
+----------+-----+--------------+---------------------------------------------------+--------------+---------------------------------------------------+
|Margherita|5.95 |IG_1          |[Tomato Sauce, Mozzarella Cheese, Basil]           |IG_1          |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|Prosciutto|7.95 |IG_4          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|IG_4          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|null      |null |null          |null                                               |IG_5          |[Tomato Sauce, Mozzarella Cheese, Speck, Brie]     |
|Calzone   |7.95 |IG_2          |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cott