In [1]:
from pyspark.sql import SparkSession
from datetime import date, datetime

# Obtain the notebook's SparkSession (getOrCreate returns an existing one if present).
spark = (
    SparkSession.builder
    .appName("pyspark practise-1")
    .getOrCreate()
)

In [2]:
# Sample rows: (id, name, salary, self_employed, start_date, last_login).
data = [
    (1, 'alice', 100.78, True, date(2025, 5, 12), datetime(2025, 5, 12, 14, 30, 0)),
    (2, 'bob', 200.89, False, date(2025, 1, 1), datetime(2025, 1, 1, 14, 30, 0)),
    (3, 'charlie', 30.870, True, date(2026, 1, 1), datetime(2025, 1, 1, 14, 30, 0)),
]

# DDL-style schema string. `float` is single precision, which is why values
# collected with head() show rounding (e.g. 100.77999877929688).
typed_rows_schema = "id int,name string,salary float,self_employed boolean,start_date date,last_login timestamp"
df1 = spark.createDataFrame(data, schema=typed_rows_schema)

df1.show()
df1.printSchema()
df1.head(2)

+---+-------+------+-------------+----------+-------------------+
| id|   name|salary|self_employed|start_date|         last_login|
+---+-------+------+-------------+----------+-------------------+
|  1|  alice|100.78|         true|2025-05-12|2025-05-12 14:30:00|
|  2|    bob|200.89|        false|2025-01-01|2025-01-01 14:30:00|
|  3|charlie| 30.87|         true|2026-01-01|2025-01-01 14:30:00|
+---+-------+------+-------------+----------+-------------------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: float (nullable = true)
 |-- self_employed: boolean (nullable = true)
 |-- start_date: date (nullable = true)
 |-- last_login: timestamp (nullable = true)



[Row(id=1, name='alice', salary=100.77999877929688, self_employed=True, start_date=datetime.date(2025, 5, 12), last_login=datetime.datetime(2025, 5, 12, 14, 30)),
 Row(id=2, name='bob', salary=200.88999938964844, self_employed=False, start_date=datetime.date(2025, 1, 1), last_login=datetime.datetime(2025, 1, 1, 14, 30))]

In [3]:
# Read a header-less CSV, supplying the column names and types explicitly.
df2 = (
    spark.read.format('csv')
    .option('header', 'false')
    .schema('id int,event_date date,count int')
    .load('./csvFiles/sample1.csv')
)

df2.printSchema()
df2.head(3)

root
 |-- id: integer (nullable = true)
 |-- event_date: date (nullable = true)
 |-- count: integer (nullable = true)



[Row(id=1, event_date=datetime.date(2025, 1, 1), count=2),
 Row(id=1, event_date=datetime.date(2025, 1, 2), count=3),
 Row(id=2, event_date=datetime.date(2025, 1, 2), count=5)]

In [7]:
# Read a CSV that has a header row, letting Spark infer the column types
# (no explicit schema is passed here).
header_csv_reader = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
)
df2 = header_csv_reader.load('./csvFiles/sample2.csv')
df2.printSchema()
df2.head(3)
# Variant that also sets the delimiter explicitly:
# df2=spark.read.format('csv').option('header','true').option('inferSchema','true').option('delimiter',',').load('sample2.csv')
# df2.printSchema()

root
 |-- roll_no: integer (nullable = true)
 |-- admission_date: date (nullable = true)
 |-- marks: integer (nullable = true)



[Row(roll_no=1, admission_date=datetime.date(2025, 1, 1), marks=2),
 Row(roll_no=1, admission_date=datetime.date(2025, 1, 2), marks=3),
 Row(roll_no=2, admission_date=datetime.date(2025, 1, 2), marks=5)]

In [None]:
# Read a multi-line JSON file with an explicit schema.
# Variant without .schema(...), relying on inference instead:
#df=spark.read.format('json').option('inferSchema','true').option('multiline','true').load('./csvFiles/sample1.json')
df = (
    spark.read.format('json')
    .schema('name string,state string')
    .option('multiline', 'true')
    .load('./csvFiles/sample1.json')
)
df.printSchema()
df.head(2)



root
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)



[Row(name='dk', state='active')]

In [4]:
# Selecting columns and running simple aggregations
from pyspark.sql.functions import col
# NOTE: importing max/min/sum this way replaces Python's built-ins of the
# same name in the notebook namespace.
from pyspark.sql.functions import max,min,avg,sum,least,greatest,count_distinct

grid_rows = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
df1 = spark.createDataFrame(grid_rows, schema="col1 int,col2 int,col3 int")

print("Dataframe df1:")
#df1.show()
print("Selecting specific columns from df1:")
#df1.select('col1','col3').show()
# Using col() to select (and alias) columns:
#df1.select(col('col1').alias('col1_alias'),col('col2').alias('col2_alias')).show()

# Aggregates over the whole frame produce a single row.
whole_frame_aggregates = [
    max(col('col1')).alias('max_col1'),
    min(col('col2')).alias('min_col2'),
    avg(col('col3')).alias('avg_col3'),
    sum('col1'),
    count_distinct('col1'),
]
df1.select(*whole_frame_aggregates).show()

# least/greatest compare across columns within each row, so one row per input row.
compared_columns = ('col1', 'col2', 'col3')
df1.select(least(*compared_columns), greatest(*compared_columns)).show()

