In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("transformation")
    .getOrCreate()
)

spark


In [2]:
# # define the schema

# from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

# schema = StringType([
#     StructField("emp_id", IntegerType, True ),
#     StructField("empName", StringType, True),
#     StructField("empGender", StringType, True),
#     StructField("empSalary", FloatType, True),
#     StructField("empCountry",StringType, True)

# ])

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("emp_schema_test")
    .getOrCreate()
)

emp_data = [
    (1, "John Doe", "Male", 70000.0, "USA"),
    (2, "Emily Clark", "Female", 72000.0, "Canada"),
    (3, "Michael Smith", "Male", 68000.0, "UK"),
    (4, "Sophia Lee", "Female", 75000.0, "Australia"),
    (5, "Daniel Kim", "Male", 69000.0, "India"),
    (5, "Daniel Kim", "Male", 69000.0, "India"),
    (6, "Olivia Brown", "Female", 71000.0, "USA"),
    (7, "James Wilson", "Male", 70500.0, "Canada"),
    (8, "Ava Johnson", "Female", 69500.0, "Germany"),
    (9, "Ethan Davis", "Male", 73000.0, "UK"),
    (10, "Isabella Garcia", "Female", 74000.0, "USA"),
    (9, "Ethan Davis", "Male", 73000.0, "UK"),
    (10, "Isabella Garcia", "Female", 74000.0, "USA"),
    (11, "Logan Martinez", "Male", 68000.0, "Mexico"),
    (12, "Mia Taylor", "Female", 72000.0, "France"),
    (13, "Lucas Anderson", "Male", 75000.0, "Ireland"),
    (14, "Charlotte Thomas", "Female", 73500.0, "New Zealand"),
    (15, "Jackson White", "Male", 70000.0, "Australia"),
    (14, "Charlotte Thomas", "Female", 73500.0, "New Zealand"),
    (15, "Jackson White", "Male", 70000.0, "Australia")
]

# Correct schema definition
emp_schema = StructType([
    StructField("empId", IntegerType(), True),
    StructField("empName", StringType(), True),
    StructField("empGender", StringType(), True),
    StructField("empSalary", FloatType(), True),
    StructField("empCountry", StringType(), True)
])

# Create DataFrame with schema
df1 = spark.createDataFrame(emp_data, schema=emp_schema)

df1.show()
df1.printSchema()
print("Row count:", df1.count())


                                                                                

+-----+----------------+---------+---------+-----------+
|empId|         empName|empGender|empSalary| empCountry|
+-----+----------------+---------+---------+-----------+
|    1|        John Doe|     Male|  70000.0|        USA|
|    2|     Emily Clark|   Female|  72000.0|     Canada|
|    3|   Michael Smith|     Male|  68000.0|         UK|
|    4|      Sophia Lee|   Female|  75000.0|  Australia|
|    5|      Daniel Kim|     Male|  69000.0|      India|
|    5|      Daniel Kim|     Male|  69000.0|      India|
|    6|    Olivia Brown|   Female|  71000.0|        USA|
|    7|    James Wilson|     Male|  70500.0|     Canada|
|    8|     Ava Johnson|   Female|  69500.0|    Germany|
|    9|     Ethan Davis|     Male|  73000.0|         UK|
|   10| Isabella Garcia|   Female|  74000.0|        USA|
|    9|     Ethan Davis|     Male|  73000.0|         UK|
|   10| Isabella Garcia|   Female|  74000.0|        USA|
|   11|  Logan Martinez|     Male|  68000.0|     Mexico|
|   12|      Mia Taylor|   Fema

In [2]:
# withColumn() : to add columns
# add two more columns
      #country ==> constant column ==> India
      #tax => derived column ==> 12%(salary)

from pyspark.sql.functions import lit
        
df2 = df1.withColumn("Origin", lit("India")).withColumn("tax",df1.empSalary*0.12)

df2.show(5)


+-----+-------------+---------+---------+----------+------+------+
|empId|      empName|empGender|empSalary|empCountry|Origin|   tax|
+-----+-------------+---------+---------+----------+------+------+
|    1|     John Doe|     Male|  70000.0|       USA| India|8400.0|
|    2|  Emily Clark|   Female|  72000.0|    Canada| India|8640.0|
|    3|Michael Smith|     Male|  68000.0|        UK| India|8160.0|
|    4|   Sophia Lee|   Female|  75000.0| Australia| India|9000.0|
|    5|   Daniel Kim|     Male|  69000.0|     India| India|8280.0|
+-----+-------------+---------+---------+----------+------+------+
only showing top 5 rows



