/FileStore/tables/dataset/Employee.csv

### Import Libraries

In [0]:
# Core PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # For all Spark functions (col, lit, regexp_replace, etc.)
from pyspark.sql import types as T      # For data types (StructType, StringType, IntegerType, etc.)

# Optional: Window functions for advanced analytics
from pyspark.sql.window import Window

### Reading CSV File

In [0]:
#create dataframe by reading the csv file
df = spark.read.format('csv').option('inferSchema', 'true').option('header', 'true').load('/FileStore/tables/dataset/Employee.csv')
display(df.head(10))


Education,JoiningYear,City,PaymentTier,Age,Gender,EverBenched,ExperienceInCurrentDomain,LeaveOrNot
Bachelors,2017,Bangalore,3,34,Male,No,0,0
Bachelors,2013,Pune,1,28,Female,No,3,1
Bachelors,2014,New Delhi,3,38,Female,No,2,0
Masters,2016,Bangalore,3,27,Male,No,5,1
Masters,2017,Pune,3,24,Male,Yes,2,1
Bachelors,2016,Bangalore,3,22,Male,No,0,0
Bachelors,2015,New Delhi,3,38,Male,No,0,0
Bachelors,2016,Bangalore,3,34,Female,No,2,1
Bachelors,2016,Pune,3,23,Male,No,1,0
Masters,2017,New Delhi,2,37,Male,No,2,0


In [0]:
df = spark.read.format('csv').option('inferSchema', 'true').option('header', 'true').load('/FileStore/tables/dataset/Employee.csv')
df.show(5)

+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|Education|JoiningYear|     City|PaymentTier|Age|Gender|EverBenched|ExperienceInCurrentDomain|LeaveOrNot|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|Bachelors|       2017|Bangalore|          3| 34|  Male|         No|                        0|         0|
|Bachelors|       2013|     Pune|          1| 28|Female|         No|                        3|         1|
|Bachelors|       2014|New Delhi|          3| 38|Female|         No|                        2|         0|
|  Masters|       2016|Bangalore|          3| 27|  Male|         No|                        5|         1|
|  Masters|       2017|     Pune|          3| 24|  Male|        Yes|                        2|         1|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
only showing top 5 rows



### Spark SQL

In [0]:
#Convert dataframe into table
df.createGlobalTempView("employee_services")

In [0]:
%sql
select * from global_temp.employee_services

Education,JoiningYear,City,PaymentTier,Age,Gender,EverBenched,ExperienceInCurrentDomain,LeaveOrNot
Bachelors,2017,Bangalore,3,34,Male,No,0,0
Bachelors,2013,Pune,1,28,Female,No,3,1
Bachelors,2014,New Delhi,3,38,Female,No,2,0
Masters,2016,Bangalore,3,27,Male,No,5,1
Masters,2017,Pune,3,24,Male,Yes,2,1
Bachelors,2016,Bangalore,3,22,Male,No,0,0
Bachelors,2015,New Delhi,3,38,Male,No,0,0
Bachelors,2016,Bangalore,3,34,Female,No,2,1
Bachelors,2016,Pune,3,23,Male,No,1,0
Masters,2017,New Delhi,2,37,Male,No,2,0


### Read JSON File

In [0]:
#Read JSON File
df_json = spark.read.format('json').option('inferSchema',True)\
    .option('header',True)\
        .option('multiline',False)\
            .load('/FileStore/tables/drivers.json')

In [0]:
display(df_json)

code,dob,driverId,driverRef,name,nationality,number,url
HAM,1985-01-07,1,hamilton,"List(Lewis, Hamilton)",British,44,http://en.wikipedia.org/wiki/Lewis_Hamilton
HEI,1977-05-10,2,heidfeld,"List(Nick, Heidfeld)",German,\N,http://en.wikipedia.org/wiki/Nick_Heidfeld
ROS,1985-06-27,3,rosberg,"List(Nico, Rosberg)",German,6,http://en.wikipedia.org/wiki/Nico_Rosberg
ALO,1981-07-29,4,alonso,"List(Fernando, Alonso)",Spanish,14,http://en.wikipedia.org/wiki/Fernando_Alonso
KOV,1981-10-19,5,kovalainen,"List(Heikki, Kovalainen)",Finnish,\N,http://en.wikipedia.org/wiki/Heikki_Kovalainen
NAK,1985-01-11,6,nakajima,"List(Kazuki, Nakajima)",Japanese,\N,http://en.wikipedia.org/wiki/Kazuki_Nakajima
BOU,1979-02-28,7,bourdais,"List(Sébastien, Bourdais)",French,\N,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais
RAI,1979-10-17,8,raikkonen,"List(Kimi, Räikkönen)",Finnish,7,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen
KUB,1984-12-07,9,kubica,"List(Robert, Kubica)",Polish,88,http://en.wikipedia.org/wiki/Robert_Kubica
GLO,1982-03-18,10,glock,"List(Timo, Glock)",German,\N,http://en.wikipedia.org/wiki/Timo_Glock


##Schema Definition

In PySpark/Spark, explicit schema definitions (using StructType or DDL strings) are essential for several reasons, especially in distributed, large-scale data processing. Here's why they’re required or highly recommended:

1. Schema Inference Overhead
Problem: If you don’t define a schema, Spark infers it automatically by scanning the data (e.g., CSV/JSON). For large datasets, this is slow and resource-intensive.

Solution: Explicit schemas skip this step, improving performance.