Dataframe df1:
Selecting specific columns from df1:
+--------+--------+--------+---------+--------------------+
|max_col1|min_col2|avg_col3|sum(col1)|count(DISTINCT col1)|
+--------+--------+--------+---------+--------------------+
|       7|       2|     6.0|       12|                   3|
+--------+--------+--------+---------+--------------------+

+-----------------------+--------------------------+
|least(col1, col2, col3)|greatest(col1, col2, col3)|
+-----------------------+--------------------------+
|                      1|                         3|
|                      4|                         6|
|                      7|                         9|
+-----------------------+--------------------------+



In [5]:
# Adding new columns to an existing DataFrame
from pyspark.sql.functions import greatest, least, lit, when

marks_rows = [
    ['student1', 45, 33, 24, 'good'],
    ['student2', 5, 3, 20, 'bad'],
    ['student3', 50, 50, 50, 'good'],
]
df1 = spark.createDataFrame(
    marks_rows,
    schema='name string,maths int,science int,english int,remarks string',
)
df1.show()

# Simpler variants kept for reference.
# Total marks only:
# df2=df1.withColumn('total_marks',col('maths')+col('science')+col('english'))
# df2.show()
# Total marks and a plain percentage:
# df2=df1.withColumn('total_marks',col('maths')+col('science')+col('english')).withColumn('percentage',(col('total_marks')*100)/150)
# df2.show()

# 1) withColumn: add one column per call.
# 'good' rows get 20 bonus marks, capped at 150, before the percentage is taken;
# 'bad' rows use the raw total; any other remark yields 0.
total_marks_expr = col('maths') + col('science') + col('english')
percentage_expr = (
    when(col('remarks') == 'good', (least(col('total_marks') + 20, lit(150)) * 100) / 150)
    .when(col('remarks') == 'bad', (col('total_marks') * 100) / 150)
    .otherwise(0)
)
df2 = (
    df1
    .withColumn('total_marks', total_marks_expr)
    .withColumn('percentage', percentage_expr)
)
df2.show()
# Notes:
# - least() returns the smallest value among the columns passed to it.
# - lit() creates a column holding a literal/constant value.
# - when/otherwise is the column-expression counterpart of Python's if/else.

# 2) withColumns: add several columns in one call from a {name: expression} dict.
subject_marks = [col('maths'), col('science'), col('english')]
df3 = df2.withColumns({
    'highest_marks': greatest(*subject_marks),
    'lowest_marks': least(*subject_marks),
})
df3.show()

+--------+-----+-------+-------+-------+
|    name|maths|science|english|remarks|
+--------+-----+-------+-------+-------+
|student1|   45|     33|     24|   good|
|student2|    5|      3|     20|    bad|
|student3|   50|     50|     50|   good|
+--------+-----+-------+-------+-------+

+--------+-----+-------+-------+-------+-----------+------------------+
|    name|maths|science|english|remarks|total_marks|        percentage|
+--------+-----+-------+-------+-------+-----------+------------------+
|student1|   45|     33|     24|   good|        102| 81.33333333333333|
|student2|    5|      3|     20|    bad|         28|18.666666666666668|
|student3|   50|     50|     50|   good|        150|             100.0|
+--------+-----+-------+-------+-------+-----------+------------------+

+--------+-----+-------+-------+-------+-----------+------------------+-------------+------------+
|    name|maths|science|english|remarks|total_marks|        percentage|highest_marks|lowest_marks|
+--------

In [52]:
# Dropping columns from a DataFrame
from pyspark.sql.functions import col

df1 = spark.createDataFrame(
    [[1, 2, 3], [4, 5, 6]],
    schema="col1 int,col2 int,col3 int",
)
unwanted_columns = [col('col2'), col('col3')]
df1.drop(*unwanted_columns).show()  # only col1 is left

+----+
|col1|
+----+
|   1|
|   4|
+----+



In [None]:
# Basic filtering
from pyspark.sql.functions import col

df1 = spark.createDataFrame(
    [
        ['student1', 45, 33, 24, 'good'],
        ['student2', 5, 3, 20, 'bad'],
        ['student3', 50, 50, 50, 'good'],
    ],
    schema='name string,maths int,science int,english int,remarks string',
)
df1.show()

# Single condition.
has_good_remark = col('remarks') == 'good'
df1.filter(has_good_remark).show()

# Combined conditions: wrap each comparison in parentheses, because Python's
# & and | operators bind more tightly than == / > / <.
df1.filter(has_good_remark & (col('science') > 45)).show()

df1.filter((col('remarks') == 'bad') | (col('english') < 25)).show()

+--------+-----+-------+-------+-------+
|    name|maths|science|english|remarks|
+--------+-----+-------+-------+-------+
|student1|   45|     33|     24|   good|
|student2|    5|      3|     20|    bad|
|student3|   50|     50|     50|   good|
+--------+-----+-------+-------+-------+

+--------+-----+-------+-------+-------+
|    name|maths|science|english|remarks|
+--------+-----+-------+-------+-------+
|student1|   45|     33|     24|   good|
|student3|   50|     50|     50|   good|
+--------+-----+-------+-------+-------+

+--------+-----+-------+-------+-------+
|    name|maths|science|english|remarks|
+--------+-----+-------+-------+-------+
|student3|   50|     50|     50|   good|
+--------+-----+-------+-------+-------+

+--------+-----+-------+-------+-------+
|    name|maths|science|english|remarks|
+--------+-----+-------+-------+-------+
|student1|   45|     33|     24|   good|
|student2|    5|      3|     20|    bad|
+--------+-----+-------+-------+-------+



In [None]:
# String filtering with like (SQL-style wildcards) and rlike (regular expressions)
from pyspark.sql.functions import col

