#DataReading

In [0]:
df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv").display()

product_id,name,category,sub_category
101,iPhone 13,Electronics,Phone
102,MacBook Pro,Electronics,Laptop
103,T-shirt,Clothing,Top
104,Sofa,Furniture,Seating
105,Mixer Grinder,Kitchenware,Appliance
106,Shoes,Footwear,Sneakers
107,Samsung TV,Electronics,TV


#Schema Definition 

In [0]:
df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

df.printSchema()


root
 |-- product_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- sub_category: string (nullable = true)



#DDL Schema

In [0]:
my_ddl = '''
           product_id: INT,
           name: string, 
           category: string,
           sub_category: string,
           '''

In [0]:
my_ddl = "product_id string, name STRING, category STRING, sub_category STRING"

df1 = spark.read.format("csv") \
    .schema(my_ddl) \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

df1.display()
df1.printSchema()

product_id,name,category,sub_category
101,iPhone 13,Electronics,Phone
102,MacBook Pro,Electronics,Laptop
103,T-shirt,Clothing,Top
104,Sofa,Furniture,Seating
105,Mixer Grinder,Kitchenware,Appliance
106,Shoes,Footwear,Sneakers
107,Samsung TV,Electronics,TV


#Struct_Type Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

my_struct = StructType([
    StructField("product_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("sub_category", StringType(), True)
])

df1 = spark.read.format("csv") \
    .schema(my_struct) \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

df1.display()


product_id,name,category,sub_category
101,iPhone 13,Electronics,Phone
102,MacBook Pro,Electronics,Laptop
103,T-shirt,Clothing,Top
104,Sofa,Furniture,Seating
105,Mixer Grinder,Kitchenware,Appliance
106,Shoes,Footwear,Sneakers
107,Samsung TV,Electronics,TV


#Select

In [0]:
df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

df_sel = df.select("product_id","name").display()


product_id,name
101,iPhone 13
102,MacBook Pro
103,T-shirt
104,Sofa
105,Mixer Grinder
106,Shoes
107,Samsung TV


#Alias

In [0]:
from pyspark.sql.functions import col

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

df_alias = df.select(col("product_id").alias("p_id"))
df_alias.display()

p_id
101
102
103
104
105
106
107


#Filter

In [0]:
from pyspark.sql.functions import col

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

fil = df.filter((col("name")=="Sofa") & (col("category")=="Furniture")).display()

product_id,name,category,sub_category
104,Sofa,Furniture,Seating


#IsNull and isIn

In [0]:
from pyspark.sql.functions import col

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")
fil = df.filter(
    col("name").isNull() & col("category").isin(["Electronics", "Furniture"])
)
display(fil)

product_id,name,category,sub_category


#withColumnRenamed

In [0]:
from pyspark.sql.functions import col

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

df.withColumnRenamed("product_id","p_id").display()

p_id,name,category,sub_category
101,iPhone 13,Electronics,Phone
102,MacBook Pro,Electronics,Laptop
103,T-shirt,Clothing,Top
104,Sofa,Furniture,Seating
105,Mixer Grinder,Kitchenware,Appliance
106,Shoes,Footwear,Sneakers
107,Samsung TV,Electronics,TV


#withColumn

#Scenario 1 : adding new column using lit()

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

df= df.withColumn("flag", lit("new"))
df.display()

product_id,name,category,sub_category,flag
101,iPhone 13,Electronics,Phone,new
102,MacBook Pro,Electronics,Laptop,new
103,T-shirt,Clothing,Top,new
104,Sofa,Furniture,Seating,new
105,Mixer Grinder,Kitchenware,Appliance,new
106,Shoes,Footwear,Sneakers,new
107,Samsung TV,Electronics,TV,new


#Scenario 2 modify existing column

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

df = df.withColumn("adding", concat("product_id", "name"))
df.display()

product_id,name,category,sub_category,adding
101,iPhone 13,Electronics,Phone,101iPhone 13
102,MacBook Pro,Electronics,Laptop,102MacBook Pro
103,T-shirt,Clothing,Top,103T-shirt
104,Sofa,Furniture,Seating,104Sofa
105,Mixer Grinder,Kitchenware,Appliance,105Mixer Grinder
106,Shoes,Footwear,Sneakers,106Shoes
107,Samsung TV,Electronics,TV,107Samsung TV


#Scenario 3 : Replace the content inside the column regexp_replace()

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

transformed_df = df.withColumn(
    "category",
    regexp_replace(regexp_replace(col("category"), "Electronics", "Elec"), "Clothing", "Cloth")
)

transformed_df.show()
transformed_df.printSchema()

+----------+-------------+-----------+------------+
|product_id|         name|   category|sub_category|
+----------+-------------+-----------+------------+
|       101|    iPhone 13|       Elec|       Phone|
|       102|  MacBook Pro|       Elec|      Laptop|
|       103|      T-shirt|      Cloth|         Top|
|       104|         Sofa|  Furniture|     Seating|
|       105|Mixer Grinder|Kitchenware|   Appliance|
|       106|        Shoes|   Footwear|    Sneakers|
|       107|   Samsung TV|       Elec|          TV|
+----------+-------------+-----------+------------+

root
 |-- product_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- sub_category: string (nullable = true)



#Type Casting

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")
df1 = df.withColumn("product_id", col("product_id").cast(StringType()))
display(df1)

product_id,name,category,sub_category
101,iPhone 13,Electronics,Phone
102,MacBook Pro,Electronics,Laptop
103,T-shirt,Clothing,Top
104,Sofa,Furniture,Seating
105,Mixer Grinder,Kitchenware,Appliance
106,Shoes,Footwear,Sneakers
107,Samsung TV,Electronics,TV


#Sort Scenario 1

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")
df.sort(col("product_id").desc()).display()

product_id,name,category,sub_category
107,Samsung TV,Electronics,TV
106,Shoes,Footwear,Sneakers
105,Mixer Grinder,Kitchenware,Appliance
104,Sofa,Furniture,Seating
103,T-shirt,Clothing,Top
102,MacBook Pro,Electronics,Laptop
101,iPhone 13,Electronics,Phone


#Scenario 2

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")
df.sort(col("sub_category").asc()).display()

product_id,name,category,sub_category
105,Mixer Grinder,Kitchenware,Appliance
102,MacBook Pro,Electronics,Laptop
101,iPhone 13,Electronics,Phone
104,Sofa,Furniture,Seating
106,Shoes,Footwear,Sneakers
107,Samsung TV,Electronics,TV
103,T-shirt,Clothing,Top


#Scenario 3

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")
df.sort(["product_id","name"],ascending=[0,1]).display()

product_id,name,category,sub_category
107,Samsung TV,Electronics,TV
106,Shoes,Footwear,Sneakers
105,Mixer Grinder,Kitchenware,Appliance
104,Sofa,Furniture,Seating
103,T-shirt,Clothing,Top
102,MacBook Pro,Electronics,Laptop
101,iPhone 13,Electronics,Phone


#Limit

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")
df.limit(3).display()

product_id,name,category,sub_category
101,iPhone 13,Electronics,Phone
102,MacBook Pro,Electronics,Laptop
103,T-shirt,Clothing,Top


#Drop

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")
df.drop("sub_category", "name").display()

product_id,category
101,Electronics
102,Electronics
103,Clothing
104,Furniture
105,Kitchenware
106,Footwear
107,Electronics


#Drop_Duplicates

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")
df.drop_duplicates(['category']).display()

product_id,name,category,sub_category
103,T-shirt,Clothing,Top
101,iPhone 13,Electronics,Phone
106,Shoes,Footwear,Sneakers
104,Sofa,Furniture,Seating
105,Mixer Grinder,Kitchenware,Appliance


#Distinct

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")
df.select("category").distinct().display()

category
Furniture
Clothing
Kitchenware
Footwear
Electronics


#Union

In [0]:
data1 = [(1,"jio"), (2, "min")]
schema1= ["id","name"]

data2 = [(3,"var"),(4, "haul")]
schema2 = ["id", "name"]

df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)