In [3]:
# withColumnRenamed()  : to rename the columns
    #two columns => country, tax
    
df3 = df2.withColumnRenamed("Origin", "empOrigin") \
        .withColumnRenamed("tax", "empTax")

df3.show(5)
# df3.printSchema() #data type

+-----+-------------+---------+---------+----------+---------+------+
|empId|      empName|empGender|empSalary|empCountry|empOrigin|empTax|
+-----+-------------+---------+---------+----------+---------+------+
|    1|     John Doe|     Male|  70000.0|       USA|    India|8400.0|
|    2|  Emily Clark|   Female|  72000.0|    Canada|    India|8640.0|
|    3|Michael Smith|     Male|  68000.0|        UK|    India|8160.0|
|    4|   Sophia Lee|   Female|  75000.0| Australia|    India|9000.0|
|    5|   Daniel Kim|     Male|  69000.0|     India|    India|8280.0|
+-----+-------------+---------+---------+----------+---------+------+
only showing top 5 rows



In [4]:
# way to select the specific columns
from pyspark.sql.functions import col

# df3.select("empId", "empName", col("empGender")).show(5) #both are same

df3.select("empId", df3.empName, col("empGender")).show(5)


+-----+-------------+---------+
|empId|      empName|empGender|
+-----+-------------+---------+
|    1|     John Doe|     Male|
|    2|  Emily Clark|   Female|
|    3|Michael Smith|     Male|
|    4|   Sophia Lee|   Female|
|    5|   Daniel Kim|     Male|
+-----+-------------+---------+
only showing top 5 rows



In [5]:
df3.show(5)

+-----+-------------+---------+---------+----------+---------+------+
|empId|      empName|empGender|empSalary|empCountry|empOrigin|empTax|
+-----+-------------+---------+---------+----------+---------+------+
|    1|     John Doe|     Male|  70000.0|       USA|    India|8400.0|
|    2|  Emily Clark|   Female|  72000.0|    Canada|    India|8640.0|
|    3|Michael Smith|     Male|  68000.0|        UK|    India|8160.0|
|    4|   Sophia Lee|   Female|  75000.0| Australia|    India|9000.0|
|    5|   Daniel Kim|     Male|  69000.0|     India|    India|8280.0|
+-----+-------------+---------+---------+----------+---------+------+
only showing top 5 rows



In [6]:
# case conditions :

    #male ==> m
    #female ==> f

# df4 = df3.select("empId", 
#                  "empName", 
#                  ("empGender"), 
#                  "empSalary", 
#                  "empCountry", 
#                  "empOrigin", 
#                  "empTax")

# df4.show(5)

+-----+-------------+---------+---------+----------+---------+------+
|empId|      empName|empGender|empSalary|empCountry|empOrigin|empTax|
+-----+-------------+---------+---------+----------+---------+------+
|    1|     John Doe|     Male|  70000.0|       USA|    India|8400.0|
|    2|  Emily Clark|   Female|  72000.0|    Canada|    India|8640.0|
|    3|Michael Smith|     Male|  68000.0|        UK|    India|8160.0|
|    4|   Sophia Lee|   Female|  75000.0| Australia|    India|9000.0|
|    5|   Daniel Kim|     Male|  69000.0|     India|    India|8280.0|
+-----+-------------+---------+---------+----------+---------+------+
only showing top 5 rows



In [7]:
# case conditions :

    #male ==> m
    #female ==> f
    
from pyspark.sql.functions import when
    
df4 = df3.select("empId", 
                 "empName", 
                 when(df3.empGender == 'Male', 'M').otherwise('F').alias("empGender"), 
                 "empSalary", 
                 "empCountry", 
                 "empOrigin", 
                 "empTax")

df4.show(5)

+-----+-------------+---------+---------+----------+---------+------+
|empId|      empName|empGender|empSalary|empCountry|empOrigin|empTax|
+-----+-------------+---------+---------+----------+---------+------+
|    1|     John Doe|        M|  70000.0|       USA|    India|8400.0|
|    2|  Emily Clark|        F|  72000.0|    Canada|    India|8640.0|
|    3|Michael Smith|        M|  68000.0|        UK|    India|8160.0|
|    4|   Sophia Lee|        F|  75000.0| Australia|    India|9000.0|
|    5|   Daniel Kim|        M|  69000.0|     India|    India|8280.0|
+-----+-------------+---------+---------+----------+---------+------+
only showing top 5 rows



