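# Left join in Spark

How Spark plans a `LEFT JOIN` between a fact table and a small dimension table when automatic broadcast joins are disabled (`spark.sql.autoBroadcastJoinThreshold = 0`).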



In [1]:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F

# Spark configuration for this experiment.
# An auto-broadcast threshold of 0 disables broadcast hash joins, so the
# planner must fall back to a shuffle-based join for the left join below.
conf = SparkConf()
conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
# Optional toggle: uncomment to explicitly prefer sort-merge join.
# conf.set("spark.sql.join.preferSortMergeJoin", "true")

# Build the session through the builder. SparkContext.getOrCreate(conf=...)
# ignores `conf` when a context already exists (e.g. when this cell is re-run),
# whereas builder.getOrCreate() also applies these options to an existing session.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext


In [2]:
import numpy as np
import pandas as pd

cats = ["a", "b", "c"]
defs = [f"The letter {e}" for e in cats]


num_rows = 1000
category = np.random.choice(cats,num_rows, p=[0.1, 0.3, 0.6])
values = np.random.randint(1, 100, num_rows)
pd_df_fact = pd.DataFrame({"cat": category, "val": values})

pd_df_fact.to_parquet("df_fact.parquet")

pd_df_dim = pd.DataFrame({"cat": cats, "def": defs})
pd_df_dim.to_parquet("df_dim.parquet")


  result = infer_dtype(pandas_collection)


In [7]:
# Display the small dimension table (one row per category) that is joined in Spark below.
pd_df_dim

Unnamed: 0,cat,def
0,a,The letter a
1,b,The letter b
2,c,The letter c


In [3]:
# Load the fact table written by pandas into a Spark DataFrame.
df_fact = spark.read.format("parquet").load("df_fact.parquet")

In [4]:
# Load the dimension table written by pandas into a Spark DataFrame.
df_dim = spark.read.format("parquet").load("df_dim.parquet")

In [5]:
# Expose both DataFrames to Spark SQL under the names used in the query.
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
df_fact.createOrReplaceTempView("df_fact")
df_dim.createOrReplaceTempView("df_dim")

# Left join: every fact row is kept; `def` is NULL when its category has no match in df_dim.
sql = """
select df_fact.cat, df_fact.val,  df_dim.def from
df_fact
left join df_dim
on df_fact.cat = df_dim.cat
"""

# Write the joined result as CSV into `delete/` (overwriting it); this executes the query.
spark.sql(sql).write.csv("delete/", mode="overwrite")

In [8]:
# Print the parsed, analyzed and optimized logical plans plus the physical plan.
spark.sql(sql).explain(extended=True)

== Parsed Logical Plan ==
'Project ['df_fact.cat, 'df_fact.val, 'df_dim.def]
+- 'Join LeftOuter, ('df_fact.cat = 'df_dim.cat)
   :- 'UnresolvedRelation `df_fact`
   +- 'UnresolvedRelation `df_dim`

== Analyzed Logical Plan ==
cat: string, val: bigint, def: string
Project [cat#0, val#1L, def#7]
+- Join LeftOuter, (cat#0 = cat#6)
   :- SubqueryAlias `df_fact`
   :  +- Relation[cat#0,val#1L,__index_level_0__#2L] parquet
   +- SubqueryAlias `df_dim`
      +- Relation[cat#6,def#7,__index_level_0__#8L] parquet

== Optimized Logical Plan ==
Project [cat#0, val#1L, def#7]
+- Join LeftOuter, (cat#0 = cat#6)
   :- Project [cat#0, val#1L]
   :  +- Relation[cat#0,val#1L,__index_level_0__#2L] parquet
   +- Project [cat#6, def#7]
      +- Filter isnotnull(cat#6)
         +- Relation[cat#6,def#7,__index_level_0__#8L] parquet

== Physical Plan ==
*(5) Project [cat#0, val#1L, def#7]
+- SortMergeJoin [cat#0], [cat#6], LeftOuter
   :- *(2) Sort [cat#0 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpart