In [1]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Working with Strings & Dates")
    .master("local[*]")
    .getOrCreate()
)

spark

In [2]:
# Emp Data & Schema

emp_data = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"],
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]

emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"

In [6]:
# Create emp DataFrame

emp_df = spark.createDataFrame(data=emp_data, schema=emp_schema)

In [7]:
# Show emp dataframe (ACTION)

emp_df.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [8]:
# Print Schema

emp_df.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)



In [12]:
# Case When
# select employee_id, name, age, salary, gender,
# case when gender = 'Male' then 'M' when gender = 'Female' then 'F' else null end as new_gender, hire_date from emp
from pyspark.sql.functions import when, col, expr

emp_gender_df = emp_df.withColumn("new_gender", when(col("gender") == "Male", "M")
                                               .when(col("gender") == "Female", "F")
                                               .otherwise(None)
                                 )

emp_gender_df2 = emp_df.withColumn("new_gender", expr("""case when gender = 'Male' then 'M'
                                                              when gender = 'Female' then 'F'
                                                              else null
                                                          end
                                                       """)
                                  )


In [11]:
emp_gender_df.show()

+-----------+-------------+-------------+---+------+------+----------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|new_gender|
+-----------+-------------+-------------+---+------+------+----------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|         M|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|         F|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|         M|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|         F|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|         M|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|         F|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|         M|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|         F|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|         M|
|        010|   

In [13]:
emp_gender_df2.show()

+-----------+-------------+-------------+---+------+------+----------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|new_gender|
+-----------+-------------+-------------+---+------+------+----------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|         M|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|         F|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|         M|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|         F|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|         M|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|         F|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|         M|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|         F|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|         M|
|        010|   

In [14]:
# Replace in Strings
# select employee_id, name, replace(name, 'J', 'Z') as new_name, age, salary, gender, new_gender, hire_date from emp_gender_fixed
# link for string functions on spark
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#string-functions
from pyspark.sql.functions import regexp_replace

emp_name_changed_df = emp_gender_df.withColumn('new_name', regexp_replace(col('name'), 'J', 'Z'))

In [15]:
emp_name_changed_df.show(5)

+-----------+-------------+----------+---+------+------+----------+----------+----------+
|employee_id|department_id|      name|age|gender|salary| hire_date|new_gender|  new_name|
+-----------+-------------+----------+---+------+------+----------+----------+----------+
|        001|          101|  John Doe| 30|  Male| 50000|2015-01-01|         M|  Zohn Doe|
|        002|          101|Jane Smith| 25|Female| 45000|2016-02-15|         F|Zane Smith|
|        003|          102| Bob Brown| 35|  Male| 55000|2014-05-01|         M| Bob Brown|
|        004|          102| Alice Lee| 28|Female| 48000|2017-09-30|         F| Alice Lee|
|        005|          103| Jack Chan| 40|  Male| 60000|2013-04-01|         M| Zack Chan|
+-----------+-------------+----------+---+------+------+----------+----------+----------+
only showing top 5 rows



In [18]:
# Convert Date
# select *,  to_date(hire_date, 'YYYY-MM-DD') as hire_date from emp_name_fixed
# Link for date finctions and patterns in spark
# https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
from pyspark.sql.functions import to_date

emp_date_fixed_df = emp_name_changed_df.withColumn('new_hire_date', to_date(col('hire_date'), 'yyyy-MM-dd'))

# this will replace the column
# emp_date_fixed_df = emp_name_changed_df.withColumn('hire_date', to_date(col('hire_date'), 'yyyy-MM-dd'))

In [20]:
emp_date_fixed_df.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- new_gender: string (nullable = true)
 |-- new_name: string (nullable = true)
 |-- new_hire_date: date (nullable = true)



In [19]:
emp_date_fixed_df.show(5)

+-----------+-------------+----------+---+------+------+----------+----------+----------+-------------+
|employee_id|department_id|      name|age|gender|salary| hire_date|new_gender|  new_name|new_hire_date|
+-----------+-------------+----------+---+------+------+----------+----------+----------+-------------+
|        001|          101|  John Doe| 30|  Male| 50000|2015-01-01|         M|  Zohn Doe|   2015-01-01|
|        002|          101|Jane Smith| 25|Female| 45000|2016-02-15|         F|Zane Smith|   2016-02-15|
|        003|          102| Bob Brown| 35|  Male| 55000|2014-05-01|         M| Bob Brown|   2014-05-01|
|        004|          102| Alice Lee| 28|Female| 48000|2017-09-30|         F| Alice Lee|   2017-09-30|
|        005|          103| Jack Chan| 40|  Male| 60000|2013-04-01|         M| Zack Chan|   2013-04-01|
+-----------+-------------+----------+---+------+------+----------+----------+----------+-------------+
only showing top 5 rows



In [21]:
# Add Date Columns
# Add current_date, current_timestamp, extract year from hire_date
from pyspark.sql.functions import current_date, current_timestamp

emp_dated_df = emp_date_fixed_df.withColumn('date_now', current_date()).withColumn('timestamp_now', current_timestamp())

In [23]:
emp_dated_df.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- new_gender: string (nullable = true)
 |-- new_name: string (nullable = true)
 |-- new_hire_date: date (nullable = true)
 |-- date_now: date (nullable = false)
 |-- timestamp_now: timestamp (nullable = false)



In [25]:
emp_dated_droped_df = emp_dated_df.drop('gender', 'name', 'hire_date').withColumnRenamed('new_name', 'name') \
                                                                      .withColumnRenamed('new_hire_date', 'hire_date')\
                                                                      .withColumnRenamed('new_gender', 'gender')