2. Data Consistency & Validation
Problem: Inferred schemas can be incorrect (e.g., misinterpreting "25" as INT when it should be STRING).

Solution: Explicit schemas enforce data types and nullability rules, catching errors early (e.g., invalid dates or non-integer values).

3. Handling Semi-Structured Data
Problem: Data sources like CSV/JSON often lack strict typing (e.g., mixed data types in a column).

Solution: A defined schema ensures consistent structure across partitions/nodes in a distributed system.

4. Cross-Environment Consistency
Problem: Schema inference can vary across environments (e.g., dev vs. prod) if data samples differ.

Solution: Explicit schemas ensure the same structure everywhere, avoiding surprises.

When Are Explicit Schemas Required?
No Header in CSV: If your CSV lacks headers, Spark can’t infer column names.

Mixed Data Types: Columns with ambiguous types (e.g., numbers stored as strings).

Strict Data Contracts: When downstream systems (e.g., databases) expect a specific schema.

In [0]:
df.printSchema()

root
 |-- Education: string (nullable = true)
 |-- JoiningYear: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- PaymentTier: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- EverBenched: string (nullable = true)
 |-- ExperienceInCurrentDomain: integer (nullable = true)
 |-- LeaveOrNot: integer (nullable = true)



###DDL Schema


In PySpark, you can define a schema for a DataFrame using DDL-formatted strings (similar to SQL DDL). This is useful when reading data from unstructured sources (e.g., CSV, JSON) to enforce structure upfront.

DDL (Data Definition Language): Defines the structure 

DDL Strings:

Pros: Concise, SQL-like syntax, easy to write for simple schemas.

Cons: Limited to basic data types (e.g., no support for nested types like arrays/maps).

Example:

python
ddl_schema = "Age INT, Name STRING"

Here in df, we are going to change one data type and see.
Going to change gender as string and see how it works

In [0]:
my_ddl_schema = ''' Education STRING,
    JoiningYear INT,
    City STRING,
    PaymentTier INT,
    Age INT,
    Gender STRING,
    EverBenched STRING,
    ExperienceInCurrentDomain INT,
    LeaveOrNot INT '''

In [0]:
df_schema = spark.read \
    .format("csv") \
    .option("header", True) \
    .schema(my_ddl_schema) \
    .load("/FileStore/tables/dataset/Employee.csv")

# Show the first 5 rows
df_schema.display(5)

Education,JoiningYear,City,PaymentTier,Age,Gender,EverBenched,ExperienceInCurrentDomain,LeaveOrNot
Bachelors,2017,Bangalore,3,34,Male,No,0,0
Bachelors,2013,Pune,1,28,Female,No,3,1
Bachelors,2014,New Delhi,3,38,Female,No,2,0
Masters,2016,Bangalore,3,27,Male,No,5,1
Masters,2017,Pune,3,24,Male,Yes,2,1
Bachelors,2016,Bangalore,3,22,Male,No,0,0
Bachelors,2015,New Delhi,3,38,Male,No,0,0
Bachelors,2016,Bangalore,3,34,Female,No,2,1
Bachelors,2016,Pune,3,23,Male,No,1,0
Masters,2017,New Delhi,2,37,Male,No,2,0


### Using StructType

StructType:

Pros: Full programmatic control, supports complex types (arrays, structs, maps), and precise nullability rules.

Example:

python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([

    StructField("Age", IntegerType(), nullable=True),

    StructField("Name", StringType(), nullable=False)
])

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


custom_schema = StructType([
    StructField("Education", StringType(), nullable=True),
    StructField("JoiningYear", IntegerType(), nullable=True),
    StructField("City", StringType(), nullable=True),
    StructField("PaymentTier", IntegerType(), nullable=True),
    StructField("Age", IntegerType(), nullable=True),
    StructField("Gender", StringType(), nullable=True),
    StructField("EverBenched", StringType(), nullable=True),
    StructField("ExperienceInCurrentDomain", IntegerType(), nullable=True),
    StructField("LeaveOrNot", IntegerType(), nullable=True)
])

In [0]:
df_struct_schema = spark.read \
    .format("csv") \
    .option("header", True) \
    .schema(custom_schema) \
    .load("/FileStore/tables/dataset/Employee.csv")

# Show the first 5 rows
df_struct_schema.display(5)

Education,JoiningYear,City,PaymentTier,Age,Gender,EverBenched,ExperienceInCurrentDomain,LeaveOrNot
Bachelors,2017,Bangalore,3,34,Male,No,0,0
Bachelors,2013,Pune,1,28,Female,No,3,1
Bachelors,2014,New Delhi,3,38,Female,No,2,0
Masters,2016,Bangalore,3,27,Male,No,5,1
Masters,2017,Pune,3,24,Male,Yes,2,1
Bachelors,2016,Bangalore,3,22,Male,No,0,0
Bachelors,2015,New Delhi,3,38,Male,No,0,0
Bachelors,2016,Bangalore,3,34,Female,No,2,1
Bachelors,2016,Pune,3,23,Male,No,1,0
Masters,2017,New Delhi,2,37,Male,No,2,0


### Data Transformations with PySpark

### 1. Select

The select transformation in PySpark is used to project columns from a DataFrame, allowing you to:

1. Choose Specific Columns: Extract columns by name or expression

Example:

df.select("name", "age")  # Select columns as strings

df.select(col("name"), col("age") + 10)  # Use expressions

In [0]:
# Just displaying the dataset I want to use
df.display(10)

