# Normal Join vs Bucket Join

In [1]:
!pip install pyspark -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


## Normal Join

In [2]:
from pyspark.sql import SparkSession
import time

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Normal Join Example") \
    .getOrCreate()

# Sample data
data1 = [(1, "John"), (2, "Jane"), (3, "Jake"), (4, "Jill"), (5, "Joe")]
data2 = [(1, 1000), (2, 1500), (3, 2000), (4, 2500), (5, 3000)]

# Define schema
schema1 = ["id", "name"]
schema2 = ["id", "salary"]

# Create DataFrames
df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)

# Perform normal join with timing
start_time = time.time()
normal_joined_df = df1.join(df2, "id")
end_time = time.time()

# Show the result
normal_joined_df.show()

# Calculate and print execution time
execution_time = end_time - start_time
print("Execution Time (Normal Join): {:.2f} seconds".format(execution_time))

# Stop Spark session
spark.stop()


+---+----+------+
| id|name|salary|
+---+----+------+
|  1|John|  1000|
|  2|Jane|  1500|
|  3|Jake|  2000|
|  4|Jill|  2500|
|  5| Joe|  3000|
+---+----+------+

Execution Time (Normal Join): 0.14 seconds


## Bucket Join

In [3]:
from pyspark.sql import SparkSession
import time

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Bucket Join Example") \
    .getOrCreate()

# Sample data
data1 = [(1, "John"), (2, "Jane"), (3, "Jake"), (4, "Jill"), (5, "Joe")]
data2 = [(1, 1000), (2, 1500), (3, 2000), (4, 2500), (5, 3000)]

# Define schema
schema1 = ["id", "name"]
schema2 = ["id", "salary"]

# Create DataFrames
df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)

# Save DataFrames as bucketed tables
df1.write.bucketBy(4, "id").saveAsTable("bucketed_table1")
df2.write.bucketBy(4, "id").saveAsTable("bucketed_table2")

# Read the bucketed tables
bucketed_df1 = spark.table("bucketed_table1")
bucketed_df2 = spark.table("bucketed_table2")

# Perform bucket join with timing
start_time = time.time()
bucket_joined_df = bucketed_df1.join(bucketed_df2, "id")
end_time = time.time()

# Show the result
bucket_joined_df.show()

# Calculate and print execution time
execution_time = end_time - start_time
print("Execution Time (Bucket Join): {:.2f} seconds".format(execution_time))

# Stop Spark session
spark.stop()


+---+----+------+
| id|name|salary|
+---+----+------+
|  4|Jill|  2500|
|  5| Joe|  3000|
|  2|Jane|  1500|
|  3|Jake|  2000|
|  1|John|  1000|
+---+----+------+

Execution Time (Bucket Join): 0.03 seconds
