### Joins

In [0]:
from pyspark.sql import functions as F, types as T

# Sample customers as (customer_id, name, country, vip) tuples.
rows_customers = [
    (1, "Asha", "IN", True),
    (2, "Bob", "US", False),
    (3, "Chen", "CN", True),
    # vip is NULL for this customer.
    (4, "Diana", "US", None),
    # NULL customer_id: used to demonstrate how joins treat a NULL key.
    (None, "Ghost", "UK", False),
]

# Sample orders as (order_id, customer_id, amount, country) tuples.
rows_orders = [
    (101, 1, 120.0, "IN"),
    (102, 1, 80.0, "IN"),
    (103, 2, 50.0, "US"),
    # customer_id 5 has no matching customer row.
    (104, 5, 30.0, "DE"),
    (105, 3, 200.0, "CN"),
    # NULL customer_id: a NULL key won't match anything in a join.
    (106, None, 15.0, "UK"),
    (107, 3, 40.0, "CN"),
    (108, 2, 75.0, "US"),
]

# Explicit schema for the customers table; every column is nullable.
schema_customers = (
    T.StructType()
    .add("customer_id", T.IntegerType(), nullable=True)
    .add("name", T.StringType(), nullable=True)
    .add("country", T.StringType(), nullable=True)
    .add("vip", T.BooleanType(), nullable=True)
)

# Explicit schema for the orders table; every column is nullable.
# "country" intentionally shares its name with the customers column so that
# joins between the two tables show a column-name collision.
schema_orders = (
    T.StructType()
    .add("order_id", T.IntegerType(), nullable=True)
    .add("customer_id", T.IntegerType(), nullable=True)
    .add("amount", T.DoubleType(), nullable=True)
    .add("country", T.StringType(), nullable=True)
)

# Build both sample tables as Spark DataFrames using the explicit schemas.
df_customers = spark.createDataFrame(data=rows_customers, schema=schema_customers)
df_orders = spark.createDataFrame(data=rows_orders, schema=schema_orders)

# Show the raw inputs before any join is applied.
display(df_customers)
display(df_orders)

#### Inner Join

In [0]:
# Inner join of orders (left) with customers (right) on customer_id.
inner_join = df_orders.join(
    df_customers,
    on="customer_id",
    how="inner",
)

In [0]:
# Render the inner-join result.
display(inner_join)

#### Left Join

In [0]:
# Left join of orders (left) with customers (right) on customer_id.
left_join = df_orders.join(
    df_customers,
    on="customer_id",
    how="left",
)

In [0]:
# Render the left-join result.
display(left_join)

In [0]:
# Alias the orders DataFrame as "o" so its columns can be referenced as "o.<column>".
o = df_orders.alias("o")
display(o)

In [0]:
# Alias the customers DataFrame as "c" so its columns can be referenced as "c.<column>".
c = df_customers.alias("c")

In [0]:
# Render the aliased customers DataFrame.
display(c)

In [0]:
# Inner join of the aliased DataFrames on customer_id.
# No columns are selected here, so both "country" columns are kept.
df_alias_inner_join = o.join(
    c,
    on="customer_id",
    how="inner",
)
display(df_alias_inner_join)

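- Remove the ambiguity: the `country` column appears twice (once from each table), so select each one through its DataFrame alias and give it a distinct name.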

In [0]:
from pyspark.sql.functions import col

# Step 1: inner join of the aliased orders ("o") and customers ("c") on customer_id.
orders_with_customers = o.join(c, on="customer_id", how="inner")

# Step 2: project the columns, resolving the duplicated "country" column through
# the DataFrame aliases and renaming each side so the output names are unique.
df_inner_clean = orders_with_customers.select(
    "order_id",
    "customer_id",
    "amount",
    col("o.country").alias("order_country"),
    "name",
    col("c.country").alias("customer_country"),
    "vip",
)

In [0]:
# Render the de-duplicated inner-join result.
display(df_inner_clean)

#### Full Join

In [0]:
# Full join of orders ("o") and customers ("c") on customer_id.
full_join = o.join(
    c,
    on="customer_id",
    how="full",
)
display(full_join)

#### Left Semi Join

In [0]:
# Left semi join: returns only the rows from the left table (orders) that
# have a match in the right table (customers).
left_semi_join = o.join(
    c,
    on="customer_id",
    how="left_semi",
)
display(left_semi_join)

#### Left Anti Join

In [0]:
# Left anti join: returns only the rows from the left table (orders) that
# do not have a match in the right table (customers).
left_anti_join = o.join(
    c,
    on="customer_id",
    how="left_anti",
)
display(left_anti_join)

#### Multi Key Join

In [0]:
# Join on multiple columns: rows must agree on both customer_id and country.
join_keys = ["customer_id", "country"]
df_multi = o.join(c, on=join_keys, how="inner")
display(df_multi)