Education,JoiningYear,City,PaymentTier,Age,Gender,EverBenched,ExperienceInCurrentDomain,LeaveOrNot
Bachelors,2017,Bangalore,3,34,Male,No,0,0
Bachelors,2013,Pune,1,28,Female,No,3,1
Bachelors,2014,New Delhi,3,38,Female,No,2,0
Masters,2016,Bangalore,3,27,Male,No,5,1
Masters,2017,Pune,3,24,Male,Yes,2,1
Bachelors,2016,Bangalore,3,22,Male,No,0,0
Bachelors,2015,New Delhi,3,38,Male,No,0,0
Bachelors,2016,Bangalore,3,34,Female,No,2,1
Bachelors,2016,Pune,3,23,Male,No,1,0
Masters,2017,New Delhi,2,37,Male,No,2,0


In [0]:
# To select the first three columns in the dataset (employee.csv)
# 1st Way
select1 = df.select('Education','JoiningYear','City')
select1.display(10)

Education,JoiningYear,City
Bachelors,2017,Bangalore
Bachelors,2013,Pune
Bachelors,2014,New Delhi
Masters,2016,Bangalore
Masters,2017,Pune
Bachelors,2016,Bangalore
Bachelors,2015,New Delhi
Bachelors,2016,Bangalore
Bachelors,2016,Pune
Masters,2017,New Delhi


In [0]:
# To select the first three columns in the dataset (employee.csv)
# 2nd Way
from pyspark.sql.functions import col 
select2 = df.select(col('Education'),col('JoiningYear'),col('City'))
select2.display(5)

Education,JoiningYear,City
Bachelors,2017,Bangalore
Bachelors,2013,Pune
Bachelors,2014,New Delhi
Masters,2016,Bangalore
Masters,2017,Pune
Bachelors,2016,Bangalore
Bachelors,2015,New Delhi
Bachelors,2016,Bangalore
Bachelors,2016,Pune
Masters,2017,New Delhi


### 2. Alias
Rename Columns: Alias columns for clarity

In [0]:
df.select(col('JoiningYear').alias('YearofJoining')).display()

YearofJoining
2017
2013
2014
2016
2017
2016
2015
2016
2016
2017


#### 3. withColumn()
Create New Columns: Compute new fields using existing columns

Example: df.select("name", (col("salary") * 0.3).alias("tax"))

In [0]:
# To create a new column
from pyspark.sql.functions import lit

select1.withColumn('newColumn',lit('hello')).show(5)

+---------+-----------+---------+---------+
|Education|JoiningYear|     City|newColumn|
+---------+-----------+---------+---------+
|Bachelors|       2017|Bangalore|    hello|
|Bachelors|       2013|     Pune|    hello|
|Bachelors|       2014|New Delhi|    hello|
|  Masters|       2016|Bangalore|    hello|
|  Masters|       2017|     Pune|    hello|
+---------+-----------+---------+---------+
only showing top 5 rows



In [0]:
# transformations - use the existing columns and create a new one
# some example showing concatenation
from pyspark.sql.functions import concat_ws

select1.withColumn(
    "CityEducation",
    concat_ws("-", col("City"), col("Education"))
).show(5)


+---------+-----------+---------+-------------------+
|Education|JoiningYear|     City|      CityEducation|
+---------+-----------+---------+-------------------+
|Bachelors|       2017|Bangalore|Bangalore-Bachelors|
|Bachelors|       2013|     Pune|     Pune-Bachelors|
|Bachelors|       2014|New Delhi|New Delhi-Bachelors|
|  Masters|       2016|Bangalore|  Bangalore-Masters|
|  Masters|       2017|     Pune|       Pune-Masters|
+---------+-----------+---------+-------------------+
only showing top 5 rows



In [0]:
# using the function regexp_replace()

from pyspark.sql.functions import regexp_replace

#df.show(5)
select1.withColumn("City",regexp_replace(col('City'),"Bangalore","BA")).show(5)

+---------+-----------+---------+
|Education|JoiningYear|     City|
+---------+-----------+---------+
|Bachelors|       2017|       BA|
|Bachelors|       2013|     Pune|
|Bachelors|       2014|New Delhi|
|  Masters|       2016|       BA|
|  Masters|       2017|     Pune|
+---------+-----------+---------+
only showing top 5 rows



In [0]:
#Advanced example

from pyspark.sql.functions import when

df = df.withColumn(
    "AgeGroup",
    when(col("age") < 30, "Young")
     .when((col("age") >= 30) & (col("age") <= 50), "Mid")
     .otherwise("Senior")
).display()

Education,JoiningYear,City,PaymentTier,Age,Gender,EverBenched,ExperienceInCurrentDomain,LeaveOrNot,AgeGroup
Bachelors,2017,Bangalore,3,34,Male,No,0,0,Mid
Bachelors,2013,Pune,1,28,Female,No,3,1,Young
Bachelors,2014,New Delhi,3,38,Female,No,2,0,Mid
Masters,2016,Bangalore,3,27,Male,No,5,1,Young
Masters,2017,Pune,3,24,Male,Yes,2,1,Young
Bachelors,2016,Bangalore,3,22,Male,No,0,0,Young
Bachelors,2015,New Delhi,3,38,Male,No,0,0,Mid
Bachelors,2016,Bangalore,3,34,Female,No,2,1,Mid
Bachelors,2016,Pune,3,23,Male,No,1,0,Young
Masters,2017,New Delhi,2,37,Male,No,2,0,Mid


### 4. Filter/Where