uni = df1.union(df2)
display(uni)


id,name
1,jio
2,min
3,var
4,haul


#UnionByName


In [0]:
data1 = [("jio", 1), ("min", 2)]
schema1 = ["name", "id"]

data2 = [("var", 3), ("haul", 4)]
schema2 = ["name", "id"]


df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)
uni = df1.unionByName(df2)
display(uni)


name,id
jio,1
min,2
var,3
haul,4


#String Function
#Initcap(),upper(),lower(``)

In [0]:
from pyspark.sql.functions import *

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

df.select(initcap("name").alias("nam")).display()
df.select(lower("category").alias("cat")).display()
df.select(upper("sub_category").alias("sub")).display()

nam
Iphone 13
Macbook Pro
T-shirt
Sofa
Mixer Grinder
Shoes
Samsung Tv


cat
electronics
electronics
clothing
furniture
kitchenware
footwear
electronics


sub
PHONE
LAPTOP
TOP
SEATING
APPLIANCE
SNEAKERS
TV


#Date function current_date()

In [0]:
from pyspark.sql.functions import col, current_date, date_add

# Load data
df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

# Add current date
df = df.withColumn("Now", current_date())
df.show()  # or display(df) if you're in Databricks





+----------+-------------+-----------+------------+----------+
|product_id|         name|   category|sub_category|       Now|
+----------+-------------+-----------+------------+----------+
|       101|    iPhone 13|Electronics|       Phone|2025-07-06|
|       102|  MacBook Pro|Electronics|      Laptop|2025-07-06|
|       103|      T-shirt|   Clothing|         Top|2025-07-06|
|       104|         Sofa|  Furniture|     Seating|2025-07-06|
|       105|Mixer Grinder|Kitchenware|   Appliance|2025-07-06|
|       106|        Shoes|   Footwear|    Sneakers|2025-07-06|
|       107|   Samsung TV|Electronics|          TV|2025-07-06|
+----------+-------------+-----------+------------+----------+