df1 = spark.createDataFrame(
    [['Ravi', 100], ['Rahul', 200], ['sunny', 40], ['R123', 100], ['xxxx', 100]],
    schema='name string,salary int',
)
df1.show()

name_col = col('name')
# (label printed before the table, filter condition) in display order.
string_filters = [
    ('names starting with R', name_col.like('R%')),
    ('names having 5 letters', name_col.like('_____')),
    ('names having 5 letters-using regex', name_col.rlike('^.{5}$')),
    ('names having only small letters', name_col.rlike('^[a-z]+$')),
    ('names starting with capital letter, and followed by 3 digits', name_col.rlike('^[A-Z][0-9]{3}$')),
    ('names having exactly 4 small letters', name_col.rlike('^[a-z]{4}$')),
]
for filter_label, filter_condition in string_filters:
    print(filter_label)
    df1.filter(filter_condition).show()

# like wildcards used above:
#   %    any run of characters (including none)
#   _    exactly one character
#
# Regex syntax used above:
#   ^    start of the string
#   $    end of the string
#   .    any single character
#   *    preceding item zero or more times
#   +    preceding item one or more times
#   {n}  preceding item exactly n times
#   [ ]  any one of the characters inside the brackets
#   |    OR between alternatives


+-----+------+
| name|salary|
+-----+------+
| Ravi|   100|
|Rahul|   200|
|sunny|    40|
| R123|   100|
| xxxx|   100|
+-----+------+

names starting with R
+-----+------+
| name|salary|
+-----+------+
| Ravi|   100|
|Rahul|   200|
| R123|   100|
+-----+------+

names having 5 letters
+-----+------+
| name|salary|
+-----+------+
|Rahul|   200|
|sunny|    40|
+-----+------+

names having 5 letters-using regex
+-----+------+
| name|salary|
+-----+------+
|Rahul|   200|
|sunny|    40|
+-----+------+

names having only small letters
+-----+------+
| name|salary|
+-----+------+
|sunny|    40|
| xxxx|   100|
+-----+------+

names starting with capital letter, and followed by 3 digits
+----+------+
|name|salary|
+----+------+
|R123|   100|
+----+------+

names having exactly 4 small letters
+----+------+
|name|salary|
+----+------+
|xxxx|   100|
+----+------+



In [None]:
# Dealing with null values: filtering on them, dropping them, and filling them.
from pyspark.sql.functions import col

df1 = spark.createDataFrame(
    [['Ravi', 100], ['Rahul', None], ['sunny', 40], ['kk', 100], ['xxxx', 100]],
    schema='name string,salary int',
)
df1.show()

# Label fixed: it previously printed "iltering ..." (missing first letter).
print("Filtering rows where salary is null")
df1.filter(col('salary').isNull()).show()
print("Filtering rows where salary is not null")
df1.filter(col('salary').isNotNull()).show()

print("Dropping all rows with null values in any column")
df1.dropna().show()
print("Dropping any rows with null values in name column")
# subset takes a list of column-name strings, not col() objects.
df1.dropna(subset=['name']).show()

print("Filling all null values, for all columns with a specific value: 'N/A' for all columns  ")
# A string fill value only applies to string columns, so the null in the
# int column `salary` is left as-is.
df1.fillna('N/A').show()

print("Filling all null values, for all columns with a specific value: 0 for all columns  ")
# An int fill value applies to the int column, so the null salary becomes 0.
df1.fillna(0).show()

print('modified df:')
# Same data, but now with a null in `name` as well as in `salary`.
df1 = spark.createDataFrame(
    [['Ravi', 100], ['Rahul', None], [None, 40], ['kk', 100], ['xxxx', 100]],
    schema='name string,salary int',
)
df1.show()
print("Filling null values in specific columns with specific values")
# A dict maps each column name to its own replacement value.
df1.fillna({
    'name': 'Unknown',
    'salary': 0,
}).show()


+-----+------+
| name|salary|
+-----+------+
| Ravi|   100|
|Rahul|  NULL|
|sunny|    40|
|   kk|   100|
| xxxx|   100|
+-----+------+

iltering rows where salary is null
+-----+------+
| name|salary|
+-----+------+
|Rahul|  NULL|
+-----+------+

Filtering rows where salary is not null
+-----+------+
| name|salary|
+-----+------+
| Ravi|   100|
|sunny|    40|
|   kk|   100|
| xxxx|   100|
+-----+------+

Dropping all rows with null values in any column
+-----+------+
| name|salary|
+-----+------+
| Ravi|   100|
|sunny|    40|
|   kk|   100|
| xxxx|   100|
+-----+------+

Dropping any rows with null values in name column
+-----+------+
| name|salary|
+-----+------+
| Ravi|   100|
|Rahul|  NULL|
|sunny|    40|
|   kk|   100|
| xxxx|   100|
+-----+------+

Filling all null values, for all columns with a specific value: 'N/A' for all columns  
+-----+------+
| name|salary|
+-----+------+
| Ravi|   100|
|Rahul|  NULL|
|sunny|    40|
|   kk|   100|
| xxxx|   100|
+-----+------+

Filling all 

In [19]:
# Removing duplicates and filtering by membership in a list
from pyspark.sql.functions import col

df1 = spark.createDataFrame(
    [['rr', 100], ['rr', 90], ['sunny', 90], ['kk', 100], ['kk', 100]],
    schema='name string,salary int',
)
df1.show()

print("Removing duplicate rows from dataframe")
df1.distinct().show()