Certainly! filter() and where() are transformations in PySpark used to select rows from a DataFrame based on a condition. They are identical (aliases of each other) and can be used interchangeably. 

filter() vs where(): They are aliases—use whichever you prefer.

Column References:

- col ('col_name')
- df ['col_name']
- Use SQL-like string expressions (e.g., "Age > 30").

Combining Conditions:
- & for AND
- | for OR
- ~ for NOT

Null Handling: Use col("column").isNull() or isNotNull()

In [0]:
#df dataframe is overwritten, so using select1

select1.display(5)

Education,JoiningYear,City
Bachelors,2017,Bangalore
Bachelors,2013,Pune
Bachelors,2014,New Delhi
Masters,2016,Bangalore
Masters,2017,Pune
Bachelors,2016,Bangalore
Bachelors,2015,New Delhi
Bachelors,2016,Bangalore
Bachelors,2016,Pune
Masters,2017,New Delhi


In [0]:
# 1. Filter employees in a specific city (e.g., "Bangalore")
from pyspark.sql.functions import col

#select1.filter(col('City') == 'Bangalore').display()
select1.filter(col('City') == 'Bangalore').count()

Out[53]: 2228

In [0]:
#Using where()

select1.where(select1['City'] == 'Bangalore').show(5)


+---------+-----------+---------+
|Education|JoiningYear|     City|
+---------+-----------+---------+
|Bachelors|       2017|Bangalore|
|  Masters|       2016|Bangalore|
|Bachelors|       2016|Bangalore|
|Bachelors|       2016|Bangalore|
|  Masters|       2012|Bangalore|
+---------+-----------+---------+
only showing top 5 rows



In [0]:
# Alternative (string expression)

select1.filter("City='Bangalore'").show(5)


+---------+-----------+---------+
|Education|JoiningYear|     City|
+---------+-----------+---------+
|Bachelors|       2017|Bangalore|
|  Masters|       2016|Bangalore|
|Bachelors|       2016|Bangalore|
|Bachelors|       2016|Bangalore|
|  Masters|       2012|Bangalore|
+---------+-----------+---------+
only showing top 5 rows



In [0]:
# Employees with a Payment Tier of 3
select1.filter(col('PaymentTier') == 3).show(5)

+---------+-----------+---------+
|Education|JoiningYear|     City|
+---------+-----------+---------+
|Bachelors|       2017|Bangalore|
|Bachelors|       2014|New Delhi|
|  Masters|       2016|Bangalore|
|  Masters|       2017|     Pune|
|Bachelors|       2016|Bangalore|
+---------+-----------+---------+
only showing top 5 rows



In [0]:
# Employees aged 30 or older
select1.where(col('Age') >= 30).show(5)

+---------+-----------+---------+
|Education|JoiningYear|     City|
+---------+-----------+---------+
|Bachelors|       2017|Bangalore|
|Bachelors|       2014|New Delhi|
|Bachelors|       2015|New Delhi|
|Bachelors|       2016|Bangalore|
|  Masters|       2017|New Delhi|
+---------+-----------+---------+
only showing top 5 rows



In [0]:
# Employees who have been benched (EverBenched = "Yes") and left (LeaveOrNot = 1)

select1.filter((col('Everbenched') == 'Yes') & (col('LeaveOrNot') == 1)).show(5)

+---------+-----------+---------+
|Education|JoiningYear|     City|
+---------+-----------+---------+
|  Masters|       2017|     Pune|
|Bachelors|       2018|     Pune|
|Bachelors|       2015|Bangalore|
|Bachelors|       2015|     Pune|
|Bachelors|       2015|     Pune|
+---------+-----------+---------+
only showing top 5 rows



### 5. Null Handling

In [0]:
select1.filter(select1["City"].isNull()).show()

+---------+-----------+----+
|Education|JoiningYear|City|
+---------+-----------+----+
+---------+-----------+----+



In [0]:
select1.filter(select1["City"].isNotNull()).show()

+---------+-----------+---------+
|Education|JoiningYear|     City|
+---------+-----------+---------+
|Bachelors|       2017|Bangalore|
|Bachelors|       2013|     Pune|
|Bachelors|       2014|New Delhi|
|  Masters|       2016|Bangalore|
|  Masters|       2017|     Pune|
|Bachelors|       2016|Bangalore|
|Bachelors|       2015|New Delhi|
|Bachelors|       2016|Bangalore|
|Bachelors|       2016|     Pune|
|  Masters|       2017|New Delhi|
|  Masters|       2012|Bangalore|
|Bachelors|       2016|     Pune|
|Bachelors|       2018|     Pune|
|Bachelors|       2016|Bangalore|
|Bachelors|       2012|Bangalore|
|Bachelors|       2017|Bangalore|
|Bachelors|       2014|Bangalore|
|Bachelors|       2014|     Pune|
|Bachelors|       2015|     Pune|
|Bachelors|       2016|New Delhi|
+---------+-----------+---------+
only showing top 20 rows



### 6. isin()

In [0]:
cities = ["Bangalore", "Mumbai", "Pune"]
select1.filter(select1["City"].isin(cities)).show(5)

+---------+-----------+---------+
|Education|JoiningYear|     City|
+---------+-----------+---------+
|Bachelors|       2017|Bangalore|
|Bachelors|       2013|     Pune|
|  Masters|       2016|Bangalore|
|  Masters|       2017|     Pune|
|Bachelors|       2016|Bangalore|
+---------+-----------+---------+
only showing top 5 rows



