In [0]:
import random
from pyspark.sql import Row

# Fixed seed so the generated departments/salaries are identical on every Restart & Run All.
SEED = 42
rng = random.Random(SEED)

# Ten synthetic employees: department drawn from 1..3, salary from 100000..400000 (inclusive).
employees = [
    Row(
        empid=i,
        empname=f"Employee_{i}",
        deptNo=rng.randint(1, 3),
        Salary=rng.randint(100000, 400000),
    )
    for i in range(1, 11)
]

# Create a DataFrame
employee_df = spark.createDataFrame(employees)

# Save as table `employee` (overwriting any previous run) so the %sql cells below can query it.
employee_df.write.saveAsTable("employee", mode="overwrite")

# Display the employee table
display(spark.sql("SELECT * FROM employee"))

In [0]:
%sql
-- Distance of each employee's salary from the company-wide minimum salary.
-- The minimum is computed as a single-row aggregate and cross joined, so every employee
-- appears exactly once even when several employees share the lowest salary.
SELECT
  e.*,
  m.min_sal,
  e.Salary - m.min_sal AS sal_delta
FROM employee AS e
CROSS JOIN (SELECT MIN(Salary) AS min_sal FROM employee) AS m

In [0]:
%sql
-- Per-department minimum salary attached to each row, plus how far each salary sits above
-- that minimum, expressed as a percentage of the employee's OWN salary.
SELECT
  *,
  MIN(Salary) OVER (PARTITION BY deptNo) AS min_sal,
  (Salary - MIN(Salary) OVER (PARTITION BY deptNo)) / Salary * 100 AS percent_diff
FROM employee
ORDER BY deptNo ASC

In [0]:
%sql
-- Highest salary across all employees, aliased to max_sal to match the windowed query below.
SELECT MAX(Salary) AS max_sal FROM employee

In [0]:
%sql
-- Attach the company-wide maximum salary to every employee row (empty window = whole table).
SELECT
  e.*,
  MAX(e.Salary) OVER () AS max_sal
FROM employee AS e

In [0]:
display(employee_df)  # rich table view of the generated employees

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, min as spark_min

# One window per department; no ordering is needed because the aggregate covers the whole partition.
window_spec = Window.partitionBy(col("deptNo"))

# Department minimum salary on every row. Uses pyspark's `min` to match the "Min_sal" column
# name; it is imported under an alias so Python's built-in min/max are not shadowed.
employee_df1 = employee_df.withColumn("Min_sal", spark_min(col("Salary")).over(window_spec))
employee_df1.display()

In [0]:
# Toy dataset: a single group "A" with five integer values, used by the frame examples below.
data = [
    ("A", 1),
    ("A", 6),
    ("A", 10),
    ("A", 4),
    ("A", 5),
]
df = spark.createDataFrame(data, schema=["id", "value"])
display(df)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, sum