#Date add()

In [0]:
# Add 7 days to the 'Now' column
df = df.withColumn("week_after", date_add(col("Now"), 7))
df.show()




+----------+-------------+-----------+------------+----------+----------+
|product_id|         name|   category|sub_category|       Now|week_after|
+----------+-------------+-----------+------------+----------+----------+
|       101|    iPhone 13|Electronics|       Phone|2025-07-06|2025-07-13|
|       102|  MacBook Pro|Electronics|      Laptop|2025-07-06|2025-07-13|
|       103|      T-shirt|   Clothing|         Top|2025-07-06|2025-07-13|
|       104|         Sofa|  Furniture|     Seating|2025-07-06|2025-07-13|
|       105|Mixer Grinder|Kitchenware|   Appliance|2025-07-06|2025-07-13|
|       106|        Shoes|   Footwear|    Sneakers|2025-07-06|2025-07-13|
|       107|   Samsung TV|Electronics|          TV|2025-07-06|2025-07-13|
+----------+-------------+-----------+------------+----------+----------+



#Date sub()

In [0]:
# sub 7 days to the 'Now' column
df = df.withColumn("week_before", date_sub(col("Now"), 7))
df.show()


# sub 7 days to the 'Now' column
#df = df.withColumn("week_bef", date_add(col("Now"), -7))
#df.show()

+----------+-------------+-----------+------------+----------+----------+-----------+----------+
|product_id|         name|   category|sub_category|       Now|week_after|week_before|  week_bef|
+----------+-------------+-----------+------------+----------+----------+-----------+----------+
|       101|    iPhone 13|Electronics|       Phone|2025-07-06|2025-07-13| 2025-06-29|2025-06-29|
|       102|  MacBook Pro|Electronics|      Laptop|2025-07-06|2025-07-13| 2025-06-29|2025-06-29|
|       103|      T-shirt|   Clothing|         Top|2025-07-06|2025-07-13| 2025-06-29|2025-06-29|
|       104|         Sofa|  Furniture|     Seating|2025-07-06|2025-07-13| 2025-06-29|2025-06-29|
|       105|Mixer Grinder|Kitchenware|   Appliance|2025-07-06|2025-07-13| 2025-06-29|2025-06-29|
|       106|        Shoes|   Footwear|    Sneakers|2025-07-06|2025-07-13| 2025-06-29|2025-06-29|
|       107|   Samsung TV|Electronics|          TV|2025-07-06|2025-07-13| 2025-06-29|2025-06-29|
+----------+-------------+----

#Datediff

In [0]:
#Datediff
df = df1.withColumn("Datedif", datediff(col("week_after"), col("Now")))
display(df)

product_id,name,category,sub_category,Now,week_after,Datedif
101,iPhone 13,Electronics,Phone,2025-07-06,2025-07-13,7
102,MacBook Pro,Electronics,Laptop,2025-07-06,2025-07-13,7
103,T-shirt,Clothing,Top,2025-07-06,2025-07-13,7
104,Sofa,Furniture,Seating,2025-07-06,2025-07-13,7
105,Mixer Grinder,Kitchenware,Appliance,2025-07-06,2025-07-13,7
106,Shoes,Footwear,Sneakers,2025-07-06,2025-07-13,7
107,Samsung TV,Electronics,TV,2025-07-06,2025-07-13,7


# Handling Nulls

# dropna("all", "any", "subset")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create Spark session (if not already done)
spark = SparkSession.builder.getOrCreate()

# Sample data with null values
data = [
    (1, "Alice", 25, "HR"),
    (2, "Bob", None, "Finance"),
    (3, None, 30, "IT"),
    (4, "David", 28, None),
    (5, "Eve", None, "HR"),
    (6, None, None, "Finance"),
    (7, "Grace", 29, "IT"),
    (8, "Henry", 35, None),
    (9, None, None, None),
    (10, "Ivy", 32, "Sales")
]

# Define schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("department", StringType(), True)
])

# Create DataFrame
df = spark.createDataFrame(data, schema)
df.show()

df.dropna("all").display()
df.dropna("any").display()
df.dropna(subset = "age").display()


+---+-----+----+----------+
| id| name| age|department|
+---+-----+----+----------+
|  1|Alice|  25|        HR|
|  2|  Bob|NULL|   Finance|
|  3| NULL|  30|        IT|
|  4|David|  28|      NULL|
|  5|  Eve|NULL|        HR|
|  6| NULL|NULL|   Finance|
|  7|Grace|  29|        IT|
|  8|Henry|  35|      NULL|
|  9| NULL|NULL|      NULL|
| 10|  Ivy|  32|     Sales|
+---+-----+----+----------+



