<a href="https://colab.research.google.com/github/AriyantGit/AriyantGit/blob/main/spark_practice_s1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Basic Transformation - I")
    .master("local[*]")
    .getOrCreate()
)

spark

In [3]:
emp_data = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"],
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","Female","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]

emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"

In [4]:
emp = spark.createDataFrame(data=emp_data, schema=emp_schema)

In [5]:
emp.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [6]:
emp_salary = emp.where("salary>50000")

In [7]:
emp_salary.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|        012|          105|   Susan Chen| 31|Female| 54000|2017-02-15|
|        013|          106|    Brian Kim| 45|  Male| 75000|2011-07-01|
|        015|          106|  Michael Lee| 37|  Male| 63000|2014-09-30|
|        017|          105|  George Wang| 34|  Male| 57000|2016-03-15|
|     

In [8]:
emp_salary.rdd.getNumPartitions()

2

In [9]:
emp_salary.write.format("csv").save("data/save/emp_salary.csv")

In [10]:
emp_salary.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)



In [11]:
from pyspark.sql.functions import col,cast

In [12]:
emp_casted = emp.select("employee_id","name",col("age").alias("age").cast("int"),"gender",col("salary").cast("double"),"hire_date")

In [13]:
emp_casted.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: string (nullable = true)



In [23]:
filter_col = emp.selectExpr("employee_id as emp_id","name","cast(age as int)","gender","cast(salary as double)","hire_date")

In [24]:
filter_col.show()

+------+-------------+---+------+-------+----------+
|emp_id|         name|age|gender| salary| hire_date|
+------+-------------+---+------+-------+----------+
|   001|     John Doe| 30|  Male|50000.0|2015-01-01|
|   002|   Jane Smith| 25|Female|45000.0|2016-02-15|
|   003|    Bob Brown| 35|  Male|55000.0|2014-05-01|
|   004|    Alice Lee| 28|Female|48000.0|2017-09-30|
|   005|    Jack Chan| 40|  Male|60000.0|2013-04-01|
|   006|    Jill Wong| 32|Female|52000.0|2018-07-01|
|   007|James Johnson| 42|  Male|70000.0|2012-03-15|
|   008|     Kate Kim| 29|Female|51000.0|2019-10-01|
|   009|      Tom Tan| 33|  Male|58000.0|2016-06-01|
|   010|     Lisa Lee| 27|Female|47000.0|2018-08-01|
|   011|   David Park| 38|  Male|65000.0|2015-11-01|
|   012|   Susan Chen| 31|Female|54000.0|2017-02-15|
|   013|    Brian Kim| 45|  Male|75000.0|2011-07-01|
|   014|    Emily Lee| 26|Female|46000.0|2019-01-01|
|   015|  Michael Lee| 37|  Male|63000.0|2014-09-30|
|   016|  Kelly Zhang| 30|Female|49000.0|2018-

In [22]:
filter_col.printSchema()

root
 |-- emp_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: string (nullable = true)



In [25]:
filter_col.select("emp_id","name","age","salary").where("age>30").show()

+------+-------------+---+-------+
|emp_id|         name|age| salary|
+------+-------------+---+-------+
|   003|    Bob Brown| 35|55000.0|
|   005|    Jack Chan| 40|60000.0|
|   006|    Jill Wong| 32|52000.0|
|   007|James Johnson| 42|70000.0|
|   009|      Tom Tan| 33|58000.0|
|   011|   David Park| 38|65000.0|
|   012|   Susan Chen| 31|54000.0|
|   013|    Brian Kim| 45|75000.0|
|   015|  Michael Lee| 37|63000.0|
|   017|  George Wang| 34|57000.0|
|   019|  Steven Chen| 36|62000.0|
|   020|    Grace Kim| 32|53000.0|
+------+-------------+---+-------+



In [26]:
filter_col

DataFrame[emp_id: string, name: string, age: int, gender: string, salary: double, hire_date: string]

In [27]:
##ADD NEW COLUM TO DATAFRAME

In [60]:
from pyspark.sql.functions import round,when
emp_taxed=filter_col.select("emp_id","name","age","salary").where("salary>10000").withColumn("tax_ammount",round(col("salary")*.14,2).cast("int")).withColumn("gross_Salary",col("salary") - col("tax_ammount"))

In [61]:
emp_taxed.show()