In [8]:
# orderBy() or sort() : to sort the data

# df4.orderBy(df4.empSalary.desc()).show(5)

# both the statement are same orderby or sort give you same ans

# use desc or asc order in both statement

df4.sort(df4.empSalary).show(5) 


+-----+--------------+---------+---------+----------+---------+------+
|empId|       empName|empGender|empSalary|empCountry|empOrigin|empTax|
+-----+--------------+---------+---------+----------+---------+------+
|   11|Logan Martinez|        M|  68000.0|    Mexico|    India|8160.0|
|    3| Michael Smith|        M|  68000.0|        UK|    India|8160.0|
|    5|    Daniel Kim|        M|  69000.0|     India|    India|8280.0|
|    5|    Daniel Kim|        M|  69000.0|     India|    India|8280.0|
|    8|   Ava Johnson|        F|  69500.0|   Germany|    India|8340.0|
+-----+--------------+---------+---------+----------+---------+------+
only showing top 5 rows



In [9]:
df4.show(5)

+-----+-------------+---------+---------+----------+---------+------+
|empId|      empName|empGender|empSalary|empCountry|empOrigin|empTax|
+-----+-------------+---------+---------+----------+---------+------+
|    1|     John Doe|        M|  70000.0|       USA|    India|8400.0|
|    2|  Emily Clark|        F|  72000.0|    Canada|    India|8640.0|
|    3|Michael Smith|        M|  68000.0|        UK|    India|8160.0|
|    4|   Sophia Lee|        F|  75000.0| Australia|    India|9000.0|
|    5|   Daniel Kim|        M|  69000.0|     India|    India|8280.0|
+-----+-------------+---------+---------+----------+---------+------+
only showing top 5 rows



In [10]:
# dropDuplicates() : to remove the duplicates

# df4.dropDuplicates().show()
df4.dropDuplicates().orderBy("empId").show()
df4.dropDuplicates().count()

                                                                                

+-----+----------------+---------+---------+-----------+---------+------+
|empId|         empName|empGender|empSalary| empCountry|empOrigin|empTax|
+-----+----------------+---------+---------+-----------+---------+------+
|    1|        John Doe|        M|  70000.0|        USA|    India|8400.0|
|    2|     Emily Clark|        F|  72000.0|     Canada|    India|8640.0|
|    3|   Michael Smith|        M|  68000.0|         UK|    India|8160.0|
|    4|      Sophia Lee|        F|  75000.0|  Australia|    India|9000.0|
|    5|      Daniel Kim|        M|  69000.0|      India|    India|8280.0|
|    6|    Olivia Brown|        F|  71000.0|        USA|    India|8520.0|
|    7|    James Wilson|        M|  70500.0|     Canada|    India|8460.0|
|    8|     Ava Johnson|        F|  69500.0|    Germany|    India|8340.0|
|    9|     Ethan Davis|        M|  73000.0|         UK|    India|8760.0|
|   10| Isabella Garcia|        F|  74000.0|        USA|    India|8880.0|
|   11|  Logan Martinez|        M|  68

15

In [11]:
df4.show()

+-----+----------------+---------+---------+-----------+---------+------+
|empId|         empName|empGender|empSalary| empCountry|empOrigin|empTax|
+-----+----------------+---------+---------+-----------+---------+------+
|    1|        John Doe|        M|  70000.0|        USA|    India|8400.0|
|    2|     Emily Clark|        F|  72000.0|     Canada|    India|8640.0|
|    3|   Michael Smith|        M|  68000.0|         UK|    India|8160.0|
|    4|      Sophia Lee|        F|  75000.0|  Australia|    India|9000.0|
|    5|      Daniel Kim|        M|  69000.0|      India|    India|8280.0|
|    5|      Daniel Kim|        M|  69000.0|      India|    India|8280.0|
|    6|    Olivia Brown|        F|  71000.0|        USA|    India|8520.0|
|    7|    James Wilson|        M|  70500.0|     Canada|    India|8460.0|
|    8|     Ava Johnson|        F|  69500.0|    Germany|    India|8340.0|
|    9|     Ethan Davis|        M|  73000.0|         UK|    India|8760.0|
|   10| Isabella Garcia|        F|  74

In [12]:
# distinct() : to display the unique records

# df4.distinct().show()

df4.distinct().orderBy("empId").show()

