In [2]:
from pyspark.sql import SparkSession

# Start a local Spark session on all available cores ("local[*]").
# getOrCreate() returns the already-running session if this cell is re-executed.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("6_partitionDetails_and_Joins") \
    .getOrCreate()

# Leave the session as the cell's last expression so Jupyter renders its summary.
spark

In [4]:
# Employee and department sample data with DDL-style schemas.
# Every column is declared as `string` (including age, salary, hire_date and budget);
# cast explicitly before doing arithmetic or date logic on them.
# Employee 018 has an empty-string gender rather than a null.

emp_data = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"],
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]

# Built from one "name type" entry per column, joined into a single DDL string.
emp_schema = ", ".join([
    "employee_id string",
    "department_id string",
    "name string",
    "age string",
    "gender string",
    "salary string",
    "hire_date string",
])

dept_data = [
    ["101", "Sales", "NYC", "US", "1000000"],
    ["102", "Marketing", "LA", "US", "900000"],
    ["103", "Finance", "London", "UK", "1200000"],
    ["104", "Engineering", "Beijing", "China", "1500000"],
    ["105", "Human Resources", "Tokyo", "Japan", "800000"],
    ["106", "Research and Development", "Perth", "Australia", "1100000"],
    ["107", "Customer Service", "Sydney", "Australia", "950000"]
]

dept_schema = ", ".join([
    "department_id string",
    "department_name string",
    "city string",
    "country string",
    "budget string",
])

emp = spark.createDataFrame(emp_data, emp_schema)
dept = spark.createDataFrame(dept_data, dept_schema)

# Preview the first five rows of each table.
emp.show(n=5)
dept.show(n=5)

+-----------+-------------+----------+---+------+------+----------+
|employee_id|department_id|      name|age|gender|salary| hire_date|
+-----------+-------------+----------+---+------+------+----------+
|        001|          101|  John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102| Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102| Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103| Jack Chan| 40|  Male| 60000|2013-04-01|
+-----------+-------------+----------+---+------+------+----------+
only showing top 5 rows

+-------------+---------------+-------+-------+-------+
|department_id|department_name|   city|country| budget|
+-------------+---------------+-------+-------+-------+
|          101|          Sales|    NYC|     US|1000000|
|          102|      Marketing|     LA|     US| 900000|
|          103|        Finance| London|     UK|1200000|
|          104|    Engineer

In [6]:
# Number of partitions backing emp's underlying RDD.
# NOTE(review): the 12 recorded here presumably equals the default parallelism of
# master "local[*]" (one slice per core on this machine) — confirm.
emp.rdd.getNumPartitions()


12

In [7]:
# Number of partitions backing dept's RDD. The count is the same as emp's even though
# dept has only 7 rows, so several of these partitions must be empty.
dept.rdd.getNumPartitions()

12

In [8]:
# Repartition of data using repartition & coalesce.
# repartition() shuffles rows into the requested number of partitions, hashing on the
# given column so all employees of one department end up in the same partition.
# It can also raise the partition count, e.g. emp.repartition(100).
emp = emp.repartition(4, emp.department_id)
emp.show(n=5)

+-----------+-------------+-----------+---+------+------+----------+
|employee_id|department_id|       name|age|gender|salary| hire_date|
+-----------+-------------+-----------+---+------+------+----------+
|        003|          102|  Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|  Alice Lee| 28|Female| 48000|2017-09-30|
|        008|          102|   Kate Kim| 29|Female| 51000|2019-10-01|
|        014|          107|  Emily Lee| 26|Female| 46000|2019-01-01|
|        016|          107|Kelly Zhang| 30|Female| 49000|2018-04-01|
+-----------+-------------+-----------+---+------+------+----------+
only showing top 5 rows



In [11]:
from pyspark.sql.functions import spark_partition_id

# Stamp each employee row with the id of the partition it currently lives in, making
# the layout produced by repartition(4, department_id) visible.
emp = emp.withColumn(colName="Partition_ID", col=spark_partition_id())
emp.show(n=20)

