<a href="https://colab.research.google.com/github/arulrajgopal-zerotoone/zero_to_one_spark/blob/main/apache_spark/13_joins.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!pip install pyspark

from pyspark.sql import SparkSession

# Build the notebook's SparkSession, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('mysparksession')
    .getOrCreate()
)

# Keep a handle to the session's underlying SparkContext.
sc = spark.sparkContext

In [0]:
from pyspark.sql.functions import col,broadcast
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


In [0]:
# Employee rows, one tuple per employee, in the column order given by empColumns.
# NOTE: emp_dept_id values are strings ("10") while dept_id values below are
# ints (10); the join cells compare the two columns as-is.
emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]
empColumns = [
    "emp_id", "name", "superior_emp_id", "year_joined",
    "emp_dept_id", "gender", "salary",
]
empDF = spark.createDataFrame(emp, empColumns)

# Department rows: (dept_name, dept_id). Department 30 has no employee above,
# and employee 6 points at department "50", which is not listed here.
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(dept, deptColumns)

# Print both schemas first, then both tables (employees before departments).
for _df in (empDF, deptDF):
    _df.printSchema()
for _df in (empDF, deptDF):
    _df.show(truncate=False)

## Spark join types (accepted values of the `how` argument)
1. inner
2. outer, full, fullouter, full_outer
3. left, leftouter, left_outer
4. right, rightouter, right_outer
5. anti, leftanti, left_anti
6. semi, leftsemi, left_semi

#inner

In [0]:
# Inner join: keep only the employee/department pairs whose keys match.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="inner")
    .show(truncate=False)
)

#outer

In [0]:
# Full outer join: keep every row from both sides, with nulls where there is no match.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="outer")
    .show(truncate=False)
)


#right

In [0]:
# Right join: keep every department, with null employee columns where none match.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="right")
    .show(truncate=False)
)

#left

In [0]:
# Left join: keep every employee, with null department columns where none match.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="left")
    .show(truncate=False)
)

#left anti & left semi

In [0]:
# Left anti join written by hand: left-join, keep only the employees that found
# no department (dept_id is null after the join), then drop the department columns.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="left")
    .where(col("dept_id").isNull())
    .drop("dept_name", "dept_id")
    .show(truncate=False)
)

In [0]:
# Left anti join: employees with NO matching department; only employee columns are returned.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="leftanti")
    .show(truncate=False)
)

In [0]:
# Left semi join: employees that DO have a matching department; only employee columns are returned.
(
    empDF
    .join(deptDF, on=empDF["emp_dept_id"] == deptDF["dept_id"], how="leftsemi")
    .show(truncate=False)
)

#join with multiple keys

In [0]:
# Left-hand rows: (sr_no, section, name, fav_game).
record_1 = [1, 'A', 'arul', 'cricket']
record_2 = [2, 'A', 'sekar', 'chess']
record_3 = [3, 'A', 'kumar', 'tennis']
record_4 = [1, 'B', 'ganesh', 'football']
record_5 = [2, 'B', 'vinoth', 'volleyball']
record_6 = [3, 'B', 'Ravi', 'hockey']

# Right-hand rows: (sr_no, section, profession).
# These are numbered from 7 so they no longer overwrite record_6 above; the
# name was previously reused, which silently dropped the (3, 'B') row from
# the left-hand DataFrame.
record_7 = [1, 'A', 'Engineer']
record_8 = [2, 'A', 'doctor']
record_9 = [2, 'B', 'lawyer']

list1 = [record_1, record_2, record_3, record_4, record_5, record_6]
list2 = [record_7, record_8, record_9]

# (sr_no, section) together form the join key, so both are declared non-nullable.
df_schema = StructType(fields=[
    StructField("sr_no", IntegerType(), False),
    StructField("section", StringType(), False),
    StructField("name", StringType(), True),
    StructField("fav_game", StringType(), True),
])

df_2_schema = StructType(fields=[
    StructField("sr_no", IntegerType(), False),
    StructField("section", StringType(), False),
    StructField("profession", StringType(), True),
])

df = spark.createDataFrame(list1, df_schema)
df_2 = spark.createDataFrame(list2, df_2_schema)
df.show()
df_2.show()

In [0]:
# Left join on the composite key (sr_no, section). Both inputs share those
# column names, so each side is aliased and the columns are referenced through
# the alias. Only the right side's profession column is kept.
joined_df = (
    df.alias('LH')
    .join(
        df_2.alias('RH'),
        on=(col('LH.sr_no') == col('RH.sr_no'))
        & (col('LH.section') == col('RH.section')),
        how='left',
    )
    .select('LH.*', 'RH.profession')
)

joined_df.show()
# Extended explain: prints the logical plans as well as the physical plan.
joined_df.explain(True)


#broadcast join
Note: neither DataFrame here is actually large, so an explicit broadcast hint is not needed; it is used purely for demonstration.

In [0]:
# Stand-ins for a "large" fact table and a "small" lookup table.
large_df = df
small_df = df_2

# Same composite-key left join as before, but the right side is wrapped in
# broadcast() to hint that it should be broadcast for the join.
result_df = large_df.alias("LH").join(
    broadcast(small_df.alias("RH")),
    on=(col('LH.sr_no') == col('RH.sr_no'))
    & (col('LH.section') == col('RH.section')),
    how="left",
)
result_df.show()

# Extended explain, to inspect which join strategy the plan uses.
result_df.explain(True)


# Show the threshold currently in effect. The value is printed explicitly
# because a bare expression in the middle of a cell/script is evaluated and
# then discarded, so the "before" value would otherwise never be displayed.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))


# To disable autoBroadcastJoin >> set -1
# By default it is 10485760 i.e. 10MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Show the value again to confirm the change took effect.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))