+-----+----------------+---------+---------+-----------+---------+------+
|empId|         empName|empGender|empSalary| empCountry|empOrigin|empTax|
+-----+----------------+---------+---------+-----------+---------+------+
|    1|        John Doe|        M|  70000.0|        USA|    India|8400.0|
|    2|     Emily Clark|        F|  72000.0|     Canada|    India|8640.0|
|    3|   Michael Smith|        M|  68000.0|         UK|    India|8160.0|
|    4|      Sophia Lee|        F|  75000.0|  Australia|    India|9000.0|
|    5|      Daniel Kim|        M|  69000.0|      India|    India|8280.0|
|    6|    Olivia Brown|        F|  71000.0|        USA|    India|8520.0|
|    7|    James Wilson|        M|  70500.0|     Canada|    India|8460.0|
|    8|     Ava Johnson|        F|  69500.0|    Germany|    India|8340.0|
|    9|     Ethan Davis|        M|  73000.0|         UK|    India|8760.0|
|   10| Isabella Garcia|        F|  74000.0|        USA|    India|8880.0|
|   11|  Logan Martinez|        M|  68

In [13]:
# where() or filter() : to filter the data 

df4.where((df4.empSalary > 55000) & (df4.empGender=='F') & (df4.empName.like("A%"))).show()

+-----+-----------+---------+---------+----------+---------+------+
|empId|    empName|empGender|empSalary|empCountry|empOrigin|empTax|
+-----+-----------+---------+---------+----------+---------+------+
|    8|Ava Johnson|        F|  69500.0|   Germany|    India|8340.0|
+-----+-----------+---------+---------+----------+---------+------+



In [14]:
df4.where(
          (df4.empSalary > 55000) & 
          (df4.empGender=='F') |
          (df4.empName.like("A%"))) \
          .orderBy("empSalary") \
          .show()

+-----+----------------+---------+---------+-----------+---------+------+
|empId|         empName|empGender|empSalary| empCountry|empOrigin|empTax|
+-----+----------------+---------+---------+-----------+---------+------+
|    8|     Ava Johnson|        F|  69500.0|    Germany|    India|8340.0|
|    6|    Olivia Brown|        F|  71000.0|        USA|    India|8520.0|
|    2|     Emily Clark|        F|  72000.0|     Canada|    India|8640.0|
|   12|      Mia Taylor|        F|  72000.0|     France|    India|8640.0|
|   14|Charlotte Thomas|        F|  73500.0|New Zealand|    India|8820.0|
|   14|Charlotte Thomas|        F|  73500.0|New Zealand|    India|8820.0|
|   10| Isabella Garcia|        F|  74000.0|        USA|    India|8880.0|
|   10| Isabella Garcia|        F|  74000.0|        USA|    India|8880.0|
|    4|      Sophia Lee|        F|  75000.0|  Australia|    India|9000.0|
+-----+----------------+---------+---------+-----------+---------+------+



In [16]:
df4.where((df4.empSalary > 55000) & (df4.empGender=='F') | (df4.empName.like("A%"))).orderBy("empSalary").show()

+-----+----------------+---------+---------+-----------+---------+------+
|empId|         empName|empGender|empSalary| empCountry|empOrigin|empTax|
+-----+----------------+---------+---------+-----------+---------+------+
|    8|     Ava Johnson|        F|  69500.0|    Germany|    India|8340.0|
|    6|    Olivia Brown|        F|  71000.0|        USA|    India|8520.0|
|    2|     Emily Clark|        F|  72000.0|     Canada|    India|8640.0|
|   12|      Mia Taylor|        F|  72000.0|     France|    India|8640.0|
|   14|Charlotte Thomas|        F|  73500.0|New Zealand|    India|8820.0|
|   14|Charlotte Thomas|        F|  73500.0|New Zealand|    India|8820.0|
|   10| Isabella Garcia|        F|  74000.0|        USA|    India|8880.0|
|   10| Isabella Garcia|        F|  74000.0|        USA|    India|8880.0|
|    4|      Sophia Lee|        F|  75000.0|  Australia|    India|9000.0|
+-----+----------------+---------+---------+-----------+---------+------+