print("Removing duplicate rows based on specific column(s), ie name column")
# dropDuplicates takes an optional list of columns to compare on;
# without `subset` it behaves like distinct().
df1.dropDuplicates(subset=['name']).show()

# Keep only the rows whose name appears in the given list.
wanted_names = ['rr', 'sunny']
df1.filter(col('name').isin(wanted_names)).show()

+-----+------+
| name|salary|
+-----+------+
|   rr|   100|
|   rr|    90|
|sunny|    90|
|   kk|   100|
|   kk|   100|
+-----+------+

Removing duplicate rows from dataframe
+-----+------+
| name|salary|
+-----+------+
|   rr|   100|
|   rr|    90|
|sunny|    90|
|   kk|   100|
+-----+------+

Removing duplicate rows based on specific column(s), ie name column
+-----+------+
| name|salary|
+-----+------+
|   rr|   100|
|sunny|    90|
|   kk|   100|
+-----+------+

+-----+------+
| name|salary|
+-----+------+
|   rr|   100|
|   rr|    90|
|sunny|    90|
+-----+------+



In [16]:
# Group by
from pyspark.sql.functions import col,sum,max,min,collect_list

df1 = spark.createDataFrame(
    [['rr', 100], ['rr', 50], ['sunny', 90], ['kk', 100], ['kk', 10]],
    schema='name string,salary int',
)
df1.show()

# Shortcut form: one aggregate, auto-named sum(salary).
df1.groupby('name').sum('salary').show()

# IMPORTANT: .agg() is the more flexible form. It takes several aggregations
# at once and lets each result column be aliased.
salary_by_name = df1.groupby(col('name'))
salary_summary = [
    sum('salary').alias('total_salary'),
    max('salary').alias('max_salary'),
    min('salary').alias('min_salary'),
]
salary_by_name.agg(collect_list('salary').alias('salary_list'), *salary_summary).show()

# Same aggregates, keeping only names whose total salary is at least 100.
# The filter runs on the aggregated column.
salary_by_name.agg(*salary_summary).filter(col('total_salary') >= 100).show()


+-----+------+
| name|salary|
+-----+------+
|   rr|   100|
|   rr|    50|
|sunny|    90|
|   kk|   100|
|   kk|    10|
+-----+------+

+-----+-----------+
| name|sum(salary)|
+-----+-----------+
|   rr|        150|
|sunny|         90|
|   kk|        110|
+-----+-----------+

+-----+-----------+------------+----------+----------+
| name|salary_list|total_salary|max_salary|min_salary|
+-----+-----------+------------+----------+----------+
|   rr|  [100, 50]|         150|       100|        50|
|sunny|       [90]|          90|        90|        90|
|   kk|  [100, 10]|         110|       100|        10|
+-----+-----------+------------+----------+----------+

+----+------------+----------+----------+
|name|total_salary|max_salary|min_salary|
+----+------------+----------+----------+
|  rr|         150|       100|        50|
|  kk|         110|       100|        10|
+----+------------+----------+----------+



In [28]:
# Joins
from pyspark.sql.functions import sum

df1 = spark.createDataFrame(
    [[1, 'alice'], [2, 'bob'], [3, 'charlie']],
    schema='id int,name string',
)
df2 = spark.createDataFrame(
    [[1, 'maths', 30], [1, 'science', 20], [2, 'history', 50]],
    schema='id int,subject string,marks int',
)

same_id = df1.id == df2.id

# Inner join: only ids present on both sides.
df1.join(df2, same_id, 'inner').show()

# Left join: every row of df1, with nulls where df2 has no match.
df1.join(df2, same_id, 'left').show()

# Aliasing both sides before a join lets columns with the same name be
# referenced unambiguously as a.<col> / b.<col>.
df1 = df1.alias('a')
df2 = df2.alias('b')
aliased_left_join = df1.join(df2, df1.id == df2.id, 'left')
# Left join followed by total marks per (id, name).
aliased_left_join.groupby('a.id', 'a.name').agg(sum('b.marks').alias('totalmark')).show()

# The same aggregation expressed in SQL against temp views.
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
marks_per_student_sql = """
select a.id,a.name,sum(b.marks) as total_marks,string_agg(b.subject,',') within group(order by b.marks desc) as subjects
from df1 a left join df2 b on a.id=b.id
group by a.id,a.name
"""
spark.sql(marks_per_student_sql).show()

+---+-----+---+-------+-----+
| id| name| id|subject|marks|
+---+-----+---+-------+-----+
|  1|alice|  1|  maths|   30|
|  1|alice|  1|science|   20|
|  2|  bob|  2|history|   50|
+---+-----+---+-------+-----+

+---+-------+----+-------+-----+
| id|   name|  id|subject|marks|
+---+-------+----+-------+-----+
|  1|  alice|   1|science|   20|
|  1|  alice|   1|  maths|   30|
|  2|    bob|   2|history|   50|
|  3|charlie|NULL|   NULL| NULL|
+---+-------+----+-------+-----+

+---+-------+---------+
| id|   name|totalmark|
+---+-------+---------+
|  1|  alice|       50|
|  2|    bob|       50|
|  3|charlie|     NULL|
+---+-------+---------+

+---+-------+-----------+-------------+
| id|   name|total_marks|     subjects|
+---+-------+-----------+-------------+
|  1|  alice|         50|maths,science|
|  2|    bob|         50|      history|
|  3|charlie|       NULL|         NULL|
+---+-------+-----------+-------------+



