In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, DateType
from pyspark.sql.functions import col, broadcast, shuffle
from pyspark.conf import SparkConf

# Build the configuration from a fresh SparkConf rather than from an existing
# `spark` object: on a clean kernel (Restart & Run All) `spark` is not defined
# yet, so reading `spark.sparkContext._conf` here would raise NameError.
#   - autoBroadcastJoinThreshold: size limit in bytes (10485760 = 10 MB) under
#     which a join side is broadcast automatically.
#   - preferSortMergeJoin=false: used by the Shuffle Hash Join notes in this notebook.
conf = SparkConf().setAll([
    ('spark.sql.autoBroadcastJoinThreshold', '10485760'),
    ('spark.sql.join.preferSortMergeJoin', 'false'),
])
# getOrCreate() reuses a running session if the environment already provides one;
# the options above are applied to it in that case.
spark = SparkSession.builder.appName("Spark SQL Join Practice").config(conf=conf).getOrCreate()
# spark.sparkContext.getConf().getAll()


In [None]:
# Employee (players and managers) schema: (column name, Spark type, nullable).
# manager_id uses -1 as the "no manager" marker in the rows below.
emp_df_schema = StructType([
    StructField(column_name, column_type, is_nullable)
    for column_name, column_type, is_nullable in (
        ('id', IntegerType(), False),
        ('name', StringType(), False),
        ('join_year', IntegerType(), False),
        ('club_id', IntegerType(), True),
        ('salary', IntegerType(), True),
        ('manager_id', IntegerType(), False),
    )
])

# Rows follow the schema order: id, name, join_year, club_id, salary, manager_id.
emp_df_data = [
    (1, 'Mourinho',  2010, 1001, 20000, -1),
    (2, 'Guardiola', 2004, 1002, 20000, -1),
    (3, 'Klopp',     2015, 1003, 30000, -1),
    (4, 'Salah',     2017, 1003, 40000,  3),
    (5, 'Messi',     2000, 1002, 50000,  1),
    (6, 'Ronaldo',   2009, 1001, 50000,  2),
    (7, 'Zlatan',    1996, 1012, 50000, -1),
    (8, 'Pele',      2009, 1021, 50000, -1),
]

emp_df = spark.createDataFrame(emp_df_data, emp_df_schema)
# Uncomment to inspect the frame:
# emp_df.printSchema()
# emp_df.show(truncate=False)

# Expose the employees to Spark SQL under the name used by the queries below.
emp_df.createOrReplaceTempView('EMPLOYEE_VW')

# Club schema: every column is mandatory (nullable=False).
club_df_schema = StructType([
    StructField(column_name, column_type, False)
    for column_name, column_type in (
        ('club_id', IntegerType()),
        ('club_name', StringType()),
        ('club_country', StringType()),
        ('club_continent', StringType()),
    )
])

# Rows follow the schema order: club_id, club_name, club_country, club_continent.
club_df_data = [
    (1001, 'RealMadrid',       'Spain',   'Europe'),
    (1002, 'Barcelona',        'Spain',   'Europe'),
    (1003, 'Liverpool',        'UK',      'Europe'),
    (1004, 'Dortmund',         'Germany', 'Europe'),
    (1005, 'Manchester City',  'UK',      'Europe'),
    (1006, 'Newcastle United', 'UK',      'Europe'),
    (1007, 'Bocca Juniors',    'Brazil',  'South America'),
]

club_df = spark.createDataFrame(club_df_data, club_df_schema)
# Uncomment to inspect the frame:
# club_df.printSchema()
# club_df.show(truncate=False)

# Expose the clubs to Spark SQL under the name used by the queries below.
club_df.createOrReplaceTempView('CLUB_VW')

# Each demo runs the same join twice: first through Spark SQL on the temp views,
# then through the DataFrame API. The DataFrame joins all share this predicate.
on_club_id = emp_df['club_id'] == club_df['club_id']

# INNER JOIN: only employees whose club_id exists in the club table.
print(' Players/Managers matching with Clubs ')
spark.sql("Select E.name, C.club_name FROM EMPLOYEE_VW E JOIN CLUB_VW C ON E.club_id = C.club_id;").show()
inner_joined = emp_df.join(club_df, on_club_id, 'inner')
inner_joined.select('name', 'club_name').show()

# FULL OUTER JOIN: every employee and every club, matched where possible.
print(' All available Players/Managers and All available Clubs ')
spark.sql("Select E.name, C.club_name FROM EMPLOYEE_VW E FULL OUTER JOIN CLUB_VW C ON E.club_id = C.club_id;").show()
full_joined = emp_df.join(club_df, on_club_id, 'outer')
full_joined.select('name', 'club_name').show()

# LEFT OUTER JOIN: every employee is kept, with or without a matching club.
print(' All Players/Managers irrespective of Club ')
spark.sql("Select distinct E.name FROM EMPLOYEE_VW E LEFT OUTER JOIN CLUB_VW C ON E.club_id = C.club_id;").show()
left_joined = emp_df.join(club_df, on_club_id, 'left')
left_joined.select('name').distinct().show()

# RIGHT OUTER JOIN: every club is kept, with or without matching employees.
print(' All Clubs irrespective of Players/Managers ')
spark.sql("Select distinct C.club_name FROM EMPLOYEE_VW E RIGHT OUTER JOIN CLUB_VW C ON E.club_id = C.club_id;").show()
right_joined = emp_df.join(club_df, on_club_id, 'right')
right_joined.select('club_name').distinct().show()

# LEFT SEMI JOIN: employees that have a matching club; only employee columns are returned.
print(' Players/Managers matching with Clubs, but dont care about Club Details ')
spark.sql("Select * FROM EMPLOYEE_VW E LEFT SEMI JOIN CLUB_VW C ON E.club_id = C.club_id;").show()
emp_df.join(club_df, on_club_id, 'leftsemi').show()

# LEFT ANTI JOIN: employees that have no matching club; only employee columns are returned.
print(' Players/Managers NOT matching with any Clubs, but dont care about Club Details ')
spark.sql("Select * FROM EMPLOYEE_VW E LEFT ANTI JOIN CLUB_VW C ON E.club_id = C.club_id;").show()
emp_df.join(club_df, on_club_id, 'leftanti').show()


# CROSS JOIN: every employee paired with every club (cartesian product).
print(' How many different ways a Players/Manager can match to Club.')
spark.sql("Select distinct E.name, C.club_name FROM EMPLOYEE_VW E CROSS JOIN CLUB_VW C;").show(100)
# Use the explicit crossJoin(): a join() without a condition is an *implicit*
# cartesian product, which Spark rejects with an AnalysisException when
# spark.sql.crossJoin.enabled is false. Sorting on both columns keeps the
# printed order deterministic between runs.
(
    emp_df
    .crossJoin(club_df)
    .select('name', 'club_name')
    .distinct()
    .orderBy(col('name'), col('club_name'))
    .show(100)
)

# SELF JOIN: pair each employee with the employee whose id equals their manager_id.
# Rows with manager_id = -1 match nobody and drop out of the inner join.
print(' Players reporting to Manager ')
spark.sql("Select distinct E.name as Player, M.name as Manager FROM EMPLOYEE_VW E JOIN EMPLOYEE_VW M ON E.manager_id = M.id;").show()
# Project to (Player, Manager) first and de-duplicate afterwards so the result
# matches the SQL "SELECT DISTINCT Player, Manager". Calling distinct() before
# select() would de-duplicate on all joined columns instead, leaving repeated
# (Player, Manager) pairs whenever two different rows share the same names.
(
    emp_df.alias('p')
    .join(emp_df.alias('m'), col('p.manager_id') == col('m.id'), 'inner')
    .select(col('p.name').alias('Player'), col('m.name').alias('Manager'))
    .distinct()
    .show()
)



[Spark Joins](https://blog.clairvoyantsoft.com/apache-spark-join-strategies-e4ebc7624b06)

[Shuffle Hash Join](https://www.hadoopinrealworld.com/how-does-shuffle-hash-join-work-in-spark/)
[Shuffle Sort Merge Join](https://www.hadoopinrealworld.com/how-does-shuffle-sort-merge-join-work-in-spark/)


1. Broadcast Hash Join (BHJ)  
    If one of the TWO joined tables is small enough to fit in memory, it is collect()-ed and broadcast to all EXECUTORS.
    To disable: spark.sql.autoBroadcastJoinThreshold=-1
    Max allowable size: spark.sql.autoBroadcastJoinThreshold=10485760 (bytes; default 10MB)
    Note:
        When a BROADCAST hint is given, the hinted side of the join is broadcast (autoBroadcastJoinThreshold is ignored).
        Without a hint, consider it only when you know one of the tables is smaller than autoBroadcastJoinThreshold.
    
    case#1  
           LEFT (LARGE)       VS         RIGHT (SMALL)  
        Partition#1                   Partition#1      
        {barcelona, spain}            {2, Guardiola}
        {barcelona, spain}            {3, Klopp}  
        {liverpool, uk}               
        {liverpool, uk}
        Partition#2
        {Dortmund, Germany}            
        {Bocca Juniors, Brazil}       
        {Manchester city, uk}          
        {NewCastle, uk}       
        
        
        
      After Broadcasting:
          LEFT             VS         RIGHT   
        Partition#1                   Partition#1      
        {barcelona, spain}            {2, Guardiola}
        {barcelona, spain}    ->      {3, Klopp}  
        {liverpool, uk}               
        {liverpool, uk}
        Partition#2                   Partition#1 (broadcast copy)  
        {Dortmund, Germany}            {2, Guardiola}
        {Bocca Juniors, Brazil}  ->    {3, Klopp}
        {Manchester city, uk}          
        {NewCastle, uk} 
              
       
      After Hash Joining (Smallest table will be hashed and Large table will use it for lookup): 
       Let's say for INNER Join - Bucket#1 stores {Guardiola} & Bucket#2 stores {Klopp}  
        for(CLUB in LEFT SIDE) {
           // Get club Hash and search in Bucket#1 or Bucket#2
        }
        
        
    
        
     case#2
          LEFT (SMALL)     VS         RIGHT (LARGE)   
        Partition#1                  Partition#1
        {realmadrid, spain}           {2, Guardiola}
        {NewCastle, uk}               {3, Klopp}  
        {liverpool, uk}               {5, Messi}
                                      {4, Salah}
                                      Partition#2
                                      {1, Mourinho}
                                      {6, Ronaldo}
                                      {7, Zlatan}
                                      {8, Pele}
    
        
          
      After Broadcasting:
          LEFT             VS         RIGHT   
        Partition#1                   Partition#1      
        {realmadrid, spain}            {2, Guardiola}
        {NewCastle, uk}    <--         {3, Klopp}  
        {liverpool, uk}                 {5, Messi}
                                        {4, Salah}
        Partition#1 (broadcast copy)  Partition#2  
        {realmadrid, spain}            {1, Mourinho}
        {NewCastle, uk}     <--        {6, Ronaldo}
        {liverpool, uk}                {7, Zlatan}
                                       {8, Pele}
              
       
      After Hash Joining (Smallest table will be hashed and Large table will use it for lookup): 
       Let's say for INNER Join - Bucket#1 stores {spain} & Bucket#2 stores {uk}  
        for(Player in RIGHT SIDE) {
           // Get Player Hash and search in Bucket#1 or Bucket#2
        }
        
        
2. Shuffle Hash Join (SHJ)
    Large Tables Join; COSTLY; spark.sql.join.preferSortMergeJoin=FALSE
    Only For EQUI-Joins
    Shuffle (partition both sides on the join column so that rows with the same CLUB_ID move into the same partition)     
    Partition#1
            LEFT             VS          RIGHT   
        {barcelona, spain}            {2, Guardiola}
        {barcelona, spain}            {5, Messi}  
        {liverpool, uk}               {4, Salah}
        {liverpool, uk}               {3, Klopp}
        {Dortmund, Germany}            
        {Bocca Juniors, Brazil}       
        {Manchester city, uk}          
        {NewCastle, uk}
        
    Partition#2
            LEFT             VS          RIGHT   
        {realmadrid, spain}            {1, Mourinho}
        {realmadrid, spain}            {6, Ronaldo}  
                                       {7, Zlatan}
                                       {8, Pele}   

    Hash (within each partition, build an in-memory hash table from the smaller side on a single node, then probe it with the other side)
    Partition#1
        RIGHT side is smaller, so build the hash buckets from it as a lookup Map<CLUB_ID, PLAYER>
        Let's say for INNER Join - Bucket#1 stores {Messi,Guardiola} & Bucket#2 stores {Salah, Klopp}  
        for(Club in LEFT SIDE) {
           // Get Club hash and search in Bucket#1 or Bucket#2
        }

    Partition#2
        LEFT side is smaller, so build the hash buckets from it as a lookup Map<CLUB_ID, CLUB>
        E.g: Only one Bucket#1 that has {realmadrid}
        for(Player in RIGHT SIDE) {
           // Get Player's club hash and search in Club Bucket#1
        }


3. Shuffle Sort Merge Join (SMJ) - Both Large Tables
    spark.sql.join.preferSortMergeJoin=true + spark.sql.autoBroadcastJoinThreshold=-1
    Shuffle - same as above
    Sort - Sort the data of each individual partition on the join key, on both sides.
    Merge - Walk the sorted LEFT and RIGHT sides together: emit rows whose keys match and advance the side with the smaller key, until one side is exhausted.


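4. Shuffle Replicate Nested Loop (Cartesian Product Join)
    Hint: SHUFFLE_REPLICATE_NL. Every LEFT partition is paired with every RIGHT partition and the rows are compared in a nested loop.
    Works without an equi-join key (e.g. CROSS JOIN), but only for inner-like joins; usually the most expensive strategy.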
    
    

In [None]:
print('Spark Optimized Join based on Join Type and HINTS')
# spark.conf.get("spark.sql.join.preferSortMergeJoin")
# spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
# spark.sql("Select /*+ BROADCAST(C) */ E.name, C.club_name FROM EMPLOYEE_VW E JOIN CLUB_VW C ON E.club_id = C.club_id;").explain(True)
# spark.sql("Select /*+ SHUFFLE_HASH(C) */ E.name, C.club_name FROM EMPLOYEE_VW E JOIN CLUB_VW C ON E.club_id = C.club_id;").explain(True)
# spark.sql("Select /*+ SHUFFLE_MERGE(C) */ E.name, C.club_name FROM EMPLOYEE_VW E JOIN CLUB_VW C ON E.club_id = C.club_id;").explain(True)
# spark.sql("Select /*+ SHUFFLE_REPLICATE_NL(C) */ E.name, C.club_name FROM EMPLOYEE_VW E JOIN CLUB_VW C ON E.club_id = C.club_id;").explain(True)
# spark.sql("Select /*+ BROADCAST(C),SHUFFLE_MERGE(C),SHUFFLE_HASH(C), SHUFFLE_REPLICATE_NL(C)  */ E.name, C.club_name FROM EMPLOYEE_VW E JOIN CLUB_VW C ON E.club_id = C.club_id;").explain(True)

# DataFrame-API counterparts of the SQL hints above: one explain() per join
# strategy, so the four plans can be compared side by side.
# hint('broadcast') on the club side is equivalent to wrapping it in broadcast().
# NOTE(review): the non-broadcast strategy hints require Spark 3.0+ — confirm the cluster version.
for join_hint in ('broadcast', 'shuffle_hash', 'shuffle_merge', 'shuffle_replicate_nl'):
    print(' Join strategy hint: ' + join_hint.upper())
    (
        emp_df
        .join(club_df.hint(join_hint), emp_df['club_id'] == club_df['club_id'], 'inner')
        .select('name', 'club_name')
        .explain(True)
    )