In [17]:
data = [
    (1, "Aarav", "M", 75000, "Engineering"),
    (2, "Diya", "F", 68000, "Marketing"),
    (3, "Kabir", "M", 72000, "Finance"),
    (4, "Meera", "F", 81000, "Engineering"),
    (5, "Rohan", "M", 60000, "HR"),
    (6, "Isha", "F", 90000, "Sales"),
    (7, "Arjun", "M", 78000, "Engineering"),
    (8, "Anaya", "F", 67000, "Marketing"),
    (9, "Vivaan", "M", 71000, "Finance"),
    (10, "Saanvi", "F", 85000, "Sales"),
    (11, "Yuvan", "M", 62000, "HR"),
    (12, "Kiara", "F", 76000, "Engineering")
    ]

# Define schema using instances of types
schema = StructType([
    StructField("empId", IntegerType(), True),
    StructField("empName", StringType(), True),
    StructField("empGender", StringType(), True),
    StructField("empSalary", IntegerType(), True),
    StructField("empDepartment", StringType(), True),
])


df = spark.createDataFrame(data, schema)
df.show()
df.printSchema()


+-----+-------+---------+---------+-------------+
|empId|empName|empGender|empSalary|empDepartment|
+-----+-------+---------+---------+-------------+
|    1|  Aarav|        M|    75000|  Engineering|
|    2|   Diya|        F|    68000|    Marketing|
|    3|  Kabir|        M|    72000|      Finance|
|    4|  Meera|        F|    81000|  Engineering|
|    5|  Rohan|        M|    60000|           HR|
|    6|   Isha|        F|    90000|        Sales|
|    7|  Arjun|        M|    78000|  Engineering|
|    8|  Anaya|        F|    67000|    Marketing|
|    9| Vivaan|        M|    71000|      Finance|
|   10| Saanvi|        F|    85000|        Sales|
|   11|  Yuvan|        M|    62000|           HR|
|   12|  Kiara|        F|    76000|  Engineering|
+-----+-------+---------+---------+-------------+

root
 |-- empId: integer (nullable = true)
 |-- empName: string (nullable = true)
 |-- empGender: string (nullable = true)
 |-- empSalary: integer (nullable = true)
 |-- empDepartment: string (nullab

In [18]:
# count() : to get the no. of records

df.count()

12

In [24]:
# aggregate function : count, max, min, sum, avg

from pyspark.sql.functions import max, min, sum, avg,count

# df.groupby("empDepartment").agg(count("*")).show()

# using alias 

df.agg(count("*").alias("empCount")).show()
df.agg(max("empSalary").alias("maxSalary")).show()
df.agg(min("empSalary").alias("minSalary")).show()
df.agg(avg("empSalary").alias("avgSalary")).show()
df.agg(sum("empSalary").alias("sumSalary")).show()

# df.groupby("empDepartment").agg(count("*").alias("empCount")).show()

# df.groupby("empDepartment").agg(count("*").alias("empCount")).agg(max("empSalary").show()

+--------+
|empCount|
+--------+
|      12|
+--------+

+---------+
|maxSalary|
+---------+
|    90000|
+---------+

+---------+
|minSalary|
+---------+
|    60000|
+---------+

+---------+
|avgSalary|
+---------+
|  73750.0|
+---------+

+---------+
|sumSalary|
+---------+
|   885000|
+---------+



In [None]:
# df.groupby("empDepartment").agg(count("*").alias("empCount"))
#                                           .alias

In [25]:
# find the emp count for each department

df.groupby("empDepartment").agg(count("*").alias("empCount"), \
                            max("empSalary").alias("maxSalary"), \
                            min("empSalary").alias("minSalary"), \
                            avg("empSalary").alias("avgSalary"), \
                            sum("empSalary").alias("sumSalary")).show()

+-------------+--------+---------+---------+---------+---------+
|empDepartment|empCount|maxSalary|minSalary|avgSalary|sumSalary|
+-------------+--------+---------+---------+---------+---------+
|        Sales|       2|    90000|    85000|  87500.0|   175000|
|  Engineering|       4|    81000|    75000|  77500.0|   310000|
|           HR|       2|    62000|    60000|  61000.0|   122000|
|      Finance|       2|    72000|    71000|  71500.0|   143000|
|    Marketing|       2|    68000|    67000|  67500.0|   135000|
+-------------+--------+---------+---------+---------+---------+



In [26]:
# union / union all : to merge the data

# condition :
    # schema should be same 
    # sequence should be same

data1 = [(1, 'Anil',27),
        (2, 'sandeep', 20), #jan
        (3, 'riya', 29)]
schema1 = ['id', 'name', 'age']

data2 = [(3, 'riya', 20),
        (4, 'rani', 26)]   #feb  
schema2 = ['id', 'name', 'age']

data3 = [(5, 'liya',29),
        (6, 'mani', 26)]  #march
schema3 = ['id', 'name', 'age']

df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)
df3 = spark.createDataFrame(data3, schema3)