In [3]:
# Sorting on several columns with mixed directions
from pyspark.sql.functions import asc,desc

df1 = spark.createDataFrame(
    [['rr', 100], ['rr', 50], ['sunny', 90], ['kk', 100], ['kk', 10]],
    schema='name string,salary int',
)
# Name descending first, then salary ascending within each name.
sort_keys = [desc('name'), asc('salary')]
df1.orderBy(*sort_keys).show()

+-----+------+
| name|salary|
+-----+------+
|sunny|    90|
|   rr|    50|
|   rr|   100|
|   kk|    10|
|   kk|   100|
+-----+------+



In [7]:
# Problem: https://www.sparkplayground.com/pyspark-coding-interview-questions/discount-on-products
from pyspark.sql.functions import col

df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('./csvFiles/productDiscount.csv')
)
df.show()

# final_price = original_price reduced by discount_percentage percent.
discount_fraction = col('discount_percentage') / 100
result_df = df.withColumns({
    'final_price': col('original_price') * (1 - discount_fraction),
})

result_df.select('product_id', 'product_name', 'final_price').show()

+----------+------------+--------------+-------------------+
|product_id|product_name|original_price|discount_percentage|
+----------+------------+--------------+-------------------+
|      P001|      Laptop|        1000.0|                 10|
|      P002|       Phone|         800.0|                  5|
|      P003|      Tablet|         600.0|                 15|
|      P004|     Monitor|         300.0|                 20|
|      P005|    Keyboard|         100.0|                 25|
+----------+------------+--------------+-------------------+

+----------+------------+-----------+
|product_id|product_name|final_price|
+----------+------------+-----------+
|      P001|      Laptop|      900.0|
|      P002|       Phone|      760.0|
|      P003|      Tablet|      510.0|
|      P004|     Monitor|      240.0|
|      P005|    Keyboard|       75.0|
+----------+------------+-----------+



In [8]:
# Problem: https://www.sparkplayground.com/pyspark-coding-interview-questions/load-and-transform-data-json

df = (
    spark.read.format('json')
    .option('multiline', 'true')
    .load('./csvFiles/orders.json')
)
df.show()
df.createOrReplaceTempView('orders')

# Variant 1: inline() expands the `products` array of structs directly into
# one row per element, with the struct fields as columns.
flatten_with_inline_sql = """
select customer_id,order_id,prd.product_name,prd.product_price
from orders
lateral view outer inline(orders.products)  prd
"""
spark.sql(flatten_with_inline_sql).show()

# Variant 2: explode() yields one struct per row (aliased prd), whose fields
# are then read as prd.<field>.
flatten_with_explode_sql = """
select customer_id,order_id,prd.product_name,prd.product_price
from orders
lateral view outer explode(orders.products) as prd
"""
spark.sql(flatten_with_explode_sql).show()

+-----------+--------+--------------------+
|customer_id|order_id|            products|
+-----------+--------+--------------------+
|       C001|   O1001|[{Laptop, 1500}, ...|
|       C002|   O1002|    [{Keyboard, 75}]|
+-----------+--------+--------------------+

+-----------+--------+------------+-------------+
|customer_id|order_id|product_name|product_price|
+-----------+--------+------------+-------------+
|       C001|   O1001|      Laptop|         1500|
|       C001|   O1001|       Mouse|           25|
|       C002|   O1002|    Keyboard|           75|
+-----------+--------+------------+-------------+

+-----------+--------+------------+-------------+
|customer_id|order_id|product_name|product_price|
+-----------+--------+------------+-------------+
|       C001|   O1001|      Laptop|         1500|
|       C001|   O1001|       Mouse|           25|
|       C002|   O1002|    Keyboard|           75|
+-----------+--------+------------+-------------+



In [12]:
# Problem: https://www.sparkplayground.com/pyspark-coding-interview-questions/employees-earning-more-than-average

from pyspark.sql import SparkSession

# Employees: (employee_id, employee_name, salary, department_id)
employee_columns = ["employee_id", "employee_name", "salary", "department_id"]
employee_data = [
    (1, "Alice", 5000, 1),
    (2, "Bob", 7000, 2),
    (3, "Charlie", 4000, 1),
    (4, "David", 6000, 2),
    (5, "Eve", 8000, 3),
    (6, "Kev", 9000, 3),
    (7, "Mev", 10000, 3),
    (8, "Mob", 12000, 2),
]
emp_df = spark.createDataFrame(employee_data, employee_columns)

# Departments: (department_id, department_name)
department_columns = ["department_id", "department_name"]
department_data = [
    (1, "HR"),
    (2, "Engineering"),
    (3, "Finance"),
]
dept_df = spark.createDataFrame(department_data, department_columns)

# Show the inputs (optional).
emp_df.show()
dept_df.show()

emp_df.createOrReplaceTempView('emp')
dept_df.createOrReplaceTempView('dept')

# cte1 holds the average salary per department; the outer query keeps the
# employees whose salary is strictly above their own department's average.
above_average_sql = """
with cte1 as (
select department_id,avg(salary) as avg_salary
from emp
group by department_id)
select e.employee_name,d.department_name,e.salary
from emp as e
inner join dept as d on e.department_id=d.department_id
inner join cte1 as c on e.department_id=c.department_id
where e.salary>c.avg_salary
"""
spark.sql(above_average_sql).show()


+-----------+-------------+------+-------------+
|employee_id|employee_name|salary|department_id|
+-----------+-------------+------+-------------+
|          1|        Alice|  5000|            1|
|          2|          Bob|  7000|            2|
|          3|      Charlie|  4000|            1|
|          4|        David|  6000|            2|
|          5|          Eve|  8000|            3|
|          6|          Kev|  9000|            3|
|          7|          Mev| 10000|            3|
|          8|          Mob| 12000|            2|
+-----------+-------------+------+-------------+

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            1|             HR|
|            2|    Engineering|
|            3|        Finance|
+-------------+---------------+

+-------------+---------------+------+
|employee_name|department_name|salary|
+-------------+---------------+------+
|        Alice|             HR|  5000|
|          Mob|    Engineering

In [None]:
# Problem: https://www.sparkplayground.com/pyspark-coding-interview-questions/remove-duplicates
# Goal: keep one row per user_id, namely the one with the most recent created_date.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
from datetime import date

# Define schema
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("user_name", StringType(), True),
    StructField("created_date", DateType(), True),
    StructField("email", StringType(), True)
])

