In [0]:
# Disable Adaptive Query Execution and automatic broadcast joins so Spark
# does not re-plan joins at runtime or broadcast the small tables; this keeps
# the shuffle behaviour of the joins below visible in their plans.
join_demo_settings = {
    "spark.sql.adaptive.enabled": "false",
    "spark.sql.autoBroadcastJoinThreshold": -1,
}
for conf_key, conf_value in join_demo_settings.items():
    spark.conf.set(conf_key, conf_value)

In [0]:
# Avoid Shuffle

In [0]:
# Toy fact rows: (id, store_id, item, units); store_id points into store_data.
sales_data = [
    (1, 101, 'Apple', 100),
    (2, 102, 'Banana', 200),
    (3, 101, 'Orange', 150),
    (4, 103, 'Grapes', 250),
    (5, 102, 'Pear', 300)
]

# Toy dimension rows: (store_id, store_name, location).
store_data = [
    (101, 'Store A', 'New York'),
    (102, 'Store B', 'Chicago'),
    (103, 'Store C', 'Los Angeles')
]

In [0]:
# Column names for the two toy tables; "store_id" is the shared join key.
sales_cols = ['id', 'store_id', 'item', 'units']
store_cols = ['store_id', 'store_name', 'location']

# Materialise the in-memory rows as Spark DataFrames.
sales_df = spark.createDataFrame(data=sales_data, schema=sales_cols)
store_df = spark.createDataFrame(data=store_data, schema=store_cols)

In [0]:
# Show the sales rows.
display(sales_df)

In [0]:
# Show the store rows.
display(store_df)

In [0]:
# Confirm the broadcast threshold set in the first cell (should echo -1).
broadcast_threshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
broadcast_threshold

In [0]:
# Left join on an explicit column expression. Because the condition compares
# two separate Column objects, the output keeps a store_id column from EACH
# side -- contrast with the using-column join in the next cell.
store_match = sales_df["store_id"] == store_df["store_id"]
df = sales_df.join(store_df, store_match, how="left")
df.display()

In [0]:
# Using-column left join: naming the key makes Spark emit a single store_id
# column instead of one per side.
df = sales_df.join(store_df, ["store_id"], "left")
df.display()

In [0]:
# Partition count of sales_df before explicit repartitioning
# (presumably driven by the cluster's default parallelism).
sales_partitions_before = sales_df.rdd.getNumPartitions()
sales_partitions_before

In [0]:
# Partition count of store_df before explicit repartitioning
# (presumably driven by the cluster's default parallelism).
store_partitions_before = store_df.rdd.getNumPartitions()
store_partitions_before

In [0]:
# Pre-partition both sides on the join key with the SAME partition count and
# sort each partition by that key. With both inputs sharing this layout, the
# sort-merge join below should be able to reuse it instead of inserting its
# own Exchange/Sort (check with df.explain()).
JOIN_KEY = "store_id"
JOIN_PARTITIONS = 3


def co_partition(frame, key=JOIN_KEY, num_partitions=JOIN_PARTITIONS):
    """Hash-partition `frame` on `key` into `num_partitions` and sort within each partition.

    Args:
        frame: DataFrame to reshape.
        key: column name used for both hash partitioning and the per-partition sort.
        num_partitions: number of output partitions; must match across join inputs.

    Returns:
        A new DataFrame with the requested layout (the input is not modified).
    """
    return frame.repartition(num_partitions, key).sortWithinPartitions(key)


# Re-running this cell re-applies the same layout to already-partitioned
# frames, which leaves the result unchanged.
sales_df = co_partition(sales_df)
store_df = co_partition(store_df)

In [0]:
# Partition count after the repartition cell above (expected: 3).
sales_partitions_after = sales_df.rdd.getNumPartitions()
sales_partitions_after

In [0]:
# Partition count after the repartition cell above (expected: 3).
store_partitions_after = store_df.rdd.getNumPartitions()
store_partitions_after

In [0]:
# Same using-column left join as before, now on the co-partitioned, sorted
# inputs; compare its plan with the earlier join to see the avoided shuffle.
df = sales_df.join(store_df, on="store_id", how="left")
df.display()

In [0]:
# Bucketing

In [0]:
# Toy employee rows: (id, name, salary). "id" is the join/bucketing key below.
data = [
    (1, "Chiranjeevi", 100),
    (2, "Nagarjuna", 200),
    (3, "BalaKrishna", 300),
    (4, "Venkatesh", 400),
    (5, "Upendra", 500),
]

# Positional second argument is the schema (column names).
df1 = spark.createDataFrame(data, ["id", "name", "salary"])
df1.display()

In [0]:
# Toy department rows sharing the employee "id" key: (id, department).
data2 = [
    (1, "HR"),
    (2, "Engineering"),
    (3, "Finance"),
    (4, "Marketing"),
    (5, "sales"),
]

department_cols = ['id', 'department']
df2 = spark.createDataFrame(data2, schema=department_cols)
display(df2)

In [0]:
# Plain inner join of employees and departments on id ("inner" is also the
# default join type, spelled out here for clarity).
df = df1.join(df2, on="id", how="inner")
df.display()

In [0]:
# List what is under /mnt (Python form of `%fs ls "/mnt"`).
display(dbutils.fs.ls("/mnt"))

