<a href="https://colab.research.google.com/github/Mural1ikk/100-days-of-sql-and-pyspark/blob/main/pyspark_day2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, lead, sum as _sum
from pyspark.sql.window import Window

# Sample sales records as (sale date string, salesperson, amount) tuples
data = [
    ('2025-08-01', 'A', 200),
    ('2025-08-02', 'A', 300),
    ('2025-08-03', 'A', 150),
    ('2025-08-01', 'B', 400),
    ('2025-08-02', 'B', 250),
    ('2025-08-03', 'B', 350),
]

# Column names, in the same order as the tuple fields above
columns = ['SaleDate', 'SalesPerson', 'Amount']

# Get (or reuse) the Spark session for this analysis
spark = (
    SparkSession.builder
    .appName("SalesAnalysis")
    .getOrCreate()
)

# Build the DataFrame; SaleDate is still a plain string at this point
df = spark.createDataFrame(data, schema=columns)


# ⏮️ LAG Function
# ✅ Purpose:
# Retrieves a value from a previous row in the same partition.

# ⏭️ LEAD Function
# ✅ Purpose:
# Retrieves a value from a following row in the same partition.

# ✅ Shared purpose of LAG and LEAD:
# To access data from a different row relative to the current row — either before (LAG) or after (LEAD).
# 🔍 How It Works:
# Returns the value of a column from a previous or next row
# Does not aggregate — it's a positional lookup
# Useful for comparisons, trend analysis, or change detection
# ______________________________________________________________________
# 🧮 Running Totals
# ----------------
# ✅ Purpose:
# To compute a cumulative sum (or other aggregate) up to the current row within a partition.
# 🔍 How It Works:
# Uses aggregate functions like SUM(), AVG(), etc.
# Requires a window frame (e.g., ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
# Aggregates values from the start of the partition to the current row

# Parse the string SaleDate column into a date column using the yyyy-MM-dd pattern
from pyspark.sql.functions import to_date

parsed_sale_date = to_date(col("SaleDate"), "yyyy-MM-dd")
df = df.withColumn("SaleDate", parsed_sale_date)

# One window per salesperson, rows ordered chronologically by sale date
window_spec = Window.partitionBy("SalesPerson").orderBy("SaleDate")

# Frame running from the first row of the partition through the current row,
# used for the cumulative sum
cumulative_frame = window_spec.rowsBetween(
    Window.unboundedPreceding,
    Window.currentRow,
)

# Column expressions: the neighbouring rows' amounts (in date order) and the
# running total of Amount within each salesperson's window
previous_amount = lag("Amount").over(window_spec)
next_amount = lead("Amount").over(window_spec)
running_total = _sum("Amount").over(cumulative_frame)

result_df = (
    df
    .withColumn("PreviousDaySales", previous_amount)
    .withColumn("NextDaySales", next_amount)
    .withColumn("RunningTotal", running_total)
)

# Display the result sorted by salesperson, then by date
result_df.orderBy("SalesPerson", "SaleDate").show()

+----------+-----------+------+----------------+------------+------------+
|  SaleDate|SalesPerson|Amount|PreviousDaySales|NextDaySales|RunningTotal|
+----------+-----------+------+----------------+------------+------------+
|2025-08-01|          A|   200|            NULL|         300|         200|
|2025-08-02|          A|   300|             200|         150|         500|
|2025-08-03|          A|   150|             300|        NULL|         650|
|2025-08-01|          B|   400|            NULL|         250|         400|
|2025-08-02|          B|   250|             400|         350|         650|
|2025-08-03|          B|   350|             250|        NULL|        1000|
+----------+-----------+------+----------------+------------+------------+