+------+-------------+---+-------+-----------+------------+
|emp_id|         name|age| salary|tax_ammount|gross_Salary|
+------+-------------+---+-------+-----------+------------+
|   001|     John Doe| 30|50000.0|       7000|     43000.0|
|   002|   Jane Smith| 25|45000.0|       6300|     38700.0|
|   003|    Bob Brown| 35|55000.0|       7700|     47300.0|
|   004|    Alice Lee| 28|48000.0|       6720|     41280.0|
|   005|    Jack Chan| 40|60000.0|       8400|     51600.0|
|   006|    Jill Wong| 32|52000.0|       7280|     44720.0|
|   007|James Johnson| 42|70000.0|       9800|     60200.0|
|   008|     Kate Kim| 29|51000.0|       7140|     43860.0|
|   009|      Tom Tan| 33|58000.0|       8120|     49880.0|
|   010|     Lisa Lee| 27|47000.0|       6580|     40420.0|
|   011|   David Park| 38|65000.0|       9100|     55900.0|
|   012|   Susan Chen| 31|54000.0|       7560|     46440.0|
|   013|    Brian Kim| 45|75000.0|      10500|     64500.0|
|   014|    Emily Lee| 26|46000.0|      

In [118]:
all_emp_taxed=filter_col.select("emp_id","name","age","salary").withColumn("tax_ammount",when((col("salary")>=50000) & (col("salary")<=60000),round(col("salary")*.14,2).cast("int")).when(col("salary")>60000,round(col("salary")*.16,2)).otherwise(0)).withColumn("gross_Salary",col("salary") - col("tax_ammount"))

In [73]:
all_emp_taxed.show()

+------+-------------+---+-------+-----------+------------+
|emp_id|         name|age| salary|tax_ammount|gross_Salary|
+------+-------------+---+-------+-----------+------------+
|   001|     John Doe| 30|50000.0|     7000.0|     43000.0|
|   002|   Jane Smith| 25|45000.0|        0.0|     45000.0|
|   003|    Bob Brown| 35|55000.0|     7700.0|     47300.0|
|   004|    Alice Lee| 28|48000.0|        0.0|     48000.0|
|   005|    Jack Chan| 40|60000.0|     8400.0|     51600.0|
|   006|    Jill Wong| 32|52000.0|     7280.0|     44720.0|
|   007|James Johnson| 42|70000.0|    11200.0|     58800.0|
|   008|     Kate Kim| 29|51000.0|     7140.0|     43860.0|
|   009|      Tom Tan| 33|58000.0|     8120.0|     49880.0|
|   010|     Lisa Lee| 27|47000.0|        0.0|     47000.0|
|   011|   David Park| 38|65000.0|    10400.0|     54600.0|
|   012|   Susan Chen| 31|54000.0|     7560.0|     46440.0|
|   013|    Brian Kim| 45|75000.0|    12000.0|     63000.0|
|   014|    Emily Lee| 26|46000.0|      

In [81]:
###Add a static value to a dataframe
from pyspark.sql.functions import lit
all_emp = all_emp_taxed.withColumn("active",lit(True))

In [82]:
all_emp.show()

+------+-------------+---+-------+-----------+------------+------+
|emp_id|         name|age| salary|tax_ammount|gross_Salary|active|
+------+-------------+---+-------+-----------+------------+------+
|   001|     John Doe| 30|50000.0|     7000.0|     43000.0|  true|
|   002|   Jane Smith| 25|45000.0|        0.0|     45000.0|  true|
|   003|    Bob Brown| 35|55000.0|     7700.0|     47300.0|  true|
|   004|    Alice Lee| 28|48000.0|        0.0|     48000.0|  true|
|   005|    Jack Chan| 40|60000.0|     8400.0|     51600.0|  true|
|   006|    Jill Wong| 32|52000.0|     7280.0|     44720.0|  true|
|   007|James Johnson| 42|70000.0|    11200.0|     58800.0|  true|
|   008|     Kate Kim| 29|51000.0|     7140.0|     43860.0|  true|
|   009|      Tom Tan| 33|58000.0|     8120.0|     49880.0|  true|
|   010|     Lisa Lee| 27|47000.0|        0.0|     47000.0|  true|
|   011|   David Park| 38|65000.0|    10400.0|     54600.0|  true|
|   012|   Susan Chen| 31|54000.0|     7560.0|     46440.0|  t