id,name,age,department
1,Alice,25.0,HR
2,Bob,,Finance
3,,30.0,IT
4,David,28.0,
5,Eve,,HR
6,,,Finance
7,Grace,29.0,IT
8,Henry,35.0,
9,,,
10,Ivy,32.0,Sales


id,name,age,department
1,Alice,25,HR
7,Grace,29,IT
10,Ivy,32,Sales


id,name,age,department
1,Alice,25,HR
3,,30,IT
4,David,28,
7,Grace,29,IT
8,Henry,35,
10,Ivy,32,Sales


# Filling nulls

In [0]:
df.fillna("not available").display()
df = df.withColumn("age", df["age"].cast("string"))
df.fillna("not avail", subset=["age"]).display()

id,name,age,department
1,Alice,25.0,HR
2,Bob,,Finance
3,not available,30.0,IT
4,David,28.0,not available
5,Eve,,HR
6,not available,,Finance
7,Grace,29.0,IT
8,Henry,35.0,not available
9,not available,,not available
10,Ivy,32.0,Sales


id,name,age,department
1,Alice,25,HR
2,Bob,not avail,Finance
3,,30,IT
4,David,28,
5,Eve,not avail,HR
6,,not avail,Finance
7,Grace,29,IT
8,Henry,35,
9,,not avail,
10,Ivy,32,Sales


# Split and Indexing

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

# Start Spark Session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    ("John Doe", "john.doe@example.com", "Electronics>Mobiles>Smartphones"),
    ("Alice Smith", "alice.smith@abc.com", "Clothing>Women>Tops"),
    ("Bob Johnson", "bob.j@xyz.org", "Electronics>Computers>Laptops"),
    ("Mary Lee", "mary.lee@company.net", "Home>Furniture>Chairs"),
    ("James Bond", "james.bond@mi6.uk", "Accessories>Men>Watches")
]

# Column names
columns = ["full_name", "email", "category_path"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show(truncate=False)
#Split
df.withColumn("category_path", split("category_path",">").alias("cat_path")).display()
#Indexing
df.withColumn("category_path", split("category_path",">")[2].alias("cat_path")).display()


+-----------+--------------------+-------------------------------+
|full_name  |email               |category_path                  |
+-----------+--------------------+-------------------------------+
|John Doe   |john.doe@example.com|Electronics>Mobiles>Smartphones|
|Alice Smith|alice.smith@abc.com |Clothing>Women>Tops            |
|Bob Johnson|bob.j@xyz.org       |Electronics>Computers>Laptops  |
|Mary Lee   |mary.lee@company.net|Home>Furniture>Chairs          |
|James Bond |james.bond@mi6.uk   |Accessories>Men>Watches        |
+-----------+--------------------+-------------------------------+



full_name,email,category_path
John Doe,john.doe@example.com,"List(Electronics, Mobiles, Smartphones)"
Alice Smith,alice.smith@abc.com,"List(Clothing, Women, Tops)"
Bob Johnson,bob.j@xyz.org,"List(Electronics, Computers, Laptops)"
Mary Lee,mary.lee@company.net,"List(Home, Furniture, Chairs)"
James Bond,james.bond@mi6.uk,"List(Accessories, Men, Watches)"


full_name,email,category_path
John Doe,john.doe@example.com,Smartphones
Alice Smith,alice.smith@abc.com,Tops
Bob Johnson,bob.j@xyz.org,Laptops
Mary Lee,mary.lee@company.net,Chairs
James Bond,james.bond@mi6.uk,Watches


# Explode

In [0]:
from pyspark.sql.functions import split, explode

df_exp = df.withColumn("category_path", split("category_path",">").alias("cat_path"))
df_exp.withColumn("category_path", explode("category_path")).display()


full_name,email,category_path
John Doe,john.doe@example.com,Electronics
John Doe,john.doe@example.com,Mobiles
John Doe,john.doe@example.com,Smartphones
Alice Smith,alice.smith@abc.com,Clothing
Alice Smith,alice.smith@abc.com,Women
Alice Smith,alice.smith@abc.com,Tops
Bob Johnson,bob.j@xyz.org,Electronics
Bob Johnson,bob.j@xyz.org,Computers
Bob Johnson,bob.j@xyz.org,Laptops
Mary Lee,mary.lee@company.net,Home


# Array_Contains

In [0]:
from pyspark.sql.functions import array_contains

df_exp.withColumn("flag",array_contains("category_path","Electronics")).display()


full_name,email,category_path,flag
John Doe,john.doe@example.com,"List(Electronics, Mobiles, Smartphones)",True
Alice Smith,alice.smith@abc.com,"List(Clothing, Women, Tops)",False
Bob Johnson,bob.j@xyz.org,"List(Electronics, Computers, Laptops)",True
Mary Lee,mary.lee@company.net,"List(Home, Furniture, Chairs)",False
James Bond,james.bond@mi6.uk,"List(Accessories, Men, Watches)",False


# GroupBy

# scenario 1

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Start Spark Session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, "Alice", "North", "Electronics", 1200),
    (2, "Bob", "South", "Clothing", 800),
    (3, "Alice", "North", "Clothing", 300),
    (4, "David", "East", "Electronics", 1500),
    (5, "Alice", "North", "Electronics", 2000),
    (6, "Bob", "South", "Electronics", 2200),
    (7, "Carol", "West", "Clothing", 400),
    (8, "David", "East", "Clothing", 700),
    (9, "Carol", "West", "Electronics", 1300),
    (10, "Bob", "South", "Clothing", 900),
]

