In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Emp Data & Schema

emp_data_1 = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female",None,"2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
    
]

emp_data_2 = [
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male",None,"2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]

emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"

In [0]:
# Create emp DataFrame

emp_data_1 = spark.createDataFrame(data=emp_data_1, schema=emp_schema)
emp_data_2 = spark.createDataFrame(data=emp_data_2, schema=emp_schema)

**_Union in Pyspark_** <br>_In PySpark, there is no difference between union and `unionAll` because PySpark only has `.union()`, and it behaves like SQL's UNION ALL—it keeps all rows, including duplicates._

In [0]:
emp_data_1.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|
+-----

In [0]:
emp_data_2.show()

+-----------+-------------+-----------+---+------+------+----------+
|employee_id|department_id|       name|age|gender|salary| hire_date|
+-----------+-------------+-----------+---+------+------+----------+
|        011|          104| David Park| 38|  Male| 65000|2015-11-01|
|        012|          105| Susan Chen| 31|Female| 54000|2017-02-15|
|        013|          106|  Brian Kim| 45|  Male| 75000|2011-07-01|
|        014|          107|  Emily Lee| 26|Female| 46000|2019-01-01|
|        015|          106|Michael Lee| 37|  Male| 63000|2014-09-30|
|        016|          107|Kelly Zhang| 30|Female| 49000|2018-04-01|
|        017|          105|George Wang| 34|  Male| 57000|2016-03-15|
|        018|          104|  Nancy Liu| 29|      | 50000|2017-06-01|
|        019|          103|Steven Chen| 36|  Male| 62000|2015-08-01|
|        020|          102|  Grace Kim| 32|Female| 53000|2018-11-01|
+-----------+-------------+-----------+---+------+------+----------+



In [0]:
unioned_df = emp_data_1.unionAll(emp_data_2)

In [0]:
unioned_df.count()

Out[16]: 21

In [0]:
unioned_df.show(30,False)

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|name         |age|gender|salary|hire_date |
+-----------+-------------+-------------+---+------+------+----------+
|001        |101          |John Doe     |30 |Male  |50000 |2015-01-01|
|002        |101          |Jane Smith   |25 |Female|45000 |2016-02-15|
|003        |102          |Bob Brown    |35 |Male  |55000 |2014-05-01|
|004        |102          |Alice Lee    |28 |Female|48000 |2017-09-30|
|005        |103          |Jack Chan    |40 |Male  |60000 |2013-04-01|
|006        |103          |Jill Wong    |32 |Female|52000 |2018-07-01|
|007        |101          |James Johnson|42 |Male  |70000 |2012-03-15|
|008        |102          |Kate Kim     |29 |Female|51000 |2019-10-01|
|009        |103          |Tom Tan      |33 |Male  |58000 |2016-06-01|
|010        |104          |Lisa Lee     |27 |Female|47000 |2018-08-01|
|020        |102          |Grace Kim    |32 |Female|53000 |2018-11-01|
|011  

In [0]:
emp_data_2.columns

Out[19]: ['employee_id',
 'department_id',
 'name',
 'age',
 'gender',
 'salary',
 'hire_date']

In [0]:
emp2 = emp_data_2.withColumn("hire_date", col("hire_date").cast(DateType()))

In [0]:
df_final = emp2.union(emp_data_1)

In [0]:
df_final.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|
|     

In [0]:
emp_data_1.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)



In [0]:
emp2.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: date (nullable = true)



In [0]:
df_final.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)



**_Sorting in PySpark_**

In [0]:
df_final.columns

Out[30]: ['employee_id',
 'department_id',
 'name',
 'age',
 'gender',
 'salary',
 'hire_date']

In [0]:
df_final.sort(desc("salary")).show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        013|          106|    Brian Kim| 45|  Male| 75000|2011-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|        015|          106|  Michael Lee| 37|  Male| 63000|2014-09-30|
|        019|          103|  Steven Chen| 36|  Male| 62000|2015-08-01|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        017|          105|  George Wang| 34|  Male| 57000|2016-03-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        012|          105|   Susan Chen| 31|Female| 54000|2017-02-15|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|
|     