df1.show()
df2.show()
df3.show()


+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|   Anil| 27|
|  2|sandeep| 20|
|  3|   riya| 29|
+---+-------+---+

+---+----+---+
| id|name|age|
+---+----+---+
|  3|riya| 20|
|  4|rani| 26|
+---+----+---+

+---+----+---+
| id|name|age|
+---+----+---+
|  5|liya| 29|
|  6|mani| 26|
+---+----+---+



In [29]:
df_union = df1.union(df2).union(df3)
# df_union.show()  #table show the duplicate value also use union all
df_union.dropDuplicates().sort("id").show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|   Anil| 27|
|  2|sandeep| 20|
|  3|   riya| 20|
|  3|   riya| 29|
|  4|   rani| 26|
|  5|   liya| 29|
|  6|   mani| 26|
+---+-------+---+



In [43]:
# joins : inner : left : right, full, cross


emp_data = [
    (1,  "Aarav",    1,   70000,  1),
    (2,  "Vivaan",   2,   72000,  2),
    (3,  "Reyansh",  3,   68000,  2),
    (2,  "Aadhya",   2,   75000,  2),
    (3,  "Ishaan",   3,   71000,  3),
    (4,  "Diya",     4,   73000,  4),
    (5,  "Anaya",    None,   76000,  None),
    (6,  "Arjun",    4,   69000,  3),
    (7,  "Myra",     2,   74000,  1),
    (8, "Yuvan",   None,   67000, 4),
    (9, "Kiara",    2,   71000,  3),
    (7, "Shaurya",  3,   70000,  None),
    (8, "Meera",    1,   72000,  4),
    (10, "Rohan",    1,   73000,  None),
    (10, "Saanvi",   1,   76000,  4)
]

emp_schema = ["empId", "empName", "deptId", "empSalary", "cityId"]


dept_data = [
    (1, "HR"),
    (2, "IT"),
    (3, "Sales"),
    (4, "Finance")
]

dept_schema = ["deptId", "deptName"]

address_data = [
    (1, "hyd"),
    (2, "blr"),
    (3, "chn"),
    (4, "kkt")
]

add_schema = ["cityId", "cityName"]

print("emp_df :")
emp_df = spark.createDataFrame(emp_data,emp_schema)
emp_df.show()


print("dept_df :")
emp_df = spark.createDataFrame(dept_data,dept_schema)
emp_df.show()


print("address_df :")
emp_df = spark.createDataFrame(address_data,add_schema)
address_df.show()




# print("emp_df :")
# emp_df = spark.createDataFrame(emp_data,emp_schema)
# emp_df.show()



emp_df :
+-----+-------+------+---------+------+
|empId|empName|deptId|empSalary|cityId|
+-----+-------+------+---------+------+
|    1|  Aarav|     1|    70000|     1|
|    2| Vivaan|     2|    72000|     2|
|    3|Reyansh|     3|    68000|     2|
|    2| Aadhya|     2|    75000|     2|
|    3| Ishaan|     3|    71000|     3|
|    4|   Diya|     4|    73000|     4|
|    5|  Anaya|  null|    76000|  null|
|    6|  Arjun|     4|    69000|     3|
|    7|   Myra|     2|    74000|     1|
|    8|  Yuvan|  null|    67000|     4|
|    9|  Kiara|     2|    71000|     3|
|    7|Shaurya|     3|    70000|  null|
|    8|  Meera|     1|    72000|     4|
|   10|  Rohan|     1|    73000|  null|
|   10| Saanvi|     1|    76000|     4|
+-----+-------+------+---------+------+

dept_df :
+------+--------+
|deptId|deptName|
+------+--------+
|     1|      HR|
|     2|      IT|
|     3|   Sales|
|     4| Finance|
+------+--------+

address_df :


NameError: name 'address_df' is not defined

