In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

loading the data

In [0]:
%fs
rm -r dbfs:/FileStore/tables/*.json

In [0]:
df_csv = spark.read.format("csv").option("header",True).load("dbfs:/FileStore/tables/batch.csv")
df_csv.show()

+---+-----+----------+----+------+----------+
| id| name|       dob| age|salary|department|
+---+-----+----------+----+------+----------+
|  1| John|1992-05-12|  30| 70000|        IT|
|  2|Alice|1997-02-28|  25| 60000|        HR|
|  3|  Bob|      null|null| 80000|        IT|
|  4|Emily|1994-11-22|  28| 65000|   Finance|
+---+-----+----------+----+------+----------+



In [0]:
df_csv.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- age: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- department: string (nullable = true)



In [0]:
from pyspark.sql.types import StructType,StructField, IntegerType, StringType, DateType

In [0]:
schema = StructType([
    StructField('id',IntegerType()),
    StructField('name',StringType()),
    StructField('dob',DateType()),
    StructField('age',IntegerType()),
    StructField('salary',IntegerType()),
    StructField('department',StringType())
    
])

In [0]:
df_csv = spark.read.format("csv").schema(schema).option("header",True).load("dbfs:/FileStore/tables/batch.csv")

In [0]:
df_csv.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- department: string (nullable = true)



In [0]:
df_csv.show()

+---+-----+----------+----+------+----------+
| id| name|       dob| age|salary|department|
+---+-----+----------+----+------+----------+
|  1| John|1992-05-12|  30| 70000|        IT|
|  2|Alice|1997-02-28|  25| 60000|        HR|
|  3|  Bob|      null|null| 80000|        IT|
|  4|Emily|1994-11-22|  28| 65000|   Finance|
+---+-----+----------+----+------+----------+



In [0]:
df_json = spark.read.format("json").schema(schema).load("dbfs:/FileStore/tables/*1.json")
df_json.show()

+---+------+----------+---+------+----------+
| id|  name|       dob|age|salary|department|
+---+------+----------+---+------+----------+
| 10|Sophie|1992-06-30| 30| 62000|   Finance|
|  2| Alice|1997-02-28| 25| 90000|   Finance|
|  4| Emily|1994-11-22| 28| 70000|   Finance|
|  9| James|1983-10-14| 39| 87000|        IT|
|  1|  John|1992-05-12| 30| 70000|        IT|
|  8|  Lisa|1995-08-20| 27| 58000|        HR|
+---+------+----------+---+------+----------+



In [0]:
%fs ls dbfs:/FileStore/tables


path,name,size,modificationTime
dbfs:/FileStore/tables/10_20220101.json,10_20220101.json,102,1707461052000
dbfs:/FileStore/tables/1_20220101.json,1_20220101.json,94,1707461050000
dbfs:/FileStore/tables/2_20220101.json,2_20220101.json,100,1707461050000
dbfs:/FileStore/tables/4_20220101.json,4_20220101.json,100,1707461051000
dbfs:/FileStore/tables/8_20220101.json,8_20220101.json,94,1707461051000
dbfs:/FileStore/tables/9_20220101.json,9_20220101.json,95,1707461052000
dbfs:/FileStore/tables/Address.xlsx,Address.xlsx,151315,1706078413000
dbfs:/FileStore/tables/Detail-1.csv,Detail-1.csv,208476,1706074372000
dbfs:/FileStore/tables/Detail-2.csv,Detail-2.csv,208476,1706074471000
dbfs:/FileStore/tables/Detail.csv,Detail.csv,208476,1706073591000


In [0]:
df_json_batch = spark.read.format("json").schema(schema).load("dbfs:/FileStore/tables/batch.jsonl")

In [0]:
df_json_batch.show()

+---+-----+----------+----+------+----------+
| id| name|       dob| age|salary|department|
+---+-----+----------+----+------+----------+
|  1| John|1992-05-12|  30| 70000|        IT|
|  2|Alice|1997-02-28|  25| 60000|        HR|
|  3|  Bob|      null|null| 80000|        IT|
|  4|Emily|1994-11-22|  28| 65000|   Finance|
|  5|David|1981-12-18|  41| 90000|        HR|
|  6|Susan|1989-07-05|  33| 75000|   Finance|
|  7| Mike|1976-03-15|  46| 95000|        IT|
+---+-----+----------+----+------+----------+



In [0]:
df_csv.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- department: string (nullable = true)



In [0]:
df_final_json = df_json.union(df_json_batch)
df_final_json.show()

+---+------+----------+----+------+----------+
| id|  name|       dob| age|salary|department|
+---+------+----------+----+------+----------+
| 10|Sophie|1992-06-30|  30| 62000|   Finance|
|  2| Alice|1997-02-28|  25| 90000|   Finance|
|  4| Emily|1994-11-22|  28| 70000|   Finance|
|  9| James|1983-10-14|  39| 87000|        IT|
|  1|  John|1992-05-12|  30| 70000|        IT|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|
|  1|  John|1992-05-12|  30| 70000|        IT|
|  2| Alice|1997-02-28|  25| 60000|        HR|
|  3|   Bob|      null|null| 80000|        IT|
|  4| Emily|1994-11-22|  28| 65000|   Finance|
|  5| David|1981-12-18|  41| 90000|        HR|
|  6| Susan|1989-07-05|  33| 75000|   Finance|
|  7|  Mike|1976-03-15|  46| 95000|        IT|
+---+------+----------+----+------+----------+



In [0]:
df_json.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- department: string (nullable = true)



In [0]:
print(df_csv.columns,df_final_json.columns)

['id', 'name', 'dob', 'age', 'salary', 'department'] ['id', 'name', 'dob', 'age', 'salary', 'department']


In [0]:
#to change the order of columns
df_final_json = df_final_json.select(df_csv.columns)
df_final_json.show()

+---+------+----------+----+------+----------+
| id|  name|       dob| age|salary|department|
+---+------+----------+----+------+----------+
| 10|Sophie|1992-06-30|  30| 62000|   Finance|
|  2| Alice|1997-02-28|  25| 90000|   Finance|
|  4| Emily|1994-11-22|  28| 70000|   Finance|
|  9| James|1983-10-14|  39| 87000|        IT|
|  1|  John|1992-05-12|  30| 70000|        IT|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|
|  1|  John|1992-05-12|  30| 70000|        IT|
|  2| Alice|1997-02-28|  25| 60000|        HR|
|  3|   Bob|      null|null| 80000|        IT|
|  4| Emily|1994-11-22|  28| 65000|   Finance|
|  5| David|1981-12-18|  41| 90000|        HR|
|  6| Susan|1989-07-05|  33| 75000|   Finance|
|  7|  Mike|1976-03-15|  46| 95000|        IT|
+---+------+----------+----+------+----------+



Union and unionall gives same output in pyspark

In [0]:
df = df_csv.union(df_final_json)
df.show()

+---+------+----------+----+------+----------+
| id|  name|       dob| age|salary|department|
+---+------+----------+----+------+----------+
|  1|  John|1992-05-12|  30| 70000|        IT|
|  2| Alice|1997-02-28|  25| 60000|        HR|
|  3|   Bob|      null|null| 80000|        IT|
|  4| Emily|1994-11-22|  28| 65000|   Finance|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|
|  2| Alice|1997-02-28|  25| 90000|   Finance|
|  4| Emily|1994-11-22|  28| 70000|   Finance|
|  9| James|1983-10-14|  39| 87000|        IT|
|  1|  John|1992-05-12|  30| 70000|        IT|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|
|  1|  John|1992-05-12|  30| 70000|        IT|
|  2| Alice|1997-02-28|  25| 60000|        HR|
|  3|   Bob|      null|null| 80000|        IT|
|  4| Emily|1994-11-22|  28| 65000|   Finance|
|  5| David|1981-12-18|  41| 90000|        HR|
|  6| Susan|1989-07-05|  33| 75000|   Finance|
|  7|  Mike|1976-03-15|  46| 95000|        IT|
+---+------+----------+----+------+----------+



In [0]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- department: string (nullable = true)



In [0]:
df = df.dropDuplicates()
df.show()

+---+------+----------+----+------+----------+
| id|  name|       dob| age|salary|department|
+---+------+----------+----+------+----------+
|  3|   Bob|      null|null| 80000|        IT|
|  4| Emily|1994-11-22|  28| 65000|   Finance|
|  2| Alice|1997-02-28|  25| 60000|        HR|
|  1|  John|1992-05-12|  30| 70000|        IT|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|
|  2| Alice|1997-02-28|  25| 90000|   Finance|
|  4| Emily|1994-11-22|  28| 70000|   Finance|
|  9| James|1983-10-14|  39| 87000|        IT|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|
|  5| David|1981-12-18|  41| 90000|        HR|
|  6| Susan|1989-07-05|  33| 75000|   Finance|
|  7|  Mike|1976-03-15|  46| 95000|        IT|
+---+------+----------+----+------+----------+



In [0]:
df.count()

Out[87]: 12

In [0]:
df.select(
    "salary",
    "age"
).show()

df.select(
    ["salary","age"]
).show()

df.select(
    df.salary,
    df.age
).show()

df.select(
    f.col("salary"),
    f.col("age")
).show()

+------+----+
|salary| age|
+------+----+
| 80000|null|
| 65000|  28|
| 60000|  25|
| 70000|  30|
| 62000|  30|
| 90000|  25|
| 70000|  28|
| 87000|  39|
| 58000|  27|
| 90000|  41|
| 75000|  33|
| 95000|  46|
+------+----+

+------+----+
|salary| age|
+------+----+
| 80000|null|
| 65000|  28|
| 60000|  25|
| 70000|  30|
| 62000|  30|
| 90000|  25|
| 70000|  28|
| 87000|  39|
| 58000|  27|
| 90000|  41|
| 75000|  33|
| 95000|  46|
+------+----+

+------+----+
|salary| age|
+------+----+
| 80000|null|
| 65000|  28|
| 60000|  25|
| 70000|  30|
| 62000|  30|
| 90000|  25|
| 70000|  28|
| 87000|  39|
| 58000|  27|
| 90000|  41|
| 75000|  33|
| 95000|  46|
+------+----+

+------+----+
|salary| age|
+------+----+
| 80000|null|
| 65000|  28|
| 60000|  25|
| 70000|  30|
| 62000|  30|
| 90000|  25|
| 70000|  28|
| 87000|  39|
| 58000|  27|
| 90000|  41|
| 75000|  33|
| 95000|  46|
+------+----+



In [0]:
from pyspark.sql import functions as f

In [0]:
df.select(
    df.salary,
    (df.salary + .05*df.salary).alias("salary_raise"),
    (f.year(f.current_timestamp())-f.year("dob")).alias("age")
).show()

+------+------------+----+
|salary|salary_raise| age|
+------+------------+----+
| 80000|     84000.0|null|
| 65000|     68250.0|  30|
| 60000|     63000.0|  27|
| 70000|     73500.0|  32|
| 62000|     65100.0|  32|
| 90000|     94500.0|  27|
| 70000|     73500.0|  30|
| 87000|     91350.0|  41|
| 58000|     60900.0|  29|
| 90000|     94500.0|  43|
| 75000|     78750.0|  35|
| 95000|     99750.0|  48|
+------+------------+----+



#WithColumn

In [0]:
df = df.withColumn(
    "salary_raised",
    df.salary + .05*df.salary
)

df = df.withColumn(
    "salary_raised_10_per",
    df.salary + .1*df.salary
)

#0r
'''
df = df.withColumn(
    "salary_raised",
    df.salary + .05*df.salary
).withColumn(
    "salary_raised_10_per",
    df.salary + .1*df.salary
)
'''
df.show()

+---+------+----------+----+------+----------+-------------+--------------------+
| id|  name|       dob| age|salary|department|salary_raised|salary_raised_10_per|
+---+------+----------+----+------+----------+-------------+--------------------+
|  3|   Bob|      null|null| 80000|        IT|      84000.0|             88000.0|
|  4| Emily|1994-11-22|  28| 65000|   Finance|      68250.0|             71500.0|
|  2| Alice|1997-02-28|  25| 60000|        HR|      63000.0|             66000.0|
|  1|  John|1992-05-12|  30| 70000|        IT|      73500.0|             77000.0|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|      65100.0|             68200.0|
|  2| Alice|1997-02-28|  25| 90000|   Finance|      94500.0|             99000.0|
|  4| Emily|1994-11-22|  28| 70000|   Finance|      73500.0|             77000.0|
|  9| James|1983-10-14|  39| 87000|        IT|      91350.0|             95700.0|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|      60900.0|             63800.0|
|  5| David|1981

#WithColumns

In [0]:
df = df.withColumns(
    {
        "salary_raised":df.salary + .05*df.salary,
        "age1":f.year(f.current_timestamp())-f.year("dob")
    }
)

df.show()

+---+------+----------+----+------+----------+-------------+--------------------+----+
| id|  name|       dob| age|salary|department|salary_raised|salary_raised_10_per|age1|
+---+------+----------+----+------+----------+-------------+--------------------+----+
|  3|   Bob|      null|null| 80000|        IT|      84000.0|             88000.0|null|
|  4| Emily|1994-11-22|  28| 65000|   Finance|      68250.0|             71500.0|  30|
|  2| Alice|1997-02-28|  25| 60000|        HR|      63000.0|             66000.0|  27|
|  1|  John|1992-05-12|  30| 70000|        IT|      73500.0|             77000.0|  32|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|      65100.0|             68200.0|  32|
|  2| Alice|1997-02-28|  25| 90000|   Finance|      94500.0|             99000.0|  27|
|  4| Emily|1994-11-22|  28| 70000|   Finance|      73500.0|             77000.0|  30|
|  9| James|1983-10-14|  39| 87000|        IT|      91350.0|             95700.0|  41|
|  8|  Lisa|1995-08-20|  27| 58000|        

#Filter

In [0]:
df.filter(
    f.col("salary_raised") >= 75000
).show()

+---+-----+----------+----+------+----------+-------------+--------------------+----+
| id| name|       dob| age|salary|department|salary_raised|salary_raised_10_per|age1|
+---+-----+----------+----+------+----------+-------------+--------------------+----+
|  3|  Bob|      null|null| 80000|        IT|      84000.0|             88000.0|null|
|  2|Alice|1997-02-28|  25| 90000|   Finance|      94500.0|             99000.0|  27|
|  9|James|1983-10-14|  39| 87000|        IT|      91350.0|             95700.0|  41|
|  5|David|1981-12-18|  41| 90000|        HR|      94500.0|             99000.0|  43|
|  6|Susan|1989-07-05|  33| 75000|   Finance|      78750.0|             82500.0|  35|
|  7| Mike|1976-03-15|  46| 95000|        IT|      99750.0|            104500.0|  48|
+---+-----+----------+----+------+----------+-------------+--------------------+----+



In [0]:
#using where
df.where(
    f.col("salary_raised") >= 75000
).show()

+---+-----+----------+----+------+----------+-------------+--------------------+----+
| id| name|       dob| age|salary|department|salary_raised|salary_raised_10_per|age1|
+---+-----+----------+----+------+----------+-------------+--------------------+----+
|  3|  Bob|      null|null| 80000|        IT|      84000.0|             88000.0|null|
|  2|Alice|1997-02-28|  25| 90000|   Finance|      94500.0|             99000.0|  27|
|  9|James|1983-10-14|  39| 87000|        IT|      91350.0|             95700.0|  41|
|  5|David|1981-12-18|  41| 90000|        HR|      94500.0|             99000.0|  43|
|  6|Susan|1989-07-05|  33| 75000|   Finance|      78750.0|             82500.0|  35|
|  7| Mike|1976-03-15|  46| 95000|        IT|      99750.0|            104500.0|  48|
+---+-----+----------+----+------+----------+-------------+--------------------+----+



#When Otherwise

In [0]:
df = df.withColumn(
    "age_group",
    f.when(
        (f.col("age")<=20),
        "Upto 20"
    ).when(
        (f.col("age")>20) &
        (f.col("age")<=30),
        "21 to 30"
    ).when(
        (f.col("age")>30) &
        (f.col("age")<=40),
        "31 to 40"
    ).otherwise(
        "more than 40"
    )

)

df.show()

+---+------+----------+----+------+----------+-------------+--------------------+----+------------+
| id|  name|       dob| age|salary|department|salary_raised|salary_raised_10_per|age1|   age_group|
+---+------+----------+----+------+----------+-------------+--------------------+----+------------+
|  3|   Bob|      null|null| 80000|        IT|      84000.0|             88000.0|null|more than 40|
|  4| Emily|1994-11-22|  28| 65000|   Finance|      68250.0|             71500.0|  30|    21 to 30|
|  2| Alice|1997-02-28|  25| 60000|        HR|      63000.0|             66000.0|  27|    21 to 30|
|  1|  John|1992-05-12|  30| 70000|        IT|      73500.0|             77000.0|  32|    21 to 30|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|      65100.0|             68200.0|  32|    21 to 30|
|  2| Alice|1997-02-28|  25| 90000|   Finance|      94500.0|             99000.0|  27|    21 to 30|
|  4| Emily|1994-11-22|  28| 70000|   Finance|      73500.0|             77000.0|  30|    21 to 30|


#Lit

In [0]:
df = df.withColumn(
    "age_group",
    f.when(
        (f.col("age")<=20),
        f.lit("Upto 20")
    ).when(
        (f.col("age")>20) &
        (f.col("age")<=30),
        f.lit("21 to 30")
    ).when(
        (f.col("age")>30) &
        (f.col("age")<=40),
        f.lit("31 to 40")
    ).otherwise(
        f.lit("more than 40")
    )

)

df.show()

+---+------+----------+----+------+----------+-------------+--------------------+----+------------+
| id|  name|       dob| age|salary|department|salary_raised|salary_raised_10_per|age1|   age_group|
+---+------+----------+----+------+----------+-------------+--------------------+----+------------+
|  3|   Bob|      null|null| 80000|        IT|      84000.0|             88000.0|null|more than 40|
|  4| Emily|1994-11-22|  28| 65000|   Finance|      68250.0|             71500.0|  30|    21 to 30|
|  2| Alice|1997-02-28|  25| 60000|        HR|      63000.0|             66000.0|  27|    21 to 30|
|  1|  John|1992-05-12|  30| 70000|        IT|      73500.0|             77000.0|  32|    21 to 30|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|      65100.0|             68200.0|  32|    21 to 30|
|  2| Alice|1997-02-28|  25| 90000|   Finance|      94500.0|             99000.0|  27|    21 to 30|
|  4| Emily|1994-11-22|  28| 70000|   Finance|      73500.0|             77000.0|  30|    21 to 30|


#Nulls

In [0]:
df = df.withColumns(
    {
        "age":f.coalesce(
            f.year(f.current_timestamp())-f.year("dob"),
            f.lit(-1)
        ),
        "has_dob_1":~f.isnull("dob"),
        "has_dob_2":f.col("dob").isNotNull()
    }

)
df.show()

+---+------+----------+---+------+----------+-------------+--------------------+----+------------+---------+---------+
| id|  name|       dob|age|salary|department|salary_raised|salary_raised_10_per|age1|   age_group|has_dob_1|has_dob_2|
+---+------+----------+---+------+----------+-------------+--------------------+----+------------+---------+---------+
|  3|   Bob|      null| -1| 80000|        IT|      84000.0|             88000.0|null|more than 40|    false|    false|
|  4| Emily|1994-11-22| 30| 65000|   Finance|      68250.0|             71500.0|  30|    21 to 30|     true|     true|
|  2| Alice|1997-02-28| 27| 60000|        HR|      63000.0|             66000.0|  27|    21 to 30|     true|     true|
|  1|  John|1992-05-12| 32| 70000|        IT|      73500.0|             77000.0|  32|    21 to 30|     true|     true|
| 10|Sophie|1992-06-30| 32| 62000|   Finance|      65100.0|             68200.0|  32|    21 to 30|     true|     true|
|  2| Alice|1997-02-28| 27| 90000|   Finance|   

#DropColumns

In [0]:
df = df.drop("salary_raised_10_per","age1","has_dob_1","has_dob_2")
df.show()

+---+------+----------+---+------+----------+-------------+------------+
| id|  name|       dob|age|salary|department|salary_raised|   age_group|
+---+------+----------+---+------+----------+-------------+------------+
|  3|   Bob|      null| -1| 80000|        IT|      84000.0|more than 40|
|  4| Emily|1994-11-22| 30| 65000|   Finance|      68250.0|    21 to 30|
|  2| Alice|1997-02-28| 27| 60000|        HR|      63000.0|    21 to 30|
|  1|  John|1992-05-12| 32| 70000|        IT|      73500.0|    21 to 30|
| 10|Sophie|1992-06-30| 32| 62000|   Finance|      65100.0|    21 to 30|
|  2| Alice|1997-02-28| 27| 90000|   Finance|      94500.0|    21 to 30|
|  4| Emily|1994-11-22| 30| 70000|   Finance|      73500.0|    21 to 30|
|  9| James|1983-10-14| 41| 87000|        IT|      91350.0|    31 to 40|
|  8|  Lisa|1995-08-20| 29| 58000|        HR|      60900.0|    21 to 30|
|  5| David|1981-12-18| 43| 90000|        HR|      94500.0|more than 40|
|  6| Susan|1989-07-05| 35| 75000|   Finance|      

#WithColumnRenamed

In [0]:
df = df.withColumnRenamed("name","First_name")

In [0]:
df.show()

+---+----------+----------+---+------+----------+-------------+------------+
| id|First_name|       dob|age|salary|department|salary_raised|   age_group|
+---+----------+----------+---+------+----------+-------------+------------+
|  3|       Bob|      null| -1| 80000|        IT|      84000.0|more than 40|
|  4|     Emily|1994-11-22| 30| 65000|   Finance|      68250.0|    21 to 30|
|  2|     Alice|1997-02-28| 27| 60000|        HR|      63000.0|    21 to 30|
|  1|      John|1992-05-12| 32| 70000|        IT|      73500.0|    21 to 30|
| 10|    Sophie|1992-06-30| 32| 62000|   Finance|      65100.0|    21 to 30|
|  2|     Alice|1997-02-28| 27| 90000|   Finance|      94500.0|    21 to 30|
|  4|     Emily|1994-11-22| 30| 70000|   Finance|      73500.0|    21 to 30|
|  9|     James|1983-10-14| 41| 87000|        IT|      91350.0|    31 to 40|
|  8|      Lisa|1995-08-20| 29| 58000|        HR|      60900.0|    21 to 30|
|  5|     David|1981-12-18| 43| 90000|        HR|      94500.0|more than 40|