In [0]:
df_final.orderBy(desc("salary")).show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        013|          106|    Brian Kim| 45|  Male| 75000|2011-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|        015|          106|  Michael Lee| 37|  Male| 63000|2014-09-30|
|        019|          103|  Steven Chen| 36|  Male| 62000|2015-08-01|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        017|          105|  George Wang| 34|  Male| 57000|2016-03-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        012|          105|   Susan Chen| 31|Female| 54000|2017-02-15|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|
|     

In [0]:
df_final.orderBy(asc("gender"), desc("salary")).show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        018|          104|    Nancy Liu| 29|      | 50000|2017-06-01|
|        012|          105|   Susan Chen| 31|Female| 54000|2017-02-15|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        016|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        014|          107|    Emily Lee| 26|Female| 46000|2019-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|     

**_Aggregation in Pyspark_**

In [0]:
#Let calculate average salary by gender

In [0]:
avg_sal_by_gen = df_final.groupBy("gender").agg(round(avg("salary")).alias("avg_salary"))

In [0]:
avg_sal_by_gen.show()

+------+----------+
|gender|avg_salary|
+------+----------+
|  Male|   60000.0|
|Female|   49667.0|
|      |   50000.0|
+------+----------+



In [0]:
male_avg_sal,female_avg_sal = avg_sal_by_gen.collect()[0][1],avg_sal_by_gen.collect()[1][1]

In [0]:
df_final.columns

Out[56]: ['employee_id',
 'department_id',
 'name',
 'age',
 'gender',
 'salary',
 'hire_date']

In [0]:
df_final.filter(col("salary").isNull()).show()

+-----------+-------------+---------+---+------+------+----------+
|employee_id|department_id|     name|age|gender|salary| hire_date|
+-----------+-------------+---------+---+------+------+----------+
|        013|          106|Brian Kim| 45|  Male|  null|2011-07-01|
|        008|          102| Kate Kim| 29|Female|  null|2019-10-01|
+-----------+-------------+---------+---+------+------+----------+



**_Filling the missing values with average based on gender_**

In [0]:
df_final.withColumn("salary", when((col("gender") == "Male") & (col("salary").isNull()), lit(male_avg_sal))\
    .when((col("gender") == "Female") & (col("salary").isNull()), lit(female_avg_sal)).otherwise(col("salary"))
    ).show()

+-----------+-------------+-------------+---+------+-------+----------+
|employee_id|department_id|         name|age|gender| salary| hire_date|
+-----------+-------------+-------------+---+------+-------+----------+
|        011|          104|   David Park| 38|  Male|  65000|2015-11-01|
|        012|          105|   Susan Chen| 31|Female|  54000|2017-02-15|
|        013|          106|    Brian Kim| 45|  Male|60000.0|2011-07-01|
|        014|          107|    Emily Lee| 26|Female|  46000|2019-01-01|
|        015|          106|  Michael Lee| 37|  Male|  63000|2014-09-30|
|        016|          107|  Kelly Zhang| 30|Female|  49000|2018-04-01|
|        017|          105|  George Wang| 34|  Male|  57000|2016-03-15|
|        018|          104|    Nancy Liu| 29|      |  50000|2017-06-01|
|        019|          103|  Steven Chen| 36|  Male|  62000|2015-08-01|
|        020|          102|    Grace Kim| 32|Female|  53000|2018-11-01|
|        001|          101|     John Doe| 30|  Male|  50000|2015

In [0]:
#Count of employees for each department

df_final.groupBy("department_id").count().show()

+-------------+-----+
|department_id|count|
+-------------+-----+
|          104|    3|
|          105|    2|
|          106|    2|
|          107|    2|
|          102|    5|
|          103|    4|
|          101|    3|
+-------------+-----+