In [44]:
emp_data = [
    (1,  "Aarav",    1,   70000,  1),
    (2,  "Vivaan",   2,   72000,  2),
    (3,  "Reyansh",  3,   68000,  2),
    (2,  "Aadhya",   2,   75000,  2),  # Duplicate empId 2
    (3,  "Ishaan",   3,   71000,  3),  # Duplicate empId 3
    (4,  "Diya",     4,   73000,  4),
    (5,  "Anaya",    None, 76000,  None),
    (6,  "Arjun",    4,   69000,  3),
    (7,  "Myra",     2,   74000,  1),
    (8,  "Yuvan",    None, 67000,  4),
    (9,  "Kiara",    2,   71000,  3),
    (7,  "Shaurya",  3,   70000,  None),  # Duplicate empId 7
    (8,  "Meera",    1,   72000,  4),     # Duplicate empId 8
    (10, "Rohan",    1,   73000,  None),  # Duplicate empId 10
    (10, "Saanvi",   1,   76000,  4)      # Duplicate empId 10
]
emp_schema = ["empId", "empName", "deptId", "empSalary", "cityId"]

dept_data = [
    (1, "HR"),
    (2, "IT"),
    (3, "Sales"),
    (4, "Finance")
]
dept_schema = ["deptId", "deptName"]

address_data = [
    (1, "hyd"),
    (2, "blr"),
    (3, "chn"),
    (4, "kkt")
]
add_schema = ["cityId", "cityName"]

# Create DataFrames with CORRECT variable names
print("emp_df:")
emp_df = spark.createDataFrame(emp_data, emp_schema)
emp_df.show()

print("dept_df:")
dept_df = spark.createDataFrame(dept_data, dept_schema)  # Fixed variable name
dept_df.show()

print("address_df:")
address_df = spark.createDataFrame(address_data, add_schema)  # Fixed variable name
address_df.show()

emp_df:
+-----+-------+------+---------+------+
|empId|empName|deptId|empSalary|cityId|
+-----+-------+------+---------+------+
|    1|  Aarav|     1|    70000|     1|
|    2| Vivaan|     2|    72000|     2|
|    3|Reyansh|     3|    68000|     2|
|    2| Aadhya|     2|    75000|     2|
|    3| Ishaan|     3|    71000|     3|
|    4|   Diya|     4|    73000|     4|
|    5|  Anaya|  null|    76000|  null|
|    6|  Arjun|     4|    69000|     3|
|    7|   Myra|     2|    74000|     1|
|    8|  Yuvan|  null|    67000|     4|
|    9|  Kiara|     2|    71000|     3|
|    7|Shaurya|     3|    70000|  null|
|    8|  Meera|     1|    72000|     4|
|   10|  Rohan|     1|    73000|  null|
|   10| Saanvi|     1|    76000|     4|
+-----+-------+------+---------+------+

dept_df:
+------+--------+
|deptId|deptName|
+------+--------+
|     1|      HR|
|     2|      IT|
|     3|   Sales|
|     4| Finance|
+------+--------+

address_df:
+------+--------+
|cityId|cityName|
+------+--------+
|     1|   

In [61]:
# You can drop the deptId & cityId column
inner_df = emp_df.join(dept_df, emp_df.deptId == dept_df.deptId, "inner") \
                 .join(address_df, emp_df.cityId == address_df.cityId, "inner") \
                 .drop(dept_df.deptId) \ #you can drop the deptId & cityId column
                 .drop(address_df.cityId) \
                 .dropDuplicates() # Uncomment to drop duplicates

inner_df.show()

+-----+-------+------+---------+------+--------+--------+
|empId|empName|deptId|empSalary|cityId|deptName|cityName|
+-----+-------+------+---------+------+--------+--------+
|    7|   Myra|     2|    74000|     1|      IT|     hyd|
|    1|  Aarav|     1|    70000|     1|      HR|     hyd|
|    6|  Arjun|     4|    69000|     3| Finance|     chn|
|    9|  Kiara|     2|    71000|     3|      IT|     chn|
|    3| Ishaan|     3|    71000|     3|   Sales|     chn|
|    2| Aadhya|     2|    75000|     2|      IT|     blr|
|    2| Vivaan|     2|    72000|     2|      IT|     blr|
|    3|Reyansh|     3|    68000|     2|   Sales|     blr|
|    4|   Diya|     4|    73000|     4| Finance|     kkt|
|   10| Saanvi|     1|    76000|     4|      HR|     kkt|
|    8|  Meera|     1|    72000|     4|      HR|     kkt|
+-----+-------+------+---------+------+--------+--------+



In [63]:
# left_df = emp_df.join(dept_df, emp_df.deptId == dept_df.deptId, "left") \
#                  .join(address_df, emp_df.cityId == address_df.cityId,"left") \
#                  .drop(emp_df.deptId) \ #you can drop the deptId & cityId column
#                  .drop(emp_df.cityId) \
#                  .dropDuplicates() \  #drop the duplicate value
#                  .dropna()
# #                  .fillna("unknown") #to fill null value
# # left_df.show()
# left_df.orderBy("empId").show()