### 7. withColumnRenamed

Alias does change the name in the current execution, but this function renames in the dataframe itself.

In [0]:
select1.withColumnRenamed('JoiningYear','YearOfJoining').show(5)

+---------+-------------+---------+
|Education|YearOfJoining|     City|
+---------+-------------+---------+
|Bachelors|         2017|Bangalore|
|Bachelors|         2013|     Pune|
|Bachelors|         2014|New Delhi|
|  Masters|         2016|Bangalore|
|  Masters|         2017|     Pune|
+---------+-------------+---------+
only showing top 5 rows



### 8. Typecasting

In [0]:
#Cast PaymentTier (String to Integer)
df = spark.read.format('csv').option('inferSchema', 'true').option('header', 'true').load('/FileStore/tables/dataset/Employee.csv')
display(df.head(10))

Education,JoiningYear,City,PaymentTier,Age,Gender,EverBenched,ExperienceInCurrentDomain,LeaveOrNot
Bachelors,2017,Bangalore,3,34,Male,No,0,0
Bachelors,2013,Pune,1,28,Female,No,3,1
Bachelors,2014,New Delhi,3,38,Female,No,2,0
Masters,2016,Bangalore,3,27,Male,No,5,1
Masters,2017,Pune,3,24,Male,Yes,2,1
Bachelors,2016,Bangalore,3,22,Male,No,0,0
Bachelors,2015,New Delhi,3,38,Male,No,0,0
Bachelors,2016,Bangalore,3,34,Female,No,2,1
Bachelors,2016,Pune,3,23,Male,No,1,0
Masters,2017,New Delhi,2,37,Male,No,2,0


In [0]:
cast = df.withColumn("Age", col("Age").cast("double"))
cast.show(5)

+---------+-----------+---------+-----------+----+------+-----------+-------------------------+----------+
|Education|JoiningYear|     City|PaymentTier| Age|Gender|EverBenched|ExperienceInCurrentDomain|LeaveOrNot|
+---------+-----------+---------+-----------+----+------+-----------+-------------------------+----------+
|Bachelors|       2017|Bangalore|          3|34.0|  Male|         No|                        0|         0|
|Bachelors|       2013|     Pune|          1|28.0|Female|         No|                        3|         1|
|Bachelors|       2014|New Delhi|          3|38.0|Female|         No|                        2|         0|
|  Masters|       2016|Bangalore|          3|27.0|  Male|         No|                        5|         1|
|  Masters|       2017|     Pune|          3|24.0|  Male|        Yes|                        2|         1|
+---------+-----------+---------+-----------+----+------+-----------+-------------------------+----------+
only showing top 5 rows



In [0]:
cast.printSchema()

root
 |-- Education: string (nullable = true)
 |-- JoiningYear: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- PaymentTier: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- Gender: string (nullable = true)
 |-- EverBenched: string (nullable = true)
 |-- ExperienceInCurrentDomain: integer (nullable = true)
 |-- LeaveOrNot: integer (nullable = true)



### 9. Sort/OrderBy

Sorts the DataFrame based on one or more columns.


In [0]:
#Sort by Age (ascending)
sorted_df = df.sort("Age")
sorted_df.show(5)

+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|Education|JoiningYear|     City|PaymentTier|Age|Gender|EverBenched|ExperienceInCurrentDomain|LeaveOrNot|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|Bachelors|       2014|     Pune|          1| 22|Female|         No|                        0|         1|
|Bachelors|       2012|New Delhi|          3| 22|Female|         No|                        0|         0|
|Bachelors|       2015|New Delhi|          3| 22|  Male|         No|                        0|         0|
|Bachelors|       2016|Bangalore|          3| 22|  Male|         No|                        0|         0|
|Bachelors|       2017|     Pune|          2| 22|  Male|         No|                        0|         0|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
only showing top 5 rows



In [0]:
# Equivalent orderBy:
sorted_df_orderby = df.orderBy(F.asc("Age"))
sorted_df_orderby.show(5)

+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|Education|JoiningYear|     City|PaymentTier|Age|Gender|EverBenched|ExperienceInCurrentDomain|LeaveOrNot|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|Bachelors|       2014|     Pune|          1| 22|Female|         No|                        0|         1|
|Bachelors|       2012|New Delhi|          3| 22|Female|         No|                        0|         0|
|Bachelors|       2015|New Delhi|          3| 22|  Male|         No|                        0|         0|
|Bachelors|       2016|Bangalore|          3| 22|  Male|         No|                        0|         0|
|Bachelors|       2017|     Pune|          2| 22|  Male|         No|                        0|         0|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
only showing top 5 rows



In [0]:
#Sort by JoiningYear (descending) and PaymentTier (ascending)
sorted_df1 = df.orderBy(
    F.desc("JoiningYear"),  # Most recent joiners first
    F.asc("PaymentTier")     # Lower tiers first for same year
).show(5)

+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|Education|JoiningYear|     City|PaymentTier|Age|Gender|EverBenched|ExperienceInCurrentDomain|LeaveOrNot|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|  Masters|       2018|New Delhi|          1| 24|  Male|        Yes|                        2|         1|
|Bachelors|       2018|Bangalore|          1| 27|  Male|         No|                        5|         0|
|Bachelors|       2018|Bangalore|          1| 25|  Male|        Yes|                        3|         0|
|  Masters|       2018|New Delhi|          1| 26|Female|         No|                        4|         1|
|Bachelors|       2018|     Pune|          1| 26|  Male|        Yes|                        4|         0|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
only showing top 5 rows