# Sample data: user_id 1 and 3 each appear twice with different dates/emails.
data = [
    (1, "Alice", date(2023, 5, 10), "alice@example.com"),
    (1, "Alice", date(2023, 6, 15), "alice_new@example.com"),
    (2, "Bob", date(2023, 7, 1), "bob@example.com"),
    (3, "Charlie", date(2023, 5, 20), "charlie@example.com"),
    (3, "Charlie", date(2023, 6, 25), "charlie_updated@example.com"),
    (4, "David", date(2023, 8, 5), "david@example.com")
]

# Create DataFrame
user_df = spark.createDataFrame(data, schema)
user_df.show()
print("sorted by created_date desc:")
user_df.orderBy(col('created_date').desc()).show()
print("Removing duplicates based on user_id, keeping the most recent created_date entry:")
# orderBy(...).dropDuplicates(subset=[...]) is not a reliable way to keep the
# latest row: dropDuplicates does not promise which row of each group survives,
# and a preceding sort is not guaranteed to carry through it.
# Ranking rows inside each user_id by created_date (newest first) and keeping
# rank 1 makes the choice explicit and deterministic for this data.
newest_first_per_user = Window.partitionBy('user_id').orderBy(col('created_date').desc())
(
    user_df
    .withColumn('_row_rank', row_number().over(newest_first_per_user))
    .filter(col('_row_rank') == 1)
    .drop('_row_rank')  # helper column is not part of the result
    .show()
)

+-------+---------+------------+--------------------+
|user_id|user_name|created_date|               email|
+-------+---------+------------+--------------------+
|      1|    Alice|  2023-05-10|   alice@example.com|
|      1|    Alice|  2023-06-15|alice_new@example...|
|      2|      Bob|  2023-07-01|     bob@example.com|
|      3|  Charlie|  2023-05-20| charlie@example.com|
|      3|  Charlie|  2023-06-25|charlie_updated@e...|
|      4|    David|  2023-08-05|   david@example.com|
+-------+---------+------------+--------------------+

sorted by created_date desc:
+-------+---------+------------+--------------------+
|user_id|user_name|created_date|               email|
+-------+---------+------------+--------------------+
|      4|    David|  2023-08-05|   david@example.com|
|      2|      Bob|  2023-07-01|     bob@example.com|
|      3|  Charlie|  2023-06-25|charlie_updated@e...|
|      1|    Alice|  2023-06-15|alice_new@example...|
|      3|  Charlie|  2023-05-20| charlie@example.com

In [None]:
# Problem: https://www.sparkplayground.com/pyspark-coding-interview-questions/group-amounts

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Schema: (category, sub_category, amount)
schema = StructType([
    StructField("category", StringType(), True),
    StructField("sub_category", StringType(), True),
    StructField("amount", IntegerType(), True),
])

# Sample data; ("Furniture", "Chair", 200) appears twice.
data = [
    ("Electronics", "Laptop", 1000),
    ("Electronics", "Laptop", 1200),
    ("Furniture", "Chair", 200),
    ("Furniture", "Chair", 150),
    ("Furniture", "Chair", 180),
    ("Furniture", "Chair", 200),
    ("Electronics", "iPhone", 600),
    ("Electronics", "iPhone", 400),
]

df = spark.createDataFrame(data, schema)
df.show()

# Drop exact duplicate rows first so each amount is listed once per group.
df = df.distinct()
df.createOrReplaceTempView('products')

# One row per (category, sub_category) with its amounts joined by commas,
# in ascending order.
amounts_per_group_sql = """
select category,sub_category,string_agg(amount,',') within group (order by amount asc) as amounts
from products
group by category,sub_category
"""
spark.sql(amounts_per_group_sql).show()


+-----------+------------+------+
|   category|sub_category|amount|
+-----------+------------+------+
|Electronics|      Laptop|  1000|
|Electronics|      Laptop|  1200|
|  Furniture|       Chair|   200|
|  Furniture|       Chair|   150|
|  Furniture|       Chair|   180|
|  Furniture|       Chair|   200|
|Electronics|      iPhone|   600|
|Electronics|      iPhone|   400|
+-----------+------------+------+

+-----------+------------+-----------+
|   category|sub_category|    amounts|
+-----------+------------+-----------+
|Electronics|      iPhone|    400,600|
|  Furniture|       Chair|150,180,200|
|Electronics|      Laptop|  1000,1200|
+-----------+------------+-----------+



In [None]:
# Problem: https://www.sparkplayground.com/pyspark-coding-interview-questions/monthly-transaction-summary

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql.functions import col, to_date