In [0]:
# Write df1 as a MANAGED Parquet table (no explicit path, so it lands in the
# warehouse directory), bucketed into 2 buckets and sorted on id.
# mode("overwrite") keeps the cell re-runnable: the default save mode
# (errorifexists) fails if "employee" already exists.
(
    df1.write
    .format("parquet")
    .mode("overwrite")
    .bucketBy(2, "id")
    .sortBy("id")
    .saveAsTable("employee")
)

In [0]:
%sql
-- Read back the managed bucketed table written by the previous cell.
select * from employee;

In [0]:
# Inspect the files behind the managed "employee" table in the warehouse
# directory; with bucketBy(2, "id") the part-file names should carry a
# bucket-id suffix.
display(dbutils.fs.ls("dbfs:/user/hive/warehouse/employee"))

In [0]:
%sql
-- Drop the managed "employee" table created without an explicit path.
-- IF EXISTS keeps the cell re-runnable when the table is already gone.
drop table if exists employee;

In [0]:
# Recursively delete anything left in the managed table's warehouse directory
# so a new managed table named "employee" can be created there again.
dbutils.fs.rm("dbfs:/user/hive/warehouse/employee", recurse=True)

In [0]:
# Write df1 as an EXTERNAL bucketed Parquet table: the explicit "path" option
# stores the data under the mount instead of the warehouse directory.
# Each table gets its own sub-directory. Pointing "employee" at the shared
# bucket_tables root would mix its files with the department table's
# sub-directory, and an overwrite of that root risks deleting it.
(
    df1
    .write
    .format("parquet")
    .mode("overwrite")
    .bucketBy(2, "id")
    .sortBy("id")
    .option("path", "dbfs:/mnt/j2dadlscontainer/bucket_tables/employee")
    .saveAsTable("employee")
)

In [0]:
# External bucketed Parquet table for employees: 2 buckets sorted on id, data
# kept under the mount path given in the "path" option, replacing any existing
# table of the same name.
employee_writer = (
    df1.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "dbfs:/mnt/j2dadlscontainer/bucket_tables/employee")
    .bucketBy(2, "id")
    .sortBy("id")
)
employee_writer.saveAsTable('employee')

In [0]:
# External bucketed Parquet table for departments, using the same bucketing
# (2 buckets on id) as the employee table so the two can be joined bucket-wise.
department_writer = (
    df2.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "dbfs:/mnt/j2dadlscontainer/bucket_tables/department")
    .bucketBy(2, "id")
    .sortBy("id")
)
department_writer.saveAsTable('department')

In [0]:
# Managed (no explicit path) bucketed copy of df2 named "dep".
# mode("overwrite") lets the cell be re-run; the default errorifexists mode
# fails when "dep" already exists.
(
    df2
    .write
    .format("parquet")
    .mode("overwrite")
    .bucketBy(2, "id")
    .sortBy("id")
    .saveAsTable("dep")
)

In [0]:
%sql
-- Drop the external table: "employee" was last created with an explicit path,
-- so DROP removes only the metastore entry and the Parquet files under
-- dbfs:/mnt/j2dadlscontainer/bucket_tables/employee stay in place.
drop table if exists employee;

In [0]:
%sql
-- The "employee" table definition was dropped in the previous cell, so
-- `select * from employee` would fail with "table or view not found".
-- Query the external Parquet files directly to show the data survived the DROP.
select * from parquet.`dbfs:/mnt/j2dadlscontainer/bucket_tables/employee`;

In [0]:
%sql
-- "dep" is a managed table (written without an explicit path); IF EXISTS
-- keeps the cell re-runnable when it has already been dropped.
drop table if exists dep;

Normal join without bucketing (baseline for comparison)

In [0]:
# Baseline: join the in-memory DataFrames directly. Neither side carries a
# known partitioning on id, so with broadcast joins disabled expect an
# Exchange on both sides (see unbucketed_join.explain()).
unbucketed_join = df1.join(df2, ["id"], "inner")
unbucketed_join.display()

In [0]:
# Persist both DataFrames as managed tables bucketed with the SAME bucket
# count (2) on the join key and sorted by it. Matching bucketing is what should
# let Spark join the tables without shuffling either side.
# mode("overwrite") keeps the cell re-runnable; the default errorifexists mode
# fails once the tables exist.
for source_df, table_name in [(df1, "bucketed_emp"), (df2, "bucketed_dep")]:
    (
        source_df.write
        .format("parquet")
        .mode("overwrite")
        .bucketBy(2, "id")
        .sortBy("id")
        .saveAsTable(table_name)
    )

In [0]:
# Inspect the bucketed part files of the managed "bucketed_emp" table.
display(dbutils.fs.ls("dbfs:/user/hive/warehouse/bucketed_emp"))

In [0]:
# "bucketed_emp" and "bucketed_dep" are catalog TABLE names, not Python
# variables, so load them through the catalog first. Only the tables (not
# df1/df2) carry the bucketing metadata (2 buckets on id) that lets Spark
# plan this join without shuffling; verify with bucketed_join.explain().
bucketed_emp = spark.table("bucketed_emp")
bucketed_dep = spark.table("bucketed_dep")

bucketed_join = (
    bucketed_emp
    .join(
        bucketed_dep,
        on=["id"],
        how="inner"
    )
)

bucketed_join.display()