### 10. Limit

In [0]:
limited_df = df.limit(5)
limited_df.display()

Education,JoiningYear,City,PaymentTier,Age,Gender,EverBenched,ExperienceInCurrentDomain,LeaveOrNot
Bachelors,2017,Bangalore,3,34,Male,No,0,0
Bachelors,2013,Pune,1,28,Female,No,3,1
Bachelors,2014,New Delhi,3,38,Female,No,2,0
Masters,2016,Bangalore,3,27,Male,No,5,1
Masters,2017,Pune,3,24,Male,Yes,2,1


### 11. Drop

In [0]:
df_clean = df.drop("EverBenched")
df_clean.show(5)

+---------+-----------+---------+-----------+---+------+-------------------------+----------+
|Education|JoiningYear|     City|PaymentTier|Age|Gender|ExperienceInCurrentDomain|LeaveOrNot|
+---------+-----------+---------+-----------+---+------+-------------------------+----------+
|Bachelors|       2017|Bangalore|          3| 34|  Male|                        0|         0|
|Bachelors|       2013|     Pune|          1| 28|Female|                        3|         1|
|Bachelors|       2014|New Delhi|          3| 38|Female|                        2|         0|
|  Masters|       2016|Bangalore|          3| 27|  Male|                        5|         1|
|  Masters|       2017|     Pune|          3| 24|  Male|                        2|         1|
+---------+-----------+---------+-----------+---+------+-------------------------+----------+
only showing top 5 rows



In [0]:
df_clean.printSchema()

root
 |-- Education: string (nullable = true)
 |-- JoiningYear: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- PaymentTier: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- ExperienceInCurrentDomain: integer (nullable = true)
 |-- LeaveOrNot: integer (nullable = true)



### 12. DropDuplicates
Removes duplicate rows based on all or a subset of columns.


In [0]:
#Remove all duplicate rows (all columns)
unique_df = df.dropDuplicates()
unique_df.show(5)

+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|Education|JoiningYear|     City|PaymentTier|Age|Gender|EverBenched|ExperienceInCurrentDomain|LeaveOrNot|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|Bachelors|       2014|New Delhi|          3| 38|Female|         No|                        2|         0|
|Bachelors|       2016|Bangalore|          3| 22|  Male|         No|                        0|         0|
|Bachelors|       2017|Bangalore|          3| 34|  Male|         No|                        0|         0|
|  Masters|       2016|Bangalore|          3| 27|  Male|         No|                        5|         1|
|Bachelors|       2013|     Pune|          1| 28|Female|         No|                        3|         1|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
only showing top 5 rows



### 13. Union and ByUnion

union() combines DataFrames vertically but requires identical column order and data types.
Columns are matched by position, not by name.

unionByName() combines DataFrames by matching column names, ignoring column order and handling missing columns.

In [0]:
from pyspark.sql import Row

# DataFrame 1
#data1 = [
   # Row(Name="Alice", Age=30, City="New York"),
  #  Row(Name="Bob", Age=25, City="London")
#]

#Simple way
data1 = [("Alice",30,"New York"), ("Bob",25,"London")]

df1 = spark.createDataFrame(data1)
df1.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)
 |-- _3: string (nullable = true)



In [0]:
# DataFrame 2 (columns in different order + an extra column)
#data2 = [
    #Row(City="Paris", Name="Charlie", Age=35, Salary=50000),
    #Row(City="Tokyo", Name="Diana", Age=28, Salary=60000)
#]
#Simple way
data2 = [("Paris","Charlie",30,50000), ("Tokyo","Diana",25,60000)]

df2 = spark.createDataFrame(data2)
df2.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: long (nullable = true)