+-----------+-------------+-------------+---+------+------+----------+------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|Partition_ID|
+-----------+-------------+-------------+---+------+------+----------+------------+
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|           0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|           0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|           0|
|        014|          107|    Emily Lee| 26|Female| 46000|2019-01-01|           0|
|        016|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01|           0|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|           0|
|        012|          105|   Susan Chen| 31|Female| 54000|2017-02-15|           1|
|        017|          105|  George Wang| 34|  Male| 57000|2016-03-15|           1|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|      

In [14]:
# coalesce() accepts only a target partition count. Unlike repartition(), it cannot take
# columns to partition on, so this call raises TypeError. The error is caught and printed
# so the cell still documents the mistake without breaking Restart & Run All
# (dept is left unchanged).
try:
    dept = dept.coalesce(4, "department_id")
except TypeError as err:
    print(f"{type(err).__name__}: {err}")

TypeError: DataFrame.coalesce() takes 2 positional arguments but 3 were given

In [17]:
# coalesce() does not take columns to repartition the data on — only the number of
# partitions to end up with. It merges existing partitions instead of shuffling by key.
dept = dept.coalesce(numPartitions=4)

In [18]:
# Record which of the 4 coalesced partitions each department landed in.
# NOTE(review): emp already carries a "Partition_ID" column. Spark resolves column names
# case-insensitively by default, so referring to this column by name after joining with
# emp may be ambiguous; consider a distinct name such as "dept_partition_id".
dept = dept.withColumn(colName="Partition_id", col=spark_partition_id())
dept.show(n=20)

+-------------+--------------------+-------+---------+-------+------------+
|department_id|     department_name|   city|  country| budget|Partition_id|
+-------------+--------------------+-------+---------+-------+------------+
|          101|               Sales|    NYC|       US|1000000|           0|
|          102|           Marketing|     LA|       US| 900000|           1|
|          103|             Finance| London|       UK|1200000|           1|
|          104|         Engineering|Beijing|    China|1500000|           2|
|          105|     Human Resources|  Tokyo|    Japan| 800000|           2|
|          106|Research and Deve...|  Perth|Australia|1100000|           3|
|          107|    Customer Service| Sydney|Australia| 950000|           3|
+-------------+--------------------+-------+---------+-------+------------+



In [21]:
# Inner join: keep only employees whose department_id has a match in dept.
# Joining on an expression (rather than a column name) keeps BOTH department_id
# columns in the result.
emp.join(dept, emp.department_id == dept.department_id, "inner").show(5)

+-----------+-------------+-------------+---+------+------+----------+------------+-------------+---------------+----+-------+-------+------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|Partition_ID|department_id|department_name|city|country| budget|Partition_id|
+-----------+-------------+-------------+---+------+------+----------+------------+-------------+---------------+----+-------+-------+------------+
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|           3|          101|          Sales| NYC|     US|1000000|           0|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|           3|          101|          Sales| NYC|     US|1000000|           0|
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|           3|          101|          Sales| NYC|     US|1000000|           0|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|           0|          102|      Marketing

In [22]:
# Same inner join, projecting one column from each side via its DataFrame reference.
emp.join(dept, emp["department_id"] == dept["department_id"], "inner") \
    .select(emp["name"], dept["department_name"]) \
    .show(5)

+-------------+----------------+
|         name| department_name|
+-------------+----------------+
|     John Doe|           Sales|
|   Jane Smith|           Sales|
|James Johnson|           Sales|
|    Emily Lee|Customer Service|
|  Kelly Zhang|Customer Service|
+-------------+----------------+
only showing top 5 rows



In [29]:
# alias — common mistake #1.
# DataFrame.alias("e") only registers the qualifier "e" inside Spark's query plan; it does
# NOT create a Python variable named e, so `e.department_id` raises NameError.
# Refer to aliased columns by qualified name instead, e.g.
# col("e.department_id") == col("d.department_id").
# The error is caught and printed so the cell runs cleanly under Restart & Run All.
try:
    emp.alias("e").join(dept.alias("d"), how = 'inner', on = e.department_id == d.department_id).select(e.name, d.department_id).show(5)
except NameError as err:
    print(f"{type(err).__name__}: {err}")

NameError: name 'e' is not defined