In [26]:
emp_dated_droped_df.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- date_now: date (nullable = false)
 |-- timestamp_now: timestamp (nullable = false)



In [28]:
emp_dated_droped_df.show()

+-----------+-------------+---+------+------+-------------+----------+----------+--------------------+
|employee_id|department_id|age|salary|gender|         name| hire_date|  date_now|       timestamp_now|
+-----------+-------------+---+------+------+-------------+----------+----------+--------------------+
|        001|          101| 30| 50000|     M|     Zohn Doe|2015-01-01|2024-12-22|2024-12-22 22:53:...|
|        002|          101| 25| 45000|     F|   Zane Smith|2016-02-15|2024-12-22|2024-12-22 22:53:...|
|        003|          102| 35| 55000|     M|    Bob Brown|2014-05-01|2024-12-22|2024-12-22 22:53:...|
|        004|          102| 28| 48000|     F|    Alice Lee|2017-09-30|2024-12-22|2024-12-22 22:53:...|
|        005|          103| 40| 60000|     M|    Zack Chan|2013-04-01|2024-12-22|2024-12-22 22:53:...|
|        006|          103| 32| 52000|     F|    Zill Wong|2018-07-01|2024-12-22|2024-12-22 22:53:...|
|        007|          101| 42| 70000|     M|Zames Zohnson|2012-03-15|202

In [29]:
# Drop Null gender records
emp_dated_droped_null_df = emp_dated_droped_df.na.drop()

In [31]:
emp_dated_droped_null_df.show()

+-----------+-------------+---+------+------+-------------+----------+----------+--------------------+
|employee_id|department_id|age|salary|gender|         name| hire_date|  date_now|       timestamp_now|
+-----------+-------------+---+------+------+-------------+----------+----------+--------------------+
|        001|          101| 30| 50000|     M|     Zohn Doe|2015-01-01|2024-12-22|2024-12-22 22:53:...|
|        002|          101| 25| 45000|     F|   Zane Smith|2016-02-15|2024-12-22|2024-12-22 22:53:...|
|        003|          102| 35| 55000|     M|    Bob Brown|2014-05-01|2024-12-22|2024-12-22 22:53:...|
|        004|          102| 28| 48000|     F|    Alice Lee|2017-09-30|2024-12-22|2024-12-22 22:53:...|
|        005|          103| 40| 60000|     M|    Zack Chan|2013-04-01|2024-12-22|2024-12-22 22:53:...|
|        006|          103| 32| 52000|     F|    Zill Wong|2018-07-01|2024-12-22|2024-12-22 22:53:...|
|        007|          101| 42| 70000|     M|Zames Zohnson|2012-03-15|202

In [32]:
# Fix Null values
# select *, nvl('new_gender', 'O') as new_gender from emp_dated
from pyspark.sql.functions import coalesce, lit

emp_dated_droped_fix_null_df = emp_dated_droped_df.withColumn("gender", coalesce(col("gender"), lit("O")))

In [33]:
emp_dated_droped_fix_null_df.show()

+-----------+-------------+---+------+------+-------------+----------+----------+--------------------+
|employee_id|department_id|age|salary|gender|         name| hire_date|  date_now|       timestamp_now|
+-----------+-------------+---+------+------+-------------+----------+----------+--------------------+
|        001|          101| 30| 50000|     M|     Zohn Doe|2015-01-01|2024-12-22|2024-12-22 22:55:...|
|        002|          101| 25| 45000|     F|   Zane Smith|2016-02-15|2024-12-22|2024-12-22 22:55:...|
|        003|          102| 35| 55000|     M|    Bob Brown|2014-05-01|2024-12-22|2024-12-22 22:55:...|
|        004|          102| 28| 48000|     F|    Alice Lee|2017-09-30|2024-12-22|2024-12-22 22:55:...|
|        005|          103| 40| 60000|     M|    Zack Chan|2013-04-01|2024-12-22|2024-12-22 22:55:...|
|        006|          103| 32| 52000|     F|    Zill Wong|2018-07-01|2024-12-22|2024-12-22 22:55:...|
|        007|          101| 42| 70000|     M|Zames Zohnson|2012-03-15|202

In [43]:
# Convert date into String and extract date information
from pyspark.sql.functions import date_format

emp_final_df = emp_dated_droped_fix_null_df.withColumn('year', date_format(col('hire_date'),'y')) \
                                           .withColumn('quarter', date_format(col('hire_date'),'Q/q'))

In [44]:
emp_final_df.show()

+-----------+-------------+---+------+------+-------------+----------+----------+--------------------+----+-------+
|employee_id|department_id|age|salary|gender|         name| hire_date|  date_now|       timestamp_now|year|quarter|
+-----------+-------------+---+------+------+-------------+----------+----------+--------------------+----+-------+
|        001|          101| 30| 50000|     M|     Zohn Doe|2015-01-01|2024-12-22|2024-12-22 23:01:...|2015|    1/1|
|        002|          101| 25| 45000|     F|   Zane Smith|2016-02-15|2024-12-22|2024-12-22 23:01:...|2016|    1/1|
|        003|          102| 35| 55000|     M|    Bob Brown|2014-05-01|2024-12-22|2024-12-22 23:01:...|2014|    2/2|
|        004|          102| 28| 48000|     F|    Alice Lee|2017-09-30|2024-12-22|2024-12-22 23:01:...|2017|    3/3|
|        005|          103| 40| 60000|     M|    Zack Chan|2013-04-01|2024-12-22|2024-12-22 23:01:...|2013|    2/2|
|        006|          103| 32| 52000|     F|    Zill Wong|2018-07-01|20