columns = ["transaction_id", "customer", "region", "category", "amount"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.display()

# Group by category and sum amount
df.groupBy("category").agg(sum("amount").alias("total_amount")).display()

# Group by category and avg amount
df.groupBy("category").agg(avg("amount").alias("avg_amouny")).display()

# Group by 2 columns category and region 
df.groupBy("region","category").agg(sum("amount").alias("total")).display()

# Group by 2 columns and 2 aggregation 
df.groupBy("region","category").agg(sum("amount").alias("total"), avg("amount").alias("avg_amt")).display()




transaction_id,customer,region,category,amount
1,Alice,North,Electronics,1200
2,Bob,South,Clothing,800
3,Alice,North,Clothing,300
4,David,East,Electronics,1500
5,Alice,North,Electronics,2000
6,Bob,South,Electronics,2200
7,Carol,West,Clothing,400
8,David,East,Clothing,700
9,Carol,West,Electronics,1300
10,Bob,South,Clothing,900


category,total_amount
Electronics,8200
Clothing,3100


category,avg_amouny
Electronics,1640.0
Clothing,620.0


region,category,total
North,Electronics,3200
South,Clothing,1700
North,Clothing,300
East,Electronics,1500
South,Electronics,2200
West,Clothing,400
East,Clothing,700
West,Electronics,1300


region,category,total,avg_amt
North,Electronics,3200,1600.0
South,Clothing,1700,850.0
North,Clothing,300,300.0
East,Electronics,1500,1500.0
South,Electronics,2200,2200.0
West,Clothing,400,400.0
East,Clothing,700,700.0
West,Electronics,1300,1300.0


# Collect_list : make all the entity as list

In [0]:
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data: (user, book)
data = [
    ("user1", "Book 1"),
    ("user1", "Book 2"),
    ("user1", "Book 3"),
    ("user2", "Book 1"),
    ("user2", "Book 2"),
    ("user2", "Book 3"),
    ("user3", "Book 1"),
    ("user3", "Book 2"),
    ("user3", "Book 3"),
]

# Create DataFrame
columns = ["user", "book"]
df = spark.createDataFrame(data, columns)
df.show()
df.groupBy("user").agg(collect_list("book").alias("collected")).display()


+-----+------+
| user|  book|
+-----+------+
|user1|Book 1|
|user1|Book 2|
|user1|Book 3|
|user2|Book 1|
|user2|Book 2|
|user2|Book 3|
|user3|Book 1|
|user3|Book 2|
|user3|Book 3|
+-----+------+



user,collected
user1,"List(Book 1, Book 2, Book 3)"
user2,"List(Book 1, Book 2, Book 3)"
user3,"List(Book 1, Book 2, Book 3)"


# Pivot

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

# Start Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data: (region, month, sales)
data = [
    ("North", "Jan", 1000),
    ("North", "Feb", 1200),
    ("North", "Mar", 900),
    ("South", "Jan", 1100),
    ("South", "Feb", 1050),
    ("South", "Mar", 1150),
    ("East", "Jan", 950),
    ("East", "Feb", 1000),
    ("East", "Mar", 980),
    ("West", "Jan", 1030),
    ("West", "Feb", 1070),
    ("West", "Mar", 990),
]

columns = ["region", "month", "sales"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.display()
df.groupBy("region").pivot("month").agg(avg("sales")).display()


region,month,sales
North,Jan,1000
North,Feb,1200
North,Mar,900
South,Jan,1100
South,Feb,1050
South,Mar,1150
East,Jan,950
East,Feb,1000
East,Mar,980
West,Jan,1030


region,Feb,Jan,Mar
West,1070.0,1030.0,990.0
East,1000.0,950.0,980.0
North,1200.0,1000.0,900.0
South,1050.0,1100.0,1150.0


# When-Otherwise

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Start Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data: (food_id, food_item, food_type)
data = [
    (1, "Paneer Butter Masala", "veg", 1000),
    (2, "Chicken Biryani", "non-veg",3000),
    (3, "Dal Tadka", "veg",477),
    (4, "Mutton Curry", "non-veg",200),
    (5, "Veg Fried Rice", "veg",400),
    (6, "Fish Fry", "non-veg",600),
    (7, "Aloo Gobi", "veg",800),
    (8, "Egg Curry", "non-veg",700),
    (9, "Masala Dosa", "veg",600),
    (10, "Prawn Masala", "non-veg",200)
]

columns = ["food_id", "food_item", "food_type", "food_rate"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show(truncate=False)
#scenario 1
df1 = df.withColumn(
    "Food_flag",
    when(col("food_type") == "veg", "veg eaters").otherwise("Non-veg eaters")
)
display(df1)
#scenario 2
df2 = df1.withColumn(
    "Exp_flag",
    when((col("Food_flag") == "veg eaters") & (col("food_rate") > 700), "Expensive")
    .when((col("Food_flag") == "veg eaters") & (col("food_rate") <= 700), "InExpensive")
    .otherwise("Non-veg eaters")
)
display(df2)  


+-------+--------------------+---------+---------+
|food_id|food_item           |food_type|food_rate|
+-------+--------------------+---------+---------+
|1      |Paneer Butter Masala|veg      |1000     |
|2      |Chicken Biryani     |non-veg  |3000     |
|3      |Dal Tadka           |veg      |477      |
|4      |Mutton Curry        |non-veg  |200      |
|5      |Veg Fried Rice      |veg      |400      |
|6      |Fish Fry            |non-veg  |600      |
|7      |Aloo Gobi           |veg      |800      |
|8      |Egg Curry           |non-veg  |700      |
|9      |Masala Dosa         |veg      |600      |
|10     |Prawn Masala        |non-veg  |200      |
+-------+--------------------+---------+---------+



food_id,food_item,food_type,food_rate,Food_flag
1,Paneer Butter Masala,veg,1000,veg eaters
2,Chicken Biryani,non-veg,3000,Non-veg eaters
3,Dal Tadka,veg,477,veg eaters
4,Mutton Curry,non-veg,200,Non-veg eaters
5,Veg Fried Rice,veg,400,veg eaters
6,Fish Fry,non-veg,600,Non-veg eaters
7,Aloo Gobi,veg,800,veg eaters
8,Egg Curry,non-veg,700,Non-veg eaters
9,Masala Dosa,veg,600,veg eaters
10,Prawn Masala,non-veg,200,Non-veg eaters


food_id,food_item,food_type,food_rate,Food_flag,Exp_flag
1,Paneer Butter Masala,veg,1000,veg eaters,Expensive
2,Chicken Biryani,non-veg,3000,Non-veg eaters,Non-veg eaters
3,Dal Tadka,veg,477,veg eaters,InExpensive
4,Mutton Curry,non-veg,200,Non-veg eaters,Non-veg eaters
5,Veg Fried Rice,veg,400,veg eaters,InExpensive
6,Fish Fry,non-veg,600,Non-veg eaters,Non-veg eaters
7,Aloo Gobi,veg,800,veg eaters,Expensive
8,Egg Curry,non-veg,700,Non-veg eaters,Non-veg eaters
9,Masala Dosa,veg,600,veg eaters,InExpensive
10,Prawn Masala,non-veg,200,Non-veg eaters,Non-veg eaters


# Joins

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Customer DataFrame
customers_data = [
    (1, "Alice", "USA"),
    (2, "Bob", "UK"),
    (3, "Charlie", "India"),
    (4, "David", "Germany"),
    (5, "Eva", "France"),
]

customers_columns = ["customer_id", "name", "country"]
df1 = spark.createDataFrame(customers_data, customers_columns)

# Orders DataFrame
orders_data = [
    (101, 1, "Laptop"),
    (102, 2, "Phone"),
    (103, 1, "Mouse"),
    (104, 6, "Monitor"),
    (105, 3, "Keyboard"),
]

orders_columns = ["order_id", "customer_id", "product"]
df2 = spark.createDataFrame(orders_data, orders_columns)

# ✅ Now display separately (optional)
display(df1)
display(df2)


customer_id,name,country
1,Alice,USA
2,Bob,UK
3,Charlie,India
4,David,Germany
5,Eva,France


order_id,customer_id,product
101,1,Laptop
102,2,Phone
103,1,Mouse
104,6,Monitor
105,3,Keyboard


# Inner Join

In [0]:
# Inner join
df1.join(df2, df1["customer_id"] == df2["customer_id"], "inner").display()

customer_id,name,country,order_id,customer_id.1,product
1,Alice,USA,103,1,Mouse
1,Alice,USA,101,1,Laptop
2,Bob,UK,102,2,Phone
3,Charlie,India,105,3,Keyboard


# Left Join

In [0]:
# left join
df1.join(df2, df1["customer_id"] == df2["customer_id"], "left").display()

customer_id,name,country,order_id,customer_id.1,product
1,Alice,USA,103.0,1.0,Mouse
1,Alice,USA,101.0,1.0,Laptop
2,Bob,UK,102.0,2.0,Phone
3,Charlie,India,105.0,3.0,Keyboard
4,David,Germany,,,
5,Eva,France,,,


# Right Join

In [0]:
# Right  join
df1.join(df2, df1["customer_id"] == df2["customer_id"], "right").display()

customer_id,name,country,order_id,customer_id.1,product
1.0,Alice,USA,103,1,Mouse
2.0,Bob,UK,102,2,Phone
3.0,Charlie,India,105,3,Keyboard
1.0,Alice,USA,101,1,Laptop
,,,104,6,Monitor


# Anti Join

In [0]:
# Anti join
df1.join(df2, df1["customer_id"] == df2["customer_id"], "anti").display()

customer_id,name,country
4,David,Germany
5,Eva,France


# Window Function

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Start Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, "Alice", "HR", 4000, "2023-01"),
    (2, "Bob", "HR", 4500, "2023-01"),
    (3, "Charlie", "IT", 5000, "2023-01"),
    (4, "David", "IT", 4800, "2023-01"),
    (5, "Eve", "Finance", 5300, "2023-01"),
    
    (1, "Alice", "HR", 4100, "2023-02"),
    (2, "Bob", "HR", 4600, "2023-02"),
    (3, "Charlie", "IT", 5100, "2023-02"),
    (4, "David", "IT", 4900, "2023-02"),
    (5, "Eve", "Finance", 5400, "2023-02"),
    
    (1, "Alice", "HR", 4200, "2023-03"),
    (2, "Bob", "HR", 4700, "2023-03"),
    (3, "Charlie", "IT", 5200, "2023-03"),
    (4, "David", "IT", 5000, "2023-03"),
    (5, "Eve", "Finance", 5500, "2023-03"),
]

columns = ["emp_id", "name", "department", "salary", "month"]
df = spark.createDataFrame(data, columns)
display(df)


emp_id,name,department,salary,month
1,Alice,HR,4000,2023-01
2,Bob,HR,4500,2023-01
3,Charlie,IT,5000,2023-01
4,David,IT,4800,2023-01
5,Eve,Finance,5300,2023-01
1,Alice,HR,4100,2023-02
2,Bob,HR,4600,2023-02
3,Charlie,IT,5100,2023-02
4,David,IT,4900,2023-02
5,Eve,Finance,5400,2023-02


# row number()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Define the window specification
window_spec = Window.orderBy("emp_id")

# Apply row_number over the window
df.withColumn("rowcol", row_number().over(window_spec)).display()




emp_id,name,department,salary,month,rowcol
1,Alice,HR,4000,2023-01,1
1,Alice,HR,4100,2023-02,2
1,Alice,HR,4200,2023-03,3
2,Bob,HR,4500,2023-01,4
2,Bob,HR,4600,2023-02,5
2,Bob,HR,4700,2023-03,6
3,Charlie,IT,5000,2023-01,7
3,Charlie,IT,5100,2023-02,8
3,Charlie,IT,5200,2023-03,9
4,David,IT,4800,2023-01,10


Rank()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

w= Window.orderBy("emp_id")
df.withColumn("Rank_col", rank().over(w)).display()

emp_id,name,department,salary,month,Rank_col
1,Alice,HR,4000,2023-01,1
1,Alice,HR,4100,2023-02,1
1,Alice,HR,4200,2023-03,1
2,Bob,HR,4500,2023-01,4
2,Bob,HR,4600,2023-02,4
2,Bob,HR,4700,2023-03,4
3,Charlie,IT,5000,2023-01,7
3,Charlie,IT,5100,2023-02,7
3,Charlie,IT,5200,2023-03,7
4,David,IT,4800,2023-01,10


# Dense rank()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

window_spec = Window.orderBy(col("emp_id").desc())

# Apply row_number over the window
df.withColumn("densecol", dense_rank().over(window_spec)).display()

emp_id,name,department,salary,month,densecol
5,Eve,Finance,5300,2023-01,1
5,Eve,Finance,5400,2023-02,1
5,Eve,Finance,5500,2023-03,1
4,David,IT,4800,2023-01,2
4,David,IT,4900,2023-02,2
4,David,IT,5000,2023-03,2
3,Charlie,IT,5000,2023-01,3
3,Charlie,IT,5100,2023-02,3
3,Charlie,IT,5200,2023-03,3
2,Bob,HR,4500,2023-01,4


# Cumulative Sum : To add row by row

In [0]:

from pyspark.sql.window import Window
from pyspark.sql.functions import sum

# Define the window specification
window_spec = Window.orderBy("emp_id").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("cumsum", sum("salary").over(window_spec)).display()


# Define the window specification
window_spec = Window.orderBy("emp_id").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("totsum", sum("salary").over(window_spec)).display()

emp_id,name,department,salary,month,cumsum
1,Alice,HR,4000,2023-01,4000
1,Alice,HR,4100,2023-02,8100
1,Alice,HR,4200,2023-03,12300
2,Bob,HR,4500,2023-01,16800
2,Bob,HR,4600,2023-02,21400
2,Bob,HR,4700,2023-03,26100
3,Charlie,IT,5000,2023-01,31100
3,Charlie,IT,5100,2023-02,36200
3,Charlie,IT,5200,2023-03,41400
4,David,IT,4800,2023-01,46200


emp_id,name,department,salary,month,totsum
1,Alice,HR,4000,2023-01,72300
2,Bob,HR,4500,2023-01,72300
3,Charlie,IT,5000,2023-01,72300
4,David,IT,4800,2023-01,72300
5,Eve,Finance,5300,2023-01,72300
1,Alice,HR,4100,2023-02,72300
2,Bob,HR,4600,2023-02,72300
3,Charlie,IT,5100,2023-02,72300
4,David,IT,4900,2023-02,72300
5,Eve,Finance,5400,2023-02,72300


# User defined

In [0]:
# Step 1: Define the function
def myfun(x):
    return x * x

# Step 2: Register the UDF
myudf = udf(myfun)

# Step 3: Create a sample DataFrame
data = [(1, 1000), (2, 2000), (3, 3000)]
columns = ["emp_id", "salary"]
df = spark.createDataFrame(data, columns)

# Step 4: Use the UDF to create a new column
df = df.withColumn("newcol", myudf("salary"))

# Step 5: Display the DataFrame
display(df)

emp_id,salary,newcol
1,1000,1000000
2,2000,4000000
3,3000,9000000


# Data Writing and Modes


# Append

In [0]:
df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

#append
df.write.format("csv").mode("append").save("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

product_id,name,category,sub_category
101,iPhone 13,Electronics,Phone
102,MacBook Pro,Electronics,Laptop
103,T-shirt,Clothing,Top
104,Sofa,Furniture,Seating
105,Mixer Grinder,Kitchenware,Appliance
106,Shoes,Footwear,Sneakers
107,Samsung TV,Electronics,TV
102,MacBook Pro,Electronics,Laptop
103,T-shirt,Clothing,Top
104,Sofa,Furniture,Seating


# Overwrite

In [0]:
df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

#overwrite
df.write.format("csv").mode("overwrite").save("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

product_id,name,category,sub_category
101,iPhone 13,Electronics,Phone
102,MacBook Pro,Electronics,Laptop
103,T-shirt,Clothing,Top
104,Sofa,Furniture,Seating
105,Mixer Grinder,Kitchenware,Appliance
106,Shoes,Footwear,Sneakers
107,Samsung TV,Electronics,TV
102,MacBook Pro,Electronics,Laptop
103,T-shirt,Clothing,Top
104,Sofa,Furniture,Seating


# Error

In [0]:
df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")
#error
df.write.format("csv").mode("error").save("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4861324046094436>, line 3[0m
[1;32m      1[0m df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mcsv[39m[38;5;124m"[39m)[38;5;241m.[39moption([38;5;124m"[39m[38;5;124minferSchema[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)[38;5;241m.[39moption([38;5;124m"[39m[38;5;124mheader[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)[38;5;241m.[39mload([38;5;124m"[39m[38;5;124m/Volumes/workspace/navpractise/naveen/navnaa/products.csv[39m[38;5;124m"[39m)
[0;32m----> 3[0m df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mcsv[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124merror[39m[38;5;124m"[39m)[38;5;241m.[39msave(

# Ignore

In [0]:
df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")


df.write.format("csv").mode("ignore").save("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

# Parquet

In [0]:


df.write.format("parquet").mode("overwrite").save("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

#Tables

In [0]:
# Recreate the DataFrame
df = spark.read.format("csv").load("/Volumes/workspace/navpractise/naveen/navnaa/products.csv")

# Write the DataFrame to the table
df.write.format("delta").mode("overwrite").saveAsTable("my_tab")



# Spark SQL

In [0]:
data = [(1, 1000), (2, 2000), (3, 3000)]
columns = ["emp_id", "salary"]
df = spark.createDataFrame(data, columns)

df.createOrReplaceTempView("mytemp")

In [0]:
%sql
select * from mytemp

emp_id,salary
1,1000
2,2000
3,3000


#Convert sql to dataframe

In [0]:
df_sql = spark.sql("select * from mytemp").display()

emp_id,salary
1,1000
2,2000
3,3000
