In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os

In [2]:
# Environment variables for the local Spark install and the Python
# interpreters, set before the session is created further down.
os.environ.update({
    'SPARK_HOME': r'C:\spark\spark-3.5.4-bin-hadoop3',
    'PYSPARK_DRIVER_PYTHON': 'jupyter',
    'PYSPARK_DRIVER_PYTHON_OPTS': 'lab',
    'PYSPARK_PYTHON': 'python',
})

In [3]:
# Local Spark session used by every cell below.
# NOTE(review): with master "local[*]" the executor sizing and dynamic
# allocation settings may have no effect -- confirm before relying on them.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PySpark Zero to Hero")
    # Memory and cores
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "16g")
    .config("spark.executor.cores", "4")
    # Dynamic allocation and the shuffle service it is paired with here
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.initialExecutors", "24")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    # Shuffle partition count and serializer
    .config("spark.sql.shuffle.partitions", "80")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

In [4]:
# Switch off adaptive query execution, its partition coalescing, and automatic
# broadcast joins (threshold -1) so the join plans below are not rewritten by
# the optimizer and a broadcast only happens when explicitly hinted.
for _conf_key, _conf_value in (
    ("spark.sql.adaptive.enabled", False),
    ("spark.sql.adaptive.coalescePartitions.enabled", False),
    ("spark.sql.autoBroadcastJoinThreshold", -1),
):
    spark.conf.set(_conf_key, _conf_value)

In [5]:
# Employee records, read with an explicit DDL schema (no schema inference);
# the first CSV line is treated as the header.
_schema = (
    "first_name string, last_name string, job_title string, dob string, "
    "email string, phone string, salary double, department_id int"
)

df_path = r'F:\DataSpell\-pyspark_training\YouTube\PySpark - Zero to Hero\datasets\employee_records.csv'

emp = spark.read.csv(df_path, schema=_schema, header=True)

In [6]:
# Department lookup data, read with an explicit DDL schema; the first CSV line
# is treated as the header.
dept_path = r'F:\DataSpell\-pyspark_training\YouTube\PySpark - Zero to Hero\datasets\department_data.csv'

_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"

# Load from dept_path: the previous code passed df_path (the employee CSV), so
# `dept` contained employee rows parsed with the department schema.
dept = spark.read.format("csv").schema(_dept_schema).option("header", True).load(dept_path)

In [7]:
# Display the session object as the cell's output.
spark

In [8]:
# Left outer join of emp to dept on department_id, with no broadcast hint.
df_joined = emp.join(
    dept,
    on=emp['department_id'] == dept['department_id'],
    how='left_outer',
)

In [9]:
# Execute the join by writing to the 'noop' sink, which consumes the rows
# without persisting them anywhere.
(
    df_joined.write
    .format('noop')
    .mode('overwrite')
    .save()
)

In [10]:
# Print the physical plan chosen for the un-hinted join.
df_joined.explain()