SyntaxError: unexpected character after line continuation character (1535683249.py, line 3)

In [66]:
left_df = (
    emp_df
    .join(dept_df, emp_df.deptId == dept_df.deptId, "left")
    .join(address_df, emp_df.cityId == address_df.cityId, "left")
    .drop(emp_df.deptId) #you can drop the deptId & cityId column
    .drop(emp_df.cityId)
    .dropDuplicates()
    # .dropna()  # Alternative: drop rows with nulls
    .fillna("unknown")  # Fill null values with "unknown"
)
# left_df.show()
left_df.orderBy("empId").show()

+-----+-------+---------+------+--------+------+--------+
|empId|empName|empSalary|deptId|deptName|cityId|cityName|
+-----+-------+---------+------+--------+------+--------+
|    1|  Aarav|    70000|     1|      HR|     1|     hyd|
|    2| Aadhya|    75000|     2|      IT|     2|     blr|
|    2| Vivaan|    72000|     2|      IT|     2|     blr|
|    3|Reyansh|    68000|     3|   Sales|     2|     blr|
|    3| Ishaan|    71000|     3|   Sales|     3|     chn|
|    4|   Diya|    73000|     4| Finance|     4|     kkt|
|    5|  Anaya|    76000|  null| unknown|  null| unknown|
|    6|  Arjun|    69000|     4| Finance|     3|     chn|
|    7|   Myra|    74000|     2|      IT|     1|     hyd|
|    7|Shaurya|    70000|     3|   Sales|  null| unknown|
|    8|  Meera|    72000|     1|      HR|     4|     kkt|
|    8|  Yuvan|    67000|  null| unknown|     4|     kkt|
|    9|  Kiara|    71000|     2|      IT|     3|     chn|
|   10| Saanvi|    76000|     1|      HR|     4|     kkt|
|   10|  Rohan

In [69]:
# to handel null values using fillna()

left_df.fillna(0, "deptId").show() # fill the null value as zero

# left_df.fillna(0, "deptId").fillna("missing", "deptName").show() # fillna

+-----+-------+---------+------+--------+------+--------+
|empId|empName|empSalary|deptId|deptName|cityId|cityName|
+-----+-------+---------+------+--------+------+--------+
|    7|Shaurya|    70000|     3|   Sales|  null| unknown|
|    5|  Anaya|    76000|     0| unknown|  null| unknown|
|    1|  Aarav|    70000|     1|      HR|     1|     hyd|
|    3|Reyansh|    68000|     3|   Sales|     2|     blr|
|    6|  Arjun|    69000|     4| Finance|     3|     chn|
|    4|   Diya|    73000|     4| Finance|     4|     kkt|
|    8|  Meera|    72000|     1|      HR|     4|     kkt|
|    7|   Myra|    74000|     2|      IT|     1|     hyd|
|    9|  Kiara|    71000|     2|      IT|     3|     chn|
|    2| Vivaan|    72000|     2|      IT|     2|     blr|
|    2| Aadhya|    75000|     2|      IT|     2|     blr|
|    3| Ishaan|    71000|     3|   Sales|     3|     chn|
|   10| Saanvi|    76000|     1|      HR|     4|     kkt|
|    8|  Yuvan|    67000|     0| unknown|     4|     kkt|
|   10|  Rohan

In [72]:
# sample data

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

# spark = SparkSession.builder.appName("NestedSchemaExample").getOrCreate()

# Data
data = [
    (1, 'john', 'male', [
        (1, "projectA", 6),
        (2, "projectB", 8)
    ]),
    (2, 'alex', 'male', [
        (10, "projectC", 6),
        (20, "projectD", 8)
    ])
]

# Schema
schema = StructType([
    StructField("Id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("projectDetails", ArrayType(
        StructType([
            StructField("projectId", IntegerType(), True),
            StructField("projectName", StringType(), True),
            StructField("projectDuration", IntegerType(), True)
        ])
    ), True)
])

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Show DataFrame
df.show(1, truncate=False)
# df.show()
# df.printSchema()


+---+----+------+------------------------------------+
|Id |name|gender|projectDetails                      |
+---+----+------+------------------------------------+
|1  |john|male  |[{1, projectA, 6}, {2, projectB, 8}]|
+---+----+------+------------------------------------+
only showing top 1 row