In [83]:
###Add gender column randomly in dataframe

In [84]:
from pyspark.sql.functions import rand
all_emp = all_emp.withColumn("gender",when(rand()>.5,"Male").otherwise("Female"))

In [85]:
all_emp.show()

+------+-------------+---+-------+-----------+------------+------+------+
|emp_id|         name|age| salary|tax_ammount|gross_Salary|active|gender|
+------+-------------+---+-------+-----------+------------+------+------+
|   001|     John Doe| 30|50000.0|     7000.0|     43000.0|  true|  Male|
|   002|   Jane Smith| 25|45000.0|        0.0|     45000.0|  true|  Male|
|   003|    Bob Brown| 35|55000.0|     7700.0|     47300.0|  true|Female|
|   004|    Alice Lee| 28|48000.0|        0.0|     48000.0|  true|  Male|
|   005|    Jack Chan| 40|60000.0|     8400.0|     51600.0|  true|  Male|
|   006|    Jill Wong| 32|52000.0|     7280.0|     44720.0|  true|Female|
|   007|James Johnson| 42|70000.0|    11200.0|     58800.0|  true|Female|
|   008|     Kate Kim| 29|51000.0|     7140.0|     43860.0|  true|  Male|
|   009|      Tom Tan| 33|58000.0|     8120.0|     49880.0|  true|  Male|
|   010|     Lisa Lee| 27|47000.0|        0.0|     47000.0|  true|Female|
|   011|   David Park| 38|65000.0|    

In [89]:
#Rename column
all_emp = all_emp.withColumnRenamed("name","emp_name")

In [90]:
all_emp.show()

+------+-------------+---+-------+-----------+------------+------+------+
|emp_id|     emp_name|age| salary|tax_ammount|gross_Salary|active|gender|
+------+-------------+---+-------+-----------+------------+------+------+
|   001|     John Doe| 30|50000.0|     7000.0|     43000.0|  true|  Male|
|   002|   Jane Smith| 25|45000.0|        0.0|     45000.0|  true|  Male|
|   003|    Bob Brown| 35|55000.0|     7700.0|     47300.0|  true|Female|
|   004|    Alice Lee| 28|48000.0|        0.0|     48000.0|  true|  Male|
|   005|    Jack Chan| 40|60000.0|     8400.0|     51600.0|  true|  Male|
|   006|    Jill Wong| 32|52000.0|     7280.0|     44720.0|  true|Female|
|   007|James Johnson| 42|70000.0|    11200.0|     58800.0|  true|Female|
|   008|     Kate Kim| 29|51000.0|     7140.0|     43860.0|  true|  Male|
|   009|      Tom Tan| 33|58000.0|     8120.0|     49880.0|  true|  Male|
|   010|     Lisa Lee| 27|47000.0|        0.0|     47000.0|  true|Female|
|   011|   David Park| 38|65000.0|    

In [91]:
##rename all the column which is not start with emp except tax_amount

In [119]:
all_emp_renamed = all_emp.selectExpr(*[
 f"{col_name} as {col_name if col_name.startswith('emp') else f'emp_{col_name}'}"
    for col_name in all_emp.columns])

In [120]:
all_emp_renamed.show()

+------+-------------+-------+----------+---------------+----------------+----------+----------+
|emp_id|     emp_name|emp_age|emp_salary|emp_tax_ammount|emp_gross_Salary|emp_active|emp_gender|
+------+-------------+-------+----------+---------------+----------------+----------+----------+
|   001|     John Doe|     30|   50000.0|         7000.0|         43000.0|      true|      Male|
|   002|   Jane Smith|     25|   45000.0|            0.0|         45000.0|      true|      Male|
|   003|    Bob Brown|     35|   55000.0|         7700.0|         47300.0|      true|    Female|
|   004|    Alice Lee|     28|   48000.0|            0.0|         48000.0|      true|      Male|
|   005|    Jack Chan|     40|   60000.0|         8400.0|         51600.0|      true|      Male|
|   006|    Jill Wong|     32|   52000.0|         7280.0|         44720.0|      true|    Female|
|   007|James Johnson|     42|   70000.0|        11200.0|         58800.0|      true|    Female|
|   008|     Kate Kim|     29|