In [30]:
# alias — common mistake #2.
# Replacing e.department_id with emp.department_id fixes the left side, but `d` is
# still not a Python name (alias("d") only names the DataFrame inside Spark), so
# NameError is raised for d. The error is caught and printed so the cell runs cleanly.
try:
    emp.alias("e").join(dept.alias("d"), how = 'inner', on = emp.department_id == d.department_id).select(e.name, d.department_id).show(5)
except NameError as err:
    print(f"{type(err).__name__}: {err}")

NameError: name 'd' is not defined

In [31]:
# alias — common mistake #3.
# The join condition is now valid (it uses the emp/dept DataFrame variables), but
# select(e.name, ...) still fails with NameError for the same reason: aliases are Spark
# qualifiers, not Python variables. The string form select("e.name", "d.department_id")
# works. The error is caught and printed so the cell runs cleanly.
try:
    emp.alias("e").join(dept.alias("d"), how = 'inner', on = emp.department_id == dept.department_id).select(e.name, d.department_id).show(5)
except NameError as err:
    print(f"{type(err).__name__}: {err}")

NameError: name 'e' is not defined

In [35]:
# Aliases ARE usable in joins — just not as Python variables. After alias("e") and
# alias("d"), refer to columns by their qualified names: col("e.department_id") in
# expressions, or the plain string "e.name" in select().
from pyspark.sql.functions import col

alias_df = (
    emp.alias("e")
    .join(dept.alias("d"), how="inner", on=col("e.department_id") == col("d.department_id"))
    .limit(5)
)
alias_df.select("e.name", "d.department_id").show()

+-------------+-------------+
|         name|department_id|
+-------------+-------------+
|     John Doe|          101|
|   Jane Smith|          101|
|James Johnson|          101|
|    Emily Lee|          107|
|  Kelly Zhang|          107|
+-------------+-------------+



In [26]:
# Left outer join with an extra predicate in the ON clause. Every emp row is still
# returned: the emp.department_id == "101" test only limits which employees may pick up
# a matching dept row, so employees outside department 101 come back with null dept columns.
emp.join(
    dept,
    on=(emp.department_id == dept.department_id) & (emp.department_id == "101"),
    how="left_outer",
).show()

+-----------+-------------+-------------+---+------+------+----------+------------+-------------+---------------+----+-------+-------+------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|Partition_ID|department_id|department_name|city|country| budget|Partition_id|
+-----------+-------------+-------------+---+------+------+----------+------------+-------------+---------------+----+-------+-------+------------+
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|           0|         null|           null|null|   null|   null|        null|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|           0|         null|           null|null|   null|   null|        null|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|           0|         null|           null|null|   null|   null|        null|
|        014|          107|    Emily Lee| 26|Female| 46000|2019-01-01|           0|         null|           null

In [28]:
# isNull() and isNotNull()
# A null check on the right-hand key must be applied AFTER the outer join, not inside it.
# Inside the ON clause, `dept.department_id.isNull()` can never hold together with
# `emp.department_id == dept.department_id` (that equality needs a non-null key), so no
# dept row would ever match. The filter below keeps only the employees that found no
# department match. Swap in .isNotNull() to keep only the matched rows instead.
emp.join(
    dept,
    how="left_outer",
    on=(emp.department_id == dept.department_id) & (emp.department_id == "101"),
).where(dept.department_id.isNull()).show(5)


+-----------+-------------+-----------+---+------+------+----------+------------+-------------+---------------+----+-------+------+------------+
|employee_id|department_id|       name|age|gender|salary| hire_date|Partition_ID|department_id|department_name|city|country|budget|Partition_id|
+-----------+-------------+-----------+---+------+------+----------+------------+-------------+---------------+----+-------+------+------------+
|        003|          102|  Bob Brown| 35|  Male| 55000|2014-05-01|           0|         null|           null|null|   null|  null|        null|
|        004|          102|  Alice Lee| 28|Female| 48000|2017-09-30|           0|         null|           null|null|   null|  null|        null|
|        008|          102|   Kate Kim| 29|Female| 51000|2019-10-01|           0|         null|           null|null|   null|  null|        null|
|        014|          107|  Emily Lee| 26|Female| 46000|2019-01-01|           0|         null|           null|null|   null|  null

In [36]:
# Release the SparkSession created at the top of the notebook; `spark`, `emp` and `dept`
# are unusable after this point.
spark.stop()