== Physical Plan ==
*(4) SortMergeJoin [department_id#7], [department_id#16], LeftOuter
:- *(1) Sort [department_id#7 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(department_id#7, 80), ENSURE_REQUIREMENTS, [plan_id=70]
:     +- FileScan csv [first_name#0,last_name#1,job_title#2,dob#3,email#4,phone#5,salary#6,department_id#7] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/F:/DataSpell/-pyspark_training/YouTube/PySpark - Zero to Hero/da..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<first_name:string,last_name:string,job_title:string,dob:string,email:string,phone:string,s...
+- *(3) Sort [department_id#16 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(department_id#16, 80), ENSURE_REQUIREMENTS, [plan_id=82]
      +- *(2) Filter isnotnull(department_id#16)
         +- FileScan csv [department_id#16,department_name#17,description#18,city#19,state#20,country#21] Batched: false, DataFilters: [isnotnull(depar

In [11]:
# Same left outer join, this time with an explicit broadcast hint on dept.
df_joined = emp.join(
    F.broadcast(dept),
    how='left_outer',
    on=emp['department_id'] == dept['department_id'],
)

In [12]:
# Execute the broadcast-hinted join via the 'noop' sink (rows are consumed,
# nothing is persisted).
(
    df_joined.write
    .format('noop')
    .mode('overwrite')
    .save()
)

In [13]:
# Print the physical plan for the broadcast-hinted join.
df_joined.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [department_id#7], [department_id#16], LeftOuter, BuildRight, false
:- FileScan csv [first_name#0,last_name#1,job_title#2,dob#3,email#4,phone#5,salary#6,department_id#7] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/F:/DataSpell/-pyspark_training/YouTube/PySpark - Zero to Hero/da..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<first_name:string,last_name:string,job_title:string,dob:string,email:string,phone:string,s...
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=160]
   +- *(1) Filter isnotnull(department_id#16)
      +- FileScan csv [department_id#16,department_name#17,description#18,city#19,state#20,country#21] Batched: false, DataFilters: [isnotnull(department_id#16)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/F:/DataSpell/-pyspark_training/YouTube/PySpark - Zero to Hero/da..., PartitionFilters: [], Pushe

In [14]:
# Sales transactions, read with an explicit DDL schema; the first CSV line is
# treated as the header.
sales_path = r'F:\DataSpell\-pyspark_training\YouTube\PySpark - Zero to Hero\datasets\new_sales.csv'

sales_schema = (
    "transacted_at string, trx_id string, retailer_id string, "
    "description string, amount double, city_id string"
)

sales = spark.read.csv(sales_path, schema=sales_schema, header=True)

In [18]:
# City lookup data, read with an explicit DDL schema; the first CSV line is
# treated as the header.
cities_path = r'F:\DataSpell\-pyspark_training\YouTube\PySpark - Zero to Hero\datasets\cities.csv'

city_schema = (
    "city_id string, city string, state string, "
    "state_abv string, country string"
)

city = spark.read.csv(cities_path, schema=city_schema, header=True)

In [19]:
# Preview the first five city rows.
city.show(5)

+----------+-----------------+-----+---------+--------------------+
|   city_id|             city|state|state_abv|             country|
+----------+-----------------+-----+---------+--------------------+
| 324732552|ladang ulu bernam| NULL|     NULL|            Malaysia|
|2010991182|     `ali rowshan| NULL|     NULL|                Iran|
| 741912760|           borovo| NULL|     NULL|Bosnia and Herzeg...|
| 604192006|      aillo talor| NULL|     NULL|               Chile|
| 752393249|   sheykheh koreh| NULL|     NULL|                Iran|
+----------+-----------------+-----+---------+--------------------+
only showing top 5 rows



In [20]:
# Left outer join of sales to city on city_id, with no broadcast hint.
df_sales_joined = sales.join(
    city,
    on=sales['city_id'] == city['city_id'],
    how='left_outer',
)

In [21]:
# Execute the sales/city join via the 'noop' sink (rows are consumed, nothing
# is persisted).
(
    df_sales_joined.write
    .format('noop')
    .mode('overwrite')
    .save()
)

In [22]:
# Same sales/city join, with an explicit broadcast hint on city.
df_sales_joined = sales.join(
    F.broadcast(city),
    how='left_outer',
    on=sales['city_id'] == city['city_id'],
)

In [23]:
# Execute the broadcast-hinted sales/city join via the 'noop' sink.
(
    df_sales_joined.write
    .format('noop')
    .mode('overwrite')
    .save()
)

In [29]:
# Persist sales as the table 'sales_bucket': CSV files with a header row,
# hashed into 4 buckets on city_id, replacing any previous contents.
(
    sales.write
    .bucketBy(4, 'city_id')
    .saveAsTable(
        'sales_bucket',
        format='csv',
        mode='overwrite',
        header=True,
        path=r'F:\DataSpell\-pyspark_training\YouTube\PySpark - Zero to Hero\datasets\my_data\sales_bucket.csv',
    )
)

In [30]:
# Persist city as the table 'city_bucket': CSV files with a header row,
# hashed into 4 buckets on city_id, replacing any previous contents.
(
    city.write
    .bucketBy(4, 'city_id')
    .saveAsTable(
        'city_bucket',
        format='csv',
        mode='overwrite',
        header=True,
        path=r'F:\DataSpell\-pyspark_training\YouTube\PySpark - Zero to Hero\datasets\my_data\city_bucket.csv',
    )
)

In [32]:
# List the tables registered in the 'default' database.
spark.sql('show tables in default').show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|  default| city_bucket|      false|
|  default|sales_bucket|      false|
+---------+------------+-----------+



In [33]:
# Load the bucketed sales table back as a DataFrame.
sales_bucket = spark.table('sales_bucket')

In [34]:
# Load the bucketed city table back as a DataFrame.
city_bucket = spark.table('city_bucket')

In [35]:
# Left outer join of the two bucketed tables on their shared bucketing
# column, city_id, with no broadcast hint.
df_joined_bucket = sales_bucket.join(
    city_bucket,
    how='left_outer',
    on=sales_bucket['city_id'] == city_bucket['city_id'],
)

In [36]:
# Execute the bucketed join via the 'noop' sink (rows are consumed, nothing
# is persisted).
(
    df_joined_bucket.write
    .format('noop')
    .mode('overwrite')
    .save()
)

In [37]:
# Same join over the bucketed tables, with an explicit broadcast hint on
# city_bucket.
df_joined_bucket = sales_bucket.join(
    F.broadcast(city_bucket),
    how='left_outer',
    on=sales_bucket['city_id'] == city_bucket['city_id'],
)

In [38]:
# Print the physical plan for the broadcast-hinted join over the bucketed tables.
df_joined_bucket.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [city_id#424], [city_id#431], LeftOuter, BuildRight, false
:- FileScan csv spark_catalog.default.sales_bucket[transacted_at#419,trx_id#420,retailer_id#421,description#422,amount#423,city_id#424] Batched: false, Bucketed: false (disabled by query planner), DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/F:/DataSpell/-pyspark_training/YouTube/PySpark - Zero to Hero/da..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<transacted_at:string,trx_id:string,retailer_id:string,description:string,amount:double,cit...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=405]
   +- *(1) Filter isnotnull(city_id#431)
      +- FileScan csv spark_catalog.default.city_bucket[city_id#431,city#432,state#433,state_abv#434,country#435] Batched: false, Bucketed: false (disabled by query planner), DataFilters: [isnotnull(city_id#431)], Format: CSV, Location: InMemoryFileIndex(1 paths)

In [39]:
# Shut down the Spark session now that the notebook is finished.
spark.stop()