# Schema: every field is declared non-nullable; trans_date is loaded as a
# string and converted to a date after the DataFrame is created.
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("country", StringType(), False),
    StructField("state", StringType(), False),
    StructField("amount", IntegerType(), False),
    StructField("trans_date", StringType(), False),
])

# Sample data: (id, country, state, amount, trans_date)
data = [
    (121, "US", "approved", 1000, "2018-12-18"),
    (122, "US", "declined", 2000, "2018-12-19"),
    (123, "US", "approved", 2000, "2019-01-01"),
    (124, "DE", "approved", 2000, "2019-01-07"),
    (125, "US", "approved", 500, "2018-12-20"),
    (126, "US", "declined", 1500, "2019-01-05"),
    (127, "DE", "approved", 1800, "2019-01-10"),
    (128, "FR", "declined", 1200, "2019-02-15"),
    (129, "FR", "approved", 2500, "2019-02-17"),
    (130, "US", "approved", 3000, "2019-02-20"),
    (131, "DE", "declined", 2200, "2019-03-05"),
    (132, "FR", "approved", 1000, "2019-03-10"),
]

df = spark.createDataFrame(data, schema=schema)

# Parse the yyyy-MM-dd strings into a proper date column (same column name).
parsed_trans_date = to_date(col("trans_date"), "yyyy-MM-dd")
df = df.withColumn("trans_date", parsed_trans_date)

df.show()
df.createOrReplaceTempView('transactions')

# Per (month, country): number of transactions, number approved, total amount.
# `iff` is not recognised by the Spark version in use here, so the approved
# count is computed with CASE WHEN instead.
monthly_summary_sql = """
select date_format(trans_date,'yyyy-MM') as month,country,count(*) as trans_count,sum(
case 
when state='approved' then 1
else 0
end) as approved_count,sum(amount) as trans_total_amount
from transactions
group by date_format(trans_date,'yyyy-MM'),country
order by month,country
"""
spark.sql(monthly_summary_sql).show()


+---+-------+--------+------+----------+
| id|country|   state|amount|trans_date|
+---+-------+--------+------+----------+
|121|     US|approved|  1000|2018-12-18|
|122|     US|declined|  2000|2018-12-19|
|123|     US|approved|  2000|2019-01-01|
|124|     DE|approved|  2000|2019-01-07|
|125|     US|approved|   500|2018-12-20|
|126|     US|declined|  1500|2019-01-05|
|127|     DE|approved|  1800|2019-01-10|
|128|     FR|declined|  1200|2019-02-15|
|129|     FR|approved|  2500|2019-02-17|
|130|     US|approved|  3000|2019-02-20|
|131|     DE|declined|  2200|2019-03-05|
|132|     FR|approved|  1000|2019-03-10|
+---+-------+--------+------+----------+

+-------+-------+-----------+--------------+------------------+
|  month|country|trans_count|approved_count|trans_total_amount|
+-------+-------+-----------+--------------+------------------+
|2018-12|     US|          3|             2|              3500|
|2019-01|     DE|          2|             2|              3800|
|2019-01|     US|      

In [41]:
# Problem: https://www.sparkplayground.com/pyspark-coding-interview-questions/player-statistics


from pyspark.sql.functions import split, col, expr

# Create players_df: player is "<name>-<country code>", 50s/100s is "<fifties>/<hundreds>".
players_data = [
    ("Sachin-IND", 18694, "93/49"),
    ("Ricky-AUS", 11274, "66/31"),
    ("Lara-WI", 10222, "45/21"),
    ("Rahul-IND", 10355, "95/11"),
    ("Jhonnty-SA", 7051, "43/5"),
    ("Hayden-AUS", 8722, "67/19")
]

players_df = spark.createDataFrame(players_data, ["player", "runs", "50s/100s"])

# Create countries_df: (country code, country name)
countries_data = [
    ("IND", "India"),
    ("AUS", "Australia"),
    ("WI", "WestIndies"),
    ("SA", "SouthAfrica")
]

countries_df = spark.createDataFrame(countries_data, ["SRT", "country"])

# Solution starts here
players_df.show()
countries_df.show()
players_df.createOrReplaceTempView('players')
# Registered for completeness; the query below reads only from `players`.
countries_df.createOrReplaceTempView('countries')

# Split "name-code" and "fifties/hundreds" on their separators, add the two
# counts, and keep players whose combined count exceeds 95, highest runs first.
#
# Fix: every reference to the `50s/100s` column is backtick-quoted. The last
# one used to be written as len(50s/100s); unquoted, Spark reads 50s and 100s
# as SMALLINT literals and evaluates a division, so the substring length was
# only correct by coincidence for this data.
spark.sql("""
with cte1 as (
select substring(player,1,instr(player,'-')-1) as playername,
       substring(player,instr(player,'-')+1,len(player)) as country,
       runs,
       cast(substring(`50s/100s`,1,instr(`50s/100s`,'/')-1) as int)
       + cast(substring(`50s/100s`,instr(`50s/100s`,'/')+1,len(`50s/100s`)) as int) as sum
from players
)
select *
from cte1
where sum>95
order by runs desc
""").show()



+----------+-----+--------+
|    player| runs|50s/100s|
+----------+-----+--------+
|Sachin-IND|18694|   93/49|
| Ricky-AUS|11274|   66/31|
|   Lara-WI|10222|   45/21|
| Rahul-IND|10355|   95/11|
|Jhonnty-SA| 7051|    43/5|
|Hayden-AUS| 8722|   67/19|
+----------+-----+--------+