# Running total: the ROWS frame starts at the first row of the partition and ends at the current row.
window_spec = (
    Window.partitionBy("id")
    .orderBy("value")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

display(df.withColumn("sum", sum("value").over(window_spec)))

In [0]:
# Reverse running total: the frame starts at the current row and runs to the partition's last row.
window_spec = Window.partitionBy("id").orderBy("value").rowsBetween(
    Window.currentRow, Window.unboundedFollowing
)

display(df.withColumn("sum", sum("value").over(window_spec)))

In [0]:
# The frame is unbounded on both sides, so every row receives the total of its whole partition.
start, end = Window.unboundedPreceding, Window.unboundedFollowing
window_spec = Window.partitionBy("id").orderBy("value").rowsBetween(start, end)

display(df.withColumn("sum", sum("value").over(window_spec)))

In [0]:
# RANGE frame: the bounds are offsets on the ORDER BY *value*, not row positions. Each row sums
# every row in its partition whose value lies in [value - 1, value + 5].
# Try rangeBetween(4, 10) instead to sum only the rows whose value is 4..10 above the current one.
window_spec = Window.partitionBy("id").orderBy("value").rangeBetween(-1, 5)

display(df.withColumn("sum", sum("value").over(window_spec)))

In [0]:
# Yearly revenue with three entries for 2022 (two of them identical), used by the
# cumulative-sum, ranking and de-duplication examples below.
data = [
    (2020, 100),
    (2021, 300),
    (2022, 50),
    (2022, 50),
    (2022, 60),
    (2023, 700),
    (2024, 900),
]
df = spark.createDataFrame(data, schema=["year", "revenue"])
display(df)

In [0]:
# Running revenue total with a ROWS frame: each row adds exactly its own revenue to the total.
# `revenue` is a secondary sort key so rows that share a year are accumulated in a fixed order.
# Ordering by `year` alone leaves the order of the 2022 ties to Spark, which makes their
# running totals non-deterministic.
# There is no partitionBy, so Spark processes every row in a single partition; fine for this tiny demo.
window_spec = (
    Window.orderBy("year", "revenue")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df.withColumn("cumulative_sum", sum("revenue").over(window_spec)).display()

In [0]:
# The same running total using the default frame, written out explicitly. When a window has
# ORDER BY and no explicit frame, Spark uses RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
# Rows that tie on `year` are therefore peers, and all three 2022 rows get the same total
# (contrast with the ROWS-frame cell above).
window_spec = (
    Window.orderBy("year")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

df.withColumn("cumulative_sum", sum("revenue").over(window_spec)).display()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, dense_rank, rank, row_number, sum

# df has only (year, revenue), so partitioning by both columns groups exact duplicate rows.
# row_number() = 1 marks the first copy in each group and > 1 marks a repeat. The ORDER BY is
# required by row_number but is constant inside each partition, so which copy gets 1 is arbitrary.
window_spec = Window.partitionBy("year", "revenue").orderBy("revenue")

display(df.withColumn("rn", row_number().over(window_spec)))

In [0]:
# Keep one row per identical (year, revenue) pair. The window is defined here, rather than
# reusing whatever `window_spec` the kernel currently holds, so this cell gives the same
# result regardless of which cell last reassigned `window_spec`.
dedup_window = Window.partitionBy("year", "revenue").orderBy("revenue")

df_dedup = (
    df.withColumn("rn", row_number().over(dedup_window))
    .filter(col("rn") == 1)
)
df_dedup.display()

In [0]:
# rank(): tied revenues share a rank and the following rank skips ahead (e.g. 1, 1, 3).
window_spec = Window.orderBy("revenue")

display(df.withColumn("rank", rank().over(window_spec)))

In [0]:
# dense_rank(): tied revenues share a rank and the next rank has no gap (e.g. 1, 1, 2).
window_spec = Window.orderBy("revenue")

display(df.withColumn("dense_rank", dense_rank().over(window_spec)))

In [0]:
# Side-by-side comparison of the three ranking functions over the same ordering.
window_spec = Window.orderBy("revenue")

ranked = (
    df.withColumn("dense_rank", dense_rank().over(window_spec))
    .withColumn("rank", rank().over(window_spec))
    .withColumn("rowNumber", row_number().over(window_spec))
)
ranked.display()

In [0]:
# Employees earning the third-highest DISTINCT salary. dense_rank leaves no gaps, so rank 3
# is the third distinct salary value, and every employee tied at that value is returned.
window_spec = Window.orderBy(col("Salary").desc())

salary_rank = dense_rank().over(window_spec)
display(employee_df.withColumn("dense_rank", salary_rank).filter(col("dense_rank") == 3))

In [0]:
# Highest-paid employee(s) in each department: rank salaries within deptNo and keep rank 1.
# dense_rank keeps every employee tied for the top salary.
window_spec = Window.partitionBy("deptNo").orderBy(col("Salary").desc())

top_paid = employee_df.withColumn("dense_rank", dense_rank().over(window_spec))
display(top_paid.filter(col("dense_rank") == 1))

In [0]:
data = [(2020, 100), (2021, 300), (2022, 50), (2022, 50), (2022, 60), (2023, 700), (2024, 900)]

revenue_df = spark.createDataFrame(data, ["year", "revenue"])

# Show the DataFrame that was just built (revenue_df, not the earlier `df`).
revenue_df.display()
# Exercise:
# 1. Create a separate column 'rev_diff' with the revenue difference compared to the previous year.
# 2. Create a separate column 'P/F' based on 'rev_diff' to show whether the year made a profit or a loss.

In [0]:
from pyspark.sql.functions import first_value, lag, last_value, lead, ntile

# Within each department (highest salary first), split employees into 3 buckets
# that are as equal in size as possible.
window_spec = Window.partitionBy("deptNo").orderBy(col("Salary").desc())

display(employee_df.withColumn("ntile_rank", ntile(3).over(window_spec)))

In [0]:
# Offset / boundary window functions from pyspark.sql.functions:
#   lead(col, offset, default)     - value from a subsequent row in the window ordering
#   lag(col, offset, default)      - value from a preceding row in the window ordering
#   first_value(col, ignoreNulls)  - first value in the window frame
#   last_value(col, ignoreNulls)   - last value in the window frame

data = [
    (2020, 100),
    (2021, 300),
    (2022, 50),
    (2022, 50),
    (2022, 60),
    (2023, 700),
    (2024, 900),
]
revenue_df = spark.createDataFrame(data, schema=["year", "revenue"])
display(revenue_df)

In [0]:
from pyspark.sql.functions import ntile, lead, lag, first_value, last_value, col, sum, when
from pyspark.sql.window import Window

# Chronological order over the per-year totals (tiny table, so one un-partitioned window is fine).
window_spec = Window.orderBy(col("year").asc())

# Several rows share a year, so collapse to one total per year before comparing years.
grouped_df = revenue_df.groupBy("year").agg(sum(col("revenue")).alias("revenue"))

# Year-over-year change. lag() has no default, so the first year's prev_revenue and PnL are
# NULL: there is no previous year to compare against. A default of 0 would report the whole
# first-year revenue as profit.
pnl_df = (
    grouped_df
    .withColumn("prev_revenue", lag(col("revenue"), 1).over(window_spec))
    .withColumn("PnL", col("revenue") - col("prev_revenue"))
    # Exercise part 2: label each year. The label stays NULL when PnL is NULL (first year).
    .withColumn(
        "P/F",
        when(col("PnL") > 0, "Profit")
        .when(col("PnL") < 0, "Loss")
        .when(col("PnL") == 0, "No change"),
    )
)
pnl_df.display()

In [0]:
from pyspark.sql.functions import first_value, lag, last_value, lead, ntile

# Chronological window (ascending is the default sort direction).
window_spec = Window.orderBy(col("year"))

grouped_df = revenue_df.groupBy("year").agg(sum(col("revenue")).alias("revenue"))

# Previous and next year's revenue; 0 fills the missing neighbour at either end of the range.
prev_revenue = lag(col("revenue"), 1, 0).over(window_spec)
next_revenue = lead(col("revenue"), 1, 0).over(window_spec)
display(
    grouped_df
    .withColumn("prev_revenue", prev_revenue)
    .withColumn("next_revenue", next_revenue)
)

In [0]:
from pyspark.sql.functions import first_value, last, last_value, lag, lead, ntile

# The frame spans the whole ordered result. last_value needs this: with the default ORDER BY
# frame (which ends at the current row) it would only return the current year's revenue.
window_spec = (
    Window.orderBy(col("year").asc())
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

grouped_df = revenue_df.groupBy("year").agg(sum(col("revenue")).alias("revenue"))

first_rev = first_value(col("revenue")).over(window_spec)
last_rev = last_value(col("revenue")).over(window_spec)
result_df = grouped_df.withColumn("first_revenue", first_rev).withColumn("last_revenue", last_rev)

display(result_df)

In [0]:
# Daily credit/debit transactions; dates are kept as 'yyyy-MM-dd' strings.
data = [
    ("2025-04-01", 100, "Credit"),
    ("2025-04-01", 300, "Debit"),
    ("2025-04-01", 50, "Credit"),
    ("2025-04-02", 50, "Debit"),
    ("2025-04-02", 60, "Credit"),
    ("2025-04-03", 700, "Debit"),
    ("2025-04-03", 900, "Credit"),
]
revenue_df = spark.createDataFrame(data, schema=["date", "transaction", "trans_type"])
display(revenue_df)


In [0]:
from pyspark.sql import Row

# Employee records with deliberately missing values (including one all-null row)
# for the dropna examples below. Field order matches the original keyword Rows.
Employee = Row("employee_id", "name", "department", "salary")

data = [
    Employee(1, "John Doe", "HR", 50000),
    Employee(2, "Jane Smith", "Finance", None),
    Employee(3, None, "IT", 70000),
    Employee(4, "Mike Johnson", None, 60000),
    Employee(5, "Emily Davis", "Marketing", None),
    Employee(None, "Anna Brown", "Sales", 55000),
    Employee(None, None, None, None),
]

employee_df = spark.createDataFrame(data)
employee_df.display()

In [0]:
# thresh counts NON-NULL values: keep only rows with at least 4 non-null columns.
# When thresh is given it overrides `how`, so passing how='any' had no effect and is omitted.
display(employee_df.dropna(thresh=4))

In [0]:
employee_df.dropna(how = 'all' ,subset = ["employee_id","name"]).display()