In [0]:
df1.union(df2).display()
#This gives an error because union() combines DataFrames vertically but requires identical column order and data types. Columns are matched by position, not by name.

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2237084779659956>:1[0m
[0;32m----> 1[0m [43mdf1[49m[38;5;241;43m.[39;49m[43munion[49m[43m([49m[43mdf2[49m[43m)[49m[38;5;241m.[39mdisplay()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[0m     logger[38;5;241m.[39mlog_success(
[1;32m     50[0m         module_name, class_name, function_name, time[38;5;241m.[39mperf_counter() [38;5;241m

In [0]:
#Way 2

# DataFrame 1
data11 = [
    Row(Name="Alice", Age=30, City="New York"),
    Row(Name="Bob", Age=25, City="London")
]

df11 = spark.createDataFrame(data11)
df11.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- City: string (nullable = true)



In [0]:
# DataFrame 2 (columns in different order + an extra column)
data22 = [
    Row(City="Paris", Name="Charlie", Age=35, Salary=50000),
    Row(City="Tokyo", Name="Diana", Age=28, Salary=60000)
]

df22 = spark.createDataFrame(data22)
df22.printSchema()

root
 |-- City: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Salary: long (nullable = true)



In [0]:
df22_aligned = df22.drop("Salary").select("Name", "Age", "City")
df_union_success = df11.union(df22_aligned)
df_union_success.show()


+-------+---+--------+
|   Name|Age|    City|
+-------+---+--------+
|  Alice| 30|New York|
|    Bob| 25|  London|
|Charlie| 35|   Paris|
|  Diana| 28|   Tokyo|
+-------+---+--------+



In [0]:
# UnionByName will align columns by name and fill missing columns with null
df_union_by_name = df11.unionByName(df22, allowMissingColumns=True)

# Show the result
df_union_by_name.show()

+-------+---+--------+------+
|   Name|Age|    City|Salary|
+-------+---+--------+------+
|  Alice| 30|New York|  null|
|    Bob| 25|  London|  null|
|Charlie| 35|   Paris| 50000|
|  Diana| 28|   Tokyo| 60000|
+-------+---+--------+------+



### 14. String Functions

lower() / upper(),

substring(),

split(),

concat() / concat_ws(),

trim() / ltrim() / rtrim(), - Remove white spaces

regexp_replace(),

length(),

lpad() / rpad(),

initcap(),

when() + String Conditions


In [0]:
df.show(5)

+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|Education|JoiningYear|     City|PaymentTier|Age|Gender|EverBenched|ExperienceInCurrentDomain|LeaveOrNot|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
|Bachelors|       2017|Bangalore|          3| 34|  Male|         No|                        0|         0|
|Bachelors|       2013|     Pune|          1| 28|Female|         No|                        3|         1|
|Bachelors|       2014|New Delhi|          3| 38|Female|         No|                        2|         0|
|  Masters|       2016|Bangalore|          3| 27|  Male|         No|                        5|         1|
|  Masters|       2017|     Pune|          3| 24|  Male|        Yes|                        2|         1|
+---------+-----------+---------+-----------+---+------+-----------+-------------------------+----------+
only showing top 5 rows



In [0]:
str1 = df.select(F.initcap('City')).show(5)

+-------------+
|initcap(City)|
+-------------+
|    Bangalore|
|         Pune|
|    New Delhi|
|    Bangalore|
|         Pune|
+-------------+
only showing top 5 rows



In [0]:
str2 = df.select(F.upper('City')).show(5)

+-----------+
|upper(City)|
+-----------+
|  BANGALORE|
|       PUNE|
|  NEW DELHI|
|  BANGALORE|
|       PUNE|
+-----------+
only showing top 5 rows



In [0]:
str1 = df.select(F.lower('City')).show(5)

+-----------+
|lower(City)|
+-----------+
|  bangalore|
|       pune|
|  new delhi|
|  bangalore|
|       pune|
+-----------+
only showing top 5 rows



In [0]:
str3 = df.select(F.substring('Education',1,3)).show(5)

+--------------------------+
|substring(Education, 1, 3)|
+--------------------------+
|                       Bac|
|                       Bac|
|                       Bac|
|                       Mas|
|                       Mas|
+--------------------------+
only showing top 5 rows



In [0]:
str4 = df.select(F.split('City'," ")).show(5)

+------------------+
|split(City,  , -1)|
+------------------+
|       [Bangalore]|
|            [Pune]|
|      [New, Delhi]|
|       [Bangalore]|
|            [Pune]|
+------------------+
only showing top 5 rows



In [0]:
#Concatenate columns with or without a separator.

str5 = df.select(F.concat(F.col('Education'),F.lit("-"),F.col("JoiningYear"))).show(5)

str5 = df.select(F.concat_ws("|", F.col('PaymentTier'),F.col("EverBenched"))).show(5)


+---------------------------------+
|concat(Education, -, JoiningYear)|
+---------------------------------+
|                   Bachelors-2017|
|                   Bachelors-2013|
|                   Bachelors-2014|
|                     Masters-2016|
|                     Masters-2017|
+---------------------------------+
only showing top 5 rows

+--------------------------------------+
|concat_ws(|, PaymentTier, EverBenched)|
+--------------------------------------+
|                                  3|No|
|                                  1|No|
|                                  3|No|
|                                  3|No|
|                                 3|Yes|
+--------------------------------------+
only showing top 5 rows



In [0]:
str6 = df.select(F.length('City')).show(5)

+------------+
|length(City)|
+------------+
|           9|
|           4|
|           9|
|           9|
|           4|
+------------+
only showing top 5 rows



### 15. Split and Indexing

Split a string column into an array and access elements by index:



In [0]:
# Sample data with full names
data = [
    ("John Doe",),
    ("Alice Smith",),
    ("Bob Johnson",),
    ("Charlie Brown",)
]

# Create DataFrame
df5 = spark.createDataFrame(data, ["Name"])
df5.show()

+-------------+
|         Name|
+-------------+
|     John Doe|
|  Alice Smith|
|  Bob Johnson|
|Charlie Brown|
+-------------+



In [0]:
# Split "Name" into an array
df5 = df5.withColumn("NameArray", F.split(F.col("Name"), " "))

# Extract first and last names
df5 = df5.withColumn("FirstName", F.col("NameArray").getItem(0)) \
       .withColumn("LastName", F.col("NameArray").getItem(1))

# Show final result
df5.select("Name", "FirstName", "LastName").show(truncate=False)

+-------------+---------+--------+
|Name         |FirstName|LastName|
+-------------+---------+--------+
|John Doe     |John     |Doe     |
|Alice Smith  |Alice    |Smith   |
|Bob Johnson  |Bob      |Johnson |
|Charlie Brown|Charlie  |Brown   |
+-------------+---------+--------+



### 17. Joins

In [0]:
employees_data = [
    (1, "Alice", 101),
    (2, "Bob", 102),
    (3, "Charlie", 103),  # Department 103 doesn't exist in departments_df
    (4, "David", None)     # No department assigned
]
employees_df = spark.createDataFrame(
    employees_data, ["emp_id", "emp_name", "dept_id"]
)

employees_df.display()

emp_id,emp_name,dept_id
1,Alice,101.0
2,Bob,102.0
3,Charlie,103.0
4,David,


In [0]:
departments_data = [
    (101, "HR"),
    (102, "Engineering"),
    (104, "Marketing")  # Department 104 has no employees
]
departments_df = spark.createDataFrame(
    departments_data, ["dept_id", "dept_name"]
)
departments_df.display()

dept_id,dept_name
101,HR
102,Engineering
104,Marketing


#### Inner Join

Returns rows where the join key (dept_id) exists in both DataFrames.


In [0]:
inner_join_df = employees_df.join(
    departments_df, "dept_id", "inner"
)

inner_join_df.display()

dept_id,emp_id,emp_name,dept_name
101,1,Alice,HR
102,2,Bob,Engineering


#### Outer Join (Full Outer)

Returns all rows from both DataFrames, filling null for missing matches.

In [0]:
outer_join_df = employees_df.join(
    departments_df, "dept_id", "outer"
)
outer_join_df.display()

dept_id,emp_id,emp_name,dept_name
,4.0,David,
101.0,1.0,Alice,HR
102.0,2.0,Bob,Engineering
103.0,3.0,Charlie,
104.0,,,Marketing


#### Left Join

Returns all rows from the left DataFrame (employees_df), with matching rows from the right (departments_df).

In [0]:
left_join_df = employees_df.join(
    departments_df, "dept_id", "left"
)

left_join_df.display()

dept_id,emp_id,emp_name,dept_name
101.0,1,Alice,HR
102.0,2,Bob,Engineering
103.0,3,Charlie,
,4,David,


#### Right Join

Returns all rows from the right DataFrame (departments_df), with matching rows from the left (employees_df).

In [0]:
right_join_df = employees_df.join(
    departments_df, "dept_id", "right"
)

right_join_df.display()

dept_id,emp_id,emp_name,dept_name
101,1.0,Alice,HR
102,2.0,Bob,Engineering
104,,,Marketing


#### Left Semi Join

Returns rows from the left DataFrame (employees_df) that have a match in the right DataFrame.
(Like an INNER JOIN, but only includes columns from the left DataFrame.)

In [0]:
left_semi_df = employees_df.join(
    departments_df, "dept_id", "left_semi"
)

left_semi_df.display()

dept_id,emp_id,emp_name
101,1,Alice
102,2,Bob


### Left Anti Join

Returns rows from the left DataFrame (employees_df) that do NOT have a match in the right DataFrame.

In [0]:
left_anti_df = employees_df.join(
    departments_df, "dept_id", "left_anti"
)

left_anti_df.display()

dept_id,emp_id,emp_name
103.0,3,Charlie
,4,David


### 18. User Defined Functions

In [0]:
from pyspark.sql.types import StringType

# Create UDF
def to_upper(s):
    return s.upper() if s else None

upper_udf = F.udf(to_upper, StringType())

# Apply UDF
df = df.withColumn("NameUpper", upper_udf(F.col("Name")))
df.display()

Name,NameUpper
John Doe,JOHN DOE
Alice Smith,ALICE SMITH
Bob Johnson,BOB JOHNSON
Charlie Brown,CHARLIE BROWN


### 19. Data Writing

In [0]:
#Paraqet

df.write.format('parquet')\
.mode('overwrite')\
.option('path','/FileStore/tables/CSV/data.csv')\
.save()

In [0]:
df = spark.read.format('csv').option('inferSchema', 'true').option('header', 'true').load('/FileStore/tables/dataset/Employee.csv')
display(df.head(10))

Education,JoiningYear,City,PaymentTier,Age,Gender,EverBenched,ExperienceInCurrentDomain,LeaveOrNot
Bachelors,2017,Bangalore,3,34,Male,No,0,0
Bachelors,2013,Pune,1,28,Female,No,3,1
Bachelors,2014,New Delhi,3,38,Female,No,2,0
Masters,2016,Bangalore,3,27,Male,No,5,1
Masters,2017,Pune,3,24,Male,Yes,2,1
Bachelors,2016,Bangalore,3,22,Male,No,0,0
Bachelors,2015,New Delhi,3,38,Male,No,0,0
Bachelors,2016,Bangalore,3,34,Female,No,2,1
Bachelors,2016,Pune,3,23,Male,No,1,0
Masters,2017,New Delhi,2,37,Male,No,2,0


In [0]:
# Write to Parquet (replace output path)
parquet_path = "/FileStore/tables/employee_parquet"
df.write.mode("overwrite").parquet(parquet_path)

print(f"Parquet file saved to: {parquet_path}")

Parquet file saved to: /FileStore/tables/employee_parquet


In [0]:
# List files in the output directory
display(dbutils.fs.ls("/FileStore/tables/employee_parquet"))

path,name,size,modificationTime
dbfs:/FileStore/tables/employee_parquet/_SUCCESS,_SUCCESS,0,1746493602000
dbfs:/FileStore/tables/employee_parquet/_committed_4541453399563326273,_committed_4541453399563326273,124,1746493602000
dbfs:/FileStore/tables/employee_parquet/_started_4541453399563326273,_started_4541453399563326273,0,1746493600000
dbfs:/FileStore/tables/employee_parquet/part-00000-tid-4541453399563326273-884647ab-47a9-4f12-92cf-e8e4cfae6d73-264-1-c000.snappy.parquet,part-00000-tid-4541453399563326273-884647ab-47a9-4f12-92cf-e8e4cfae6d73-264-1-c000.snappy.parquet,15018,1746493601000