+---+-----------+
|SRT|    country|
+---+-----------+
|IND|      India|
|AUS|  Australia|
| WI| WestIndies|
| SA|SouthAfrica|
+---+-----------+

+----------+-------+-----+---+
|playername|country| runs|sum|
+----------+-------+-----+---+
|    Sachin|    IND|18694|142|
|     Ricky|    AUS|11274| 97|
|     Rahul|    IND|10355|106|
+----------+-------+-----+---+



In [None]:
# https://www.sparkplayground.com/pyspark-coding-interview-questions/daily-total-sales
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum



# One row per (store, product, day) sale line:
# (store_id, product_id, sale_date, quantity_sold, total_sales)
data = [
    (1, 101, "2025-05-10", 2, 25.00),
    (1, 102, "2025-05-10", 1, 15.00),
    (1, 103, "2025-05-11", 3, 30.00),
    (2, 101, "2025-05-10", 2, 40.00)
]

columns = ["store_id", "product_id", "sale_date", "quantity_sold", "total_sales"]

df = spark.createDataFrame(data, columns)
df.show()
df.createOrReplaceTempView('sales')

# Total sales per store per day.
# GROUP BY gives no ordering guarantee, so sort explicitly to make the
# displayed result reproducible from run to run.
spark.sql("""
select store_id, sale_date, sum(total_sales) as daily_total_sales
from sales
group by store_id, sale_date
order by store_id, sale_date
""").show()



+--------+----------+----------+-------------+-----------+
|store_id|product_id| sale_date|quantity_sold|total_sales|
+--------+----------+----------+-------------+-----------+
|       1|       101|2025-05-10|            2|       25.0|
|       1|       102|2025-05-10|            1|       15.0|
|       1|       103|2025-05-11|            3|       30.0|
|       2|       101|2025-05-10|            2|       40.0|
+--------+----------+----------+-------------+-----------+

+--------+----------+-----------------+
|store_id| sale_date|daily_total_sales|
+--------+----------+-----------------+
|       1|2025-05-10|             40.0|
|       1|2025-05-11|             30.0|
|       2|2025-05-10|             40.0|
+--------+----------+-----------------+



In [53]:
# https://www.sparkplayground.com/pyspark-coding-interview-questions/top-products

# One row per sale line:
# (store_id, product_id, sale_date, quantity_sold, total_sales)
data = [
    # 2025-05-10
    (1, 101, "2025-05-10", 2, 25.00),
    (2, 101, "2025-05-10", 1, 15.00),
    (1, 102, "2025-05-10", 5, 50.00),
    (3, 103, "2025-05-10", 3, 30.00),
    (2, 104, "2025-05-10", 4, 45.00),
    (1, 105, "2025-05-10", 2, 60.00),
    (1, 105, "2025-05-10", 1, 15.00),
    (1, 106, "2025-05-10", 2, 10.00),

    # 2025-05-11
    (1, 201, "2025-05-11", 1, 20.00),
    (2, 201, "2025-05-11", 2, 40.00),
    (2, 202, "2025-05-11", 2, 40.00),
    (3, 203, "2025-05-11", 3, 35.00),
    (1, 204, "2025-05-11", 1, 25.00),
    (2, 205, "2025-05-11", 2, 30.00),
    (1, 206, "2025-05-11", 4, 50.00),
]

columns = ["store_id", "product_id", "sale_date", "quantity_sold", "total_sales"]

df = spark.createDataFrame(data, columns)

# Your logic goes here to create df_result

df.show()
df.createOrReplaceTempView('sales')

# Step 1 (product_daily): total sales per product per day, across all stores.
# Step 2 (ranked): rank those totals from highest to lowest.
# Step 3: keep the five highest-ranked rows.
#
# Several products share the same total (e.g. 102 and 206 both sum to 50.0),
# so the window ordering includes product_id and sale_date as tie-breakers;
# without them row_number() may number tied rows differently between runs.
# The final ORDER BY fixes the display order as well.
#
# NOTE(review): the window has no PARTITION BY, so this is a global top 5
# over all days. If the exercise asks for the top products *per day*, add
# "partition by sale_date" to the window - confirm against the problem text.
spark.sql("""
with product_daily as (
    select product_id, sale_date, sum(total_sales) as total_sales
    from sales
    group by product_id, sale_date
),
ranked as (
    select product_id, sale_date, total_sales,
           row_number() over (order by total_sales desc, product_id, sale_date) as rn
    from product_daily
)
select product_id, sale_date, total_sales
from ranked
where rn <= 5
order by rn
""").show()


+--------+----------+----------+-------------+-----------+
|store_id|product_id| sale_date|quantity_sold|total_sales|
+--------+----------+----------+-------------+-----------+
|       1|       101|2025-05-10|            2|       25.0|
|       2|       101|2025-05-10|            1|       15.0|
|       1|       102|2025-05-10|            5|       50.0|
|       3|       103|2025-05-10|            3|       30.0|
|       2|       104|2025-05-10|            4|       45.0|
|       1|       105|2025-05-10|            2|       60.0|
|       1|       105|2025-05-10|            1|       15.0|
|       1|       106|2025-05-10|            2|       10.0|
|       1|       201|2025-05-11|            1|       20.0|
|       2|       201|2025-05-11|            2|       40.0|
|       2|       202|2025-05-11|            2|       40.0|
|       3|       203|2025-05-11|            3|       35.0|
|       1|       204|2025-05-11|            1|       25.0|
|       2|       205|2025-05-11|            2|       30.