In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *
import pandas as pd

In [0]:

# Build a tiny pandas DataFrame from (name, age) records
people = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = pd.DataFrame.from_records(people, columns=["name", "age"])

# Preview the first two rows
df.head(2)



In [0]:
# Sample sales rows: (product_id, region, category, sale_amount, sale_date)
data = [
    ("P1", "North", "Electronics", 1200.50, "2025-04-01"),
    ("P2", "North", "Electronics", 950.75, "2025-04-01"),
    ("P3", "North", "Electronics", 850.00, "2025-04-01"),
    ("P4", "North", "Electronics", 200.00, "2025-04-01"),
    ("P1", "South", "Furniture", 500.00, "2025-04-01"),
    ("P5", "South", "Furniture", 1500.00, "2025-04-01"),
    ("P6", "South", "Furniture", 700.00, "2025-04-01"),
    ("P7", "East", "Grocery", 300.00, "2025-04-01"),
    ("P8", "East", "Grocery", 1200.00, "2025-04-01"),
    ("P9", "East", "Grocery", 1000.00, "2025-04-01"),
    ("P10", "East", "Grocery", 50.00, "2025-04-01"),
]

# Column layout for the rows above: every field is nullable and sale_date
# is loaded as a plain string, then parsed after the DataFrame is built
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("product_id", StringType()),
        ("region", StringType()),
        ("category", StringType()),
        ("sale_amount", DoubleType()),
        ("sale_date", StringType()),
    )
])

# Load the rows and turn sale_date into a real date column in one chain
sales_df = (
    spark.createDataFrame(data, schema)
    .withColumn("sale_date", to_date(col("sale_date")))
)

# Print every row without truncating cell values
sales_df.show(truncate=False)


In [0]:
"""
For each region and category, find:
Top 3 selling products by revenue
"""
# Total revenue per (region, category, product)
aggregated_df = (
    sales_df
    .groupBy("region", "category", "product_id")
    .agg(sum(col("sale_amount")).alias("total_revenue"))
)

# Rank products inside each region/category, highest revenue first
window_spec = Window.partitionBy("region", "category").orderBy(col("total_revenue").desc())
ranking_df = aggregated_df.withColumn("rank", dense_rank().over(window_spec))

# Keep ranks 1 to 3 (dense_rank gives tied products the same rank, so a
# region/category can return more than three rows)
top3_products_df = ranking_df.where(col("rank") <= 3)
top3_products_df.show()

In [0]:
"""
Average revenue of top 3 products
"""
# Keep only the sales rows that belong to a top-3 product of their region/category
top3_sales_df = sales_df.join(
    top3_products_df.select("region", "category", "product_id"),
    on=["region", "category", "product_id"],
    how="inner",
)

# Roll the sales rows up to one revenue figure per product first. Averaging
# sale_amount directly averages individual transactions, so a product with
# several sale rows would be weighted by its row count instead of contributing
# its total revenue once.
top3_revenue_df = top3_sales_df.groupBy("region", "category", "product_id").agg(
    sum("sale_amount").alias("product_revenue")
)

# Average revenue of those top 3 products per region-category
avg_top3_df = top3_revenue_df.groupBy("region", "category").agg(
    avg("product_revenue").alias("avg_top3_revenue")
)
avg_top3_df.show()

In [0]:
# Percentage contribution of each top product to its region/category revenue

# Revenue per product within each region and category
aggregated_df = (
    sales_df
    .groupBy("region", "category", "product_id")
    .agg(sum(col("sale_amount")).alias("top_product_revenue"))
)

# Rank products by revenue inside each region/category (highest first)
window_spec = Window.partitionBy("region", "category").orderBy(col("top_product_revenue").desc())
ranking_df = aggregated_df.withColumn("rank", dense_rank().over(window_spec))

# Keep ranks 1 to 3
top3_products_df = ranking_df.where(col("rank") <= 3)

# Total revenue of each region/category
total_reg_cate_df = (
    sales_df
    .groupBy("region", "category")
    .agg(sum(col("sale_amount")).alias("total_revenue"))
)

# Attach the region/category total to every top-product row
joined_df = total_reg_cate_df.join(top3_products_df, on=["region", "category"], how="inner")

# Share of the region/category revenue held by each top product, in percent
revenue_share = col("top_product_revenue") / col("total_revenue")
top_product_cont_df = joined_df.withColumn("contribution", revenue_share * 100)
top_product_cont_df.show()

In [0]:
"""
Detect First and Last Purchase of Customer (Lifecycle Analysis)
Dataset:
orders: order_id, customer_id, order_date, order_amount

🛠️ Task:
For each customer:
Get first and last purchase date
Calculate days between first and last order
Flag if a customer is "One-Time Buyer" or "Repeat Buyer"
Also, calculate average order value
"""

In [0]:
# Column layout of the sample 'orders' DataFrame (all fields nullable);
# order_date is loaded as a string and cast to a date afterwards
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("order_id", IntegerType()),
        ("customer_id", IntegerType()),
        ("order_date", StringType()),
        ("order_amount", FloatType()),
    )
])

# Sample rows: (order_id, customer_id, order_date, order_amount)
data = [
    (1, 101, "2024-01-01", 150.0),
    (2, 101, "2024-03-15", 200.0),
    (3, 102, "2024-01-10", 300.0),
    (4, 103, "2024-01-12", 250.0),
    (5, 104, "2024-02-01", 120.0),
    (6, 104, "2024-04-10", 180.0)
]

# Build the DataFrame and cast 'order_date' to DateType in one chain
orders_df = (
    spark.createDataFrame(data, schema)
    .withColumn("order_date", col("order_date").cast(DateType()))
)
orders_df.show()

In [0]:
# Get first and last purchase date per customer
first_last_df = orders_df.groupBy("customer_id").agg(
    min("order_date").alias("first_purchase_date"),
    max("order_date").alias("last_purchase_date")
)
# Calculate days between first and last order (0 when a customer has a single order date)
day_diff_df = first_last_df.withColumn(
    "day_diff",
    datediff(col("last_purchase_date"), col("first_purchase_date"))
)
day_diff_df.show()

# Flag if a customer is "One-Time Buyer" or "Repeat Buyer"
customer_analysis_df = orders_df.groupBy("customer_id").agg(
    countDistinct("order_id").alias("total_orders")
)
customers_flags_df = customer_analysis_df.withColumn(
    "buyer_type",
    when(col("total_orders") > 1, "Repeat_Buyer").otherwise("One_Time_Buyer")
)
# Fixed: this used to call .show() on `customer_flags_df`, a name that is never
# assigned, which raised NameError. The DataFrame built above is `customers_flags_df`.
customers_flags_df.show()

# Also, calculate average order value
avg_order_val_df = orders_df.groupBy("customer_id").agg(
    avg("order_amount").alias("avg_order_value")
)
# Global ranking of customers by average order value. The window has no
# partitionBy, so Spark evaluates it over all rows in a single partition.
window_spec = Window.orderBy(desc("avg_order_value"))
ranking_df = avg_order_val_df.withColumn("rank", dense_rank().over(window_spec))
ranking_df.show()

In [0]:
"""
Time Series Trend Analysis Per Product
Dataset:
daily_sales: product_id, sale_date, units_sold

🛠️ Task:
For each product:
Calculate 7-day moving average of sales
Identify upward or downward trend using 3 consecutive increases/decreases
Flag spike days (where sales > 2 * moving avg)"""

In [0]:
# Column layout of the daily sales sample (all fields nullable);
# sale_date is loaded as a string and parsed afterwards
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("product_id", IntegerType()),
        ("sale_date", StringType()),
        ("units_sold", IntegerType()),
    )
])

# Sample rows: (product_id, sale_date, units_sold), nine consecutive days per product
data = [
    (101, "2024-04-01", 50),
    (101, "2024-04-02", 55),
    (101, "2024-04-03", 53),
    (101, "2024-04-04", 60),
    (101, "2024-04-05", 65),
    (101, "2024-04-06", 70),
    (101, "2024-04-07", 75),
    (101, "2024-04-08", 150),  # Spike
    (101, "2024-04-09", 80),
    (102, "2024-04-01", 20),
    (102, "2024-04-02", 22),
    (102, "2024-04-03", 25),
    (102, "2024-04-04", 23),
    (102, "2024-04-05", 24),
    (102, "2024-04-06", 21),
    (102, "2024-04-07", 19),
    (102, "2024-04-08", 18),
    (102, "2024-04-09", 35)  # Spike
]

# Build the DataFrame and parse sale_date into a date column
daily_sales_df = (
    spark.createDataFrame(data, schema)
    .withColumn("sale_date", to_date(col("sale_date")))
)
daily_sales_df.show()

In [0]:
# Calculate 7-day moving average of sales
# Fixed: every window in this cell partitioned by "product", a column that does
# not exist; the DataFrame's key column is "product_id".
# The frame is row-based (current row + 6 preceding rows), so it equals a
# 7-day window only when each product has exactly one row per day.
window = Window.partitionBy("product_id").orderBy("sale_date").rowsBetween(-6, 0)
moving_avg_df = daily_sales_df.withColumn(
    "moving_avg",
    avg("units_sold").over(window)
)

# Identify upward trend using consecutive day-over-day increases
# TODO: downward streaks can be found the same way by filtering on decreases.
window_spec = Window.partitionBy("product_id").orderBy("sale_date")
product_trends_df = daily_sales_df.withColumn(
    "prev_day_sale",
    lag("units_sold").over(window_spec)
)
# The first row of each product has no previous value: the comparison is null
# and falls through to "No".
sales_analysis_df = product_trends_df.withColumn(
    "isHigher",
    when(col("units_sold") > col("prev_day_sale"), "Yes").otherwise("No")
)

# Identifying consecutive streaks (gaps-and-islands)
# Fixed: only the increasing days are numbered. Numbering every day (as before)
# puts all contiguous dates in one group regardless of direction, so the
# grouping could not isolate streaks.
windows = Window.partitionBy("product_id").orderBy("sale_date")
random_rows_df = sales_analysis_df.filter(col("isHigher") == "Yes").withColumn(
    "rn",
    row_number().over(windows)
)

# Within an unbroken run of increasing days, sale_date and rn advance together,
# so (sale_date - rn days) is constant and identifies the run.
# expr() is used because the day count is a column, which the Python date_sub
# wrapper does not accept on every Spark version.
conse_grop_df = random_rows_df.withColumn(
    "grouping",
    expr("date_sub(sale_date, rn)")
)
# Length of each run of consecutive increases.
# Fixed: the original `sum(when(cond),1)` had a misplaced parenthesis and could not run.
aggregated_df = conse_grop_df.groupBy("product_id", "grouping").agg(
    count("*").alias("total_growths")
)

# Ranking the runs of each product, longest run first
window_rank = Window.partitionBy("product_id").orderBy(desc("total_growths"))
product_cons_rank_df = aggregated_df.withColumn("rank", dense_rank().over(window_rank))
product_cons_rank_df.show()

# Flag spike days (where sales > 2 * moving avg)

# Each product's most recent moving average is used as its reference level
window_last = Window.partitionBy("product_id").orderBy("sale_date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
product_moving_avg_df = moving_avg_df.withColumn("last_mov_avg", last("moving_avg").over(window_last))
product_moving_avg_df = product_moving_avg_df.distinct()

# Joining with original dataframe
# Fixed: the lookup side now keeps the join key (it selected the non-existent
# "product" column) and is reduced to one row per product, so the join no
# longer multiplies every sales row by the number of days.
joined_df = daily_sales_df.join(
    product_moving_avg_df.select("product_id", "last_mov_avg").distinct(),
    on="product_id",
    how="inner"
)

# Keeping the days whose sales exceed twice the reference moving average
filtered_df = joined_df.filter(col("units_sold") > 2 * col("last_mov_avg"))
filtered_df.show()

In [0]:
# One record per user, with a nested list of events
data = [
    {
        "user_id": "u1",
        "events": [
            {"event_type": "click", "timestamp": "2024-01-01T10:00:00"},
            {"event_type": "purchase", "timestamp": "2024-01-01T10:05:00"},
        ],
    }
]

# events is an array of (event_type, timestamp) structs; timestamps arrive as strings
schema = StructType(
    [
        StructField("user_id", StringType(), True),
        StructField(
            "events",
            ArrayType(
                StructType(
                    [
                        StructField("event_type", StringType(), True),
                        StructField("timestamp", StringType(), True),
                    ]
                )
            ),
            True,
        ),
    ]
)

json_df = spark.createDataFrame(data, schema)

# Exploding the events array: one output row per (user, event)
exploded_df = json_df.withColumn("event", explode("events"))

# Flatten the struct into top-level columns
final_df = exploded_df.select(
    col("user_id"),
    col("event.event_type").alias("event_type"),
    col("event.timestamp").alias("timestamp"),
)

final_df = final_df.withColumn("timestamp", to_timestamp("timestamp"))

# For each event, calculate time since previous event (per user)
window_spec = Window.partitionBy("user_id").orderBy("timestamp")
elapsed_seconds = col("time_diff_seconds")
time_diff_df = final_df.withColumn(
    "previous_event_time", lag("timestamp").over(window_spec)
).withColumn(
    "time_diff_seconds",
    unix_timestamp(col("timestamp")) - unix_timestamp(col("previous_event_time")),
).withColumn(
    "time_diff_readable",
    # Fixed: from_unixtime() treats the value as an instant since the epoch and
    # renders it in the session time zone, so a 300 s gap only printed
    # "00:05:00" when the session ran in UTC. The duration is now formatted
    # arithmetically as HH:mm:ss. The when() guard keeps the value null for a
    # user's first event, which has no previous timestamp.
    when(
        elapsed_seconds.isNotNull(),
        format_string(
            "%02d:%02d:%02d",
            floor(elapsed_seconds / 3600),
            floor((elapsed_seconds % 3600) / 60),
            elapsed_seconds % 60,
        ),
    ),
)
time_diff_df.filter("previous_event_time IS Not Null").show()

In [0]:
"""
Complex Pivot with Aggregation and Ratio Calculation
Dataset:
transactions: user_id, category, amount, transaction_date

🛠️ Task:
Pivot to get total amount spent per category as separate columns
Add total amount and ratio of each category to total
"""
# Sample rows: (user_id, category, amount, transaction_date)
data = [
    ("u1", "grocery", 120.0, "2024-01-01"),
    ("u1", "electronics", 500.0, "2024-01-02"),
    ("u1", "grocery", 80.0, "2024-01-03"),
    ("u2", "grocery", 200.0, "2024-01-01"),
    ("u2", "fashion", 300.0, "2024-01-02"),
    ("u3", "fashion", 150.0, "2024-01-03"),
    ("u3", "electronics", 700.0, "2024-01-04")
]

# Column layout (all fields nullable); transaction_date is loaded as a string
# and parsed below, though it could be declared as DateType directly
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("user_id", StringType()),
        ("category", StringType()),
        ("amount", DoubleType()),
        ("transaction_date", StringType()),
    )
])

# Build the DataFrame and parse transaction_date into a date column
transactions_df = (
    spark.createDataFrame(data, schema)
    .withColumn("transaction_date", to_date(col("transaction_date")))
)

transactions_df.show(truncate=False)

In [0]:
# One column per category holding the amount each user spent in it
pivoted_df = transactions_df.groupBy("user_id").pivot("category").sum("amount")

# Overall amount spent by each user
aggregated_df = transactions_df.groupBy("user_id").agg(
    sum(col("amount")).alias("total_amount")
)

# Put the per-category columns and the overall total side by side
joined_df = pivoted_df.join(aggregated_df, on="user_id", how="inner")
joined_df.show()

# Share of each category in the user's total, rounded to two decimals.
# A category the user never bought is null after the pivot and counts as 0.
ratio_df = joined_df
for category_name in ("electronics", "fashion", "grocery"):
    category_share = coalesce(col(category_name), lit(0)) / col("total_amount")
    ratio_df = ratio_df.withColumn(f"{category_name}_ratio", round(category_share, 2))

ratio_df.show()

In [0]:
# Category ratios computed for whatever category columns the pivot produced
from pyspark.sql.functions import col, round, coalesce, lit

# All columns currently present in the joined frame
basic_columns = joined_df.columns

# Everything except the key and the total is a pivoted category column
category_columns = [
    name for name in basic_columns
    if name != "user_id" and name != "total_amount"
]

# One "<category>_ratio" expression per category; a missing category counts as 0
ratio_columns = [
    round(coalesce(col(name), lit(0)) / col("total_amount"), 2).alias(f"{name}_ratio")
    for name in category_columns
]

# Output order: key, category amounts, total, then the ratios
final_cols = [
    col("user_id"),
    *[col(name) for name in category_columns],
    col("total_amount"),
    *ratio_columns,
]

dynamic_ratio_df = joined_df.select(*final_cols)

dynamic_ratio_df.show()


In [0]:
"""
Financial Transactions – Fraud Pattern Detection
Dataset:
transactions: transaction_id, account_id, amount, timestamp, location

🛠️ Tasks:
For each account:
Find if more than 3 transactions occur within 1 minute → mark as "suspicious burst"
Flag any transaction where location changes between two transactions < 5 minutes apart
Output flagged transactions with reason (burst, suspicious_location)
"""

In [0]:
# Column layout of the transactions sample (all fields nullable);
# timestamp is loaded as a string and parsed afterwards
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("transaction_id", IntegerType()),
        ("account_id", IntegerType()),
        ("amount", IntegerType()),
        ("timestamp", StringType()),
        ("location", StringType()),
    )
])

# Sample rows: (transaction_id, account_id, amount, timestamp, location)
data = [
    (1, 101, 500, "2025-04-29 10:00:00", "New York"),
    (2, 101, 300, "2025-04-29 10:01:00", "New York"),
    (3, 101, 100, "2025-04-29 10:02:00", "New York"),
    (4, 101, 200, "2025-04-29 10:03:00", "New York"),
    (5, 101, 150, "2025-04-29 10:05:00", "Los Angeles"),
    (6, 102, 700, "2025-04-29 09:50:00", "Chicago"),
    (7, 102, 300, "2025-04-29 09:51:00", "Chicago"),
    (8, 102, 200, "2025-04-29 09:52:00", "Chicago"),
    (9, 102, 150, "2025-04-29 09:53:00", "Chicago"),
    (10, 102, 100, "2025-04-29 09:54:00", "Chicago"),
    (11, 103, 1000, "2025-04-29 10:30:00", "San Francisco"),
    (12, 103, 200, "2025-04-29 10:35:00", "San Francisco"),
    (13, 103, 300, "2025-04-29 10:36:00", "San Francisco"),
    (14, 103, 100, "2025-04-29 10:40:00", "Los Angeles")
]

# Build the DataFrame and parse the timestamp strings into a timestamp column
df = (
    spark.createDataFrame(data, schema)
    .withColumn("timestamp", to_timestamp(col("timestamp")))
)

# Print every row without truncating cell values
df.show(truncate=False)

In [0]:
# Find if more than 3 transactions occur within 1 minute → mark as "suspicious burst"

# Converting timestamp to epoch seconds so a numeric range frame can be used
df = df.withColumn("time_in_seconds", unix_timestamp(col("timestamp")))

# Window specification for a rolling 1-minute window per account.
# Fixed: the window was ordered by the timestamp column, but integer
# rangeBetween bounds need a numeric ordering column. time_in_seconds was
# created for exactly that purpose and is now used.
window_spec = Window.partitionBy("account_id").orderBy("time_in_seconds").rangeBetween(-60, 0)

# Counting the account's transactions in the 60 seconds up to and including each row
aggregated_df = df.withColumn(
    "total_transactions", 
    count("transaction_id").over(window_spec)
)

# Keeping the transactions whose rolling window holds more than 3 transactions
suspecious_df = aggregated_df.filter(col("total_transactions") > 3)
suspecious_df = suspecious_df.withColumn("transaction_flag", lit("suspecious_burst"))

suspecious_burst_df = suspecious_df.select(
    col("transaction_id"),
    col("account_id"),
    col("timestamp"),
    col("location"),
    col("transaction_flag")
)

# Flag any transaction where location changes between two transactions < 5 minutes apart

# Window specification to find previous transaction location and timestamp
window = Window.partitionBy("account_id").orderBy("timestamp")

# Creating prev location and prev timestamp columns
transaction_details_df = df.withColumn(
    "prev_location",
    lag("location").over(window)
).withColumn(
    "prev_timestamp",
    lag("timestamp").over(window)
)

# Seconds elapsed since the account's previous transaction (null for its first one)
transaction_analysis_df = transaction_details_df.withColumn(
    "time_diff",
    (unix_timestamp(col("timestamp")) - unix_timestamp(col("prev_timestamp")))
)

# Keeping transactions made under 300 s after the previous one from a different location;
# rows with no previous transaction compare as null and are dropped by the filter
suspecious_loc_df = transaction_analysis_df.filter(
    (col("time_diff") < 300) & 
    (col("location") != col("prev_location"))
)

suspecious_loc_df = suspecious_loc_df.withColumn("transaction_flag", lit("suspecious_location"))

suspecious_loc_df = suspecious_loc_df.select(
    col("transaction_id"),
    col("account_id"),
    col("timestamp"),
    col("location"),
    col("transaction_flag")
)

# Both frames share the same five columns in the same order, so a positional union is safe
suspecious_trans_df = suspecious_burst_df.union(suspecious_loc_df)
suspecious_trans_df.show()

In [0]:
"""
Build Custom Aggregation Framework
Dataset:
metrics: system_id, metric_type, metric_value, timestamp

🛠️ Tasks:
Build a function that can:
Accept metric type(s) and a window (daily/weekly)
Return aggregate summary (avg, min, max, count)
Simulate custom rollups: CPU → daily, Memory → weekly, Disk → monthly
You’ll use parameterized functions, groupBy with truncation, and pivot.
"""

In [0]:
# A Spark session named `spark` is expected to exist already; outside such an
# environment one could be created with:
# spark = SparkSession.builder.appName("CustomAggregation").getOrCreate()

# Column layout of the metrics sample (all fields nullable);
# timestamp is loaded as a string and cast afterwards
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("system_id", StringType()),
        ("metric_type", StringType()),
        ("metric_value", FloatType()),
        ("timestamp", StringType()),
    )
])

# Sample rows: (system_id, metric_type, metric_value, timestamp)
data = [
    ("sys1", "CPU", 75.0, "2025-05-01 09:00:00"),
    ("sys1", "Memory", 32.0, "2025-05-01 09:00:00"),
    ("sys1", "Disk", 500.0, "2025-05-01 09:00:00"),
    ("sys1", "CPU", 78.0, "2025-05-01 10:00:00"),
    ("sys1", "Memory", 30.0, "2025-05-01 10:00:00"),
    ("sys2", "CPU", 80.0, "2025-05-01 09:00:00"),
    ("sys2", "Memory", 28.0, "2025-05-01 09:00:00"),
    ("sys2", "Disk", 450.0, "2025-05-01 09:00:00"),
    ("sys2", "CPU", 85.0, "2025-05-02 09:00:00"),
    ("sys2", "Memory", 29.0, "2025-05-02 09:00:00"),
]

# Build the DataFrame and cast the timestamp strings to TimestampType
df = (
    spark.createDataFrame(data, schema)
    .withColumn("timestamp", col("timestamp").cast(TimestampType()))
)

# Print every row without truncating cell values
df.show(truncate=False)

In [0]:
class CustomMetricAggregation:
    """Load metric readings and summarise them per system and time bucket."""

    def read_data(self, path):
        """Read a headered CSV from `path` and type the metric columns.

        `metric_value` is cast to double and `timestamp` to timestamp.
        Returns the typed DataFrame, or None (after printing the error)
        when anything in the load fails.
        """
        try:
            raw_df = spark.read.format('csv').option('header', True).load(path)
            typed_df = (
                raw_df
                .withColumn("metric_value", col("metric_value").cast("double"))
                .withColumn("timestamp", col("timestamp").cast("timestamp"))
            )
        except Exception as e:
            print(f"Error occurring while reading: {e}")
            return None
        return typed_df

    def metric_summary(self, df, metric_types: list, window_type: str):
        """Summarise the selected metric types per system and time bucket.

        Args:
            df: DataFrame with system_id, metric_type, metric_value and timestamp columns.
            metric_types: metric_type values to keep.
            window_type: 'daily', 'weekly' or 'monthly'.

        Returns:
            dict with two DataFrames: "aggregated_df" (avg/min/max/count per
            system, metric type and bucket) and "pivoted_df" (average value per
            system and bucket, one column per metric type).

        Raises:
            ValueError: if window_type is not one of the supported values.
        """
        # Translate the window name into the unit understood by date_trunc
        supported_windows = (("daily", "day"), ("weekly", "week"), ("monthly", "month"))
        trunc_format = next(
            (unit for label, unit in supported_windows if window_type == label),
            None,
        )
        if trunc_format is None:
            raise ValueError("Unsupported window_type. Choose from 'daily', 'weekly', 'monthly'.")

        # Keep the requested metrics and stamp each row with the start of its bucket
        df_truncated = (
            df.filter(col("metric_type").isin(metric_types))
            .withColumn("window_start", date_trunc(trunc_format, col("timestamp")))
        )

        # Long format: one row per system, metric type and bucket
        aggregated_df = df_truncated.groupBy("system_id", "metric_type", "window_start").agg(
            avg("metric_value").alias("avg_value"),
            min("metric_value").alias("min_value"),
            max("metric_value").alias("max_value"),
            count("metric_value").alias("count")
        )

        # Wide format: one row per system and bucket, one average column per metric type
        pivoted_df = df_truncated.groupBy("system_id", "window_start").pivot("metric_type").agg(
            avg("metric_value").alias("avg_value")
        )

        return {
            "aggregated_df": aggregated_df,
            "pivoted_df": pivoted_df,
        }


In [0]:
"""
Time Between Repeat Purchases by Category
Dataset:
purchases: user_id, product_category, purchase_time

🛠️ Tasks:
For each user and category:
Calculate days between purchases
Flag customers who buy same category repeatedly within short spans (< 5 days)
Output top 5 categories with the most repeat buyers
"""

In [0]:
# Sample rows: (user_id, product_category, purchase_time)
data = [
    ("u1", "Electronics", "2023-12-01 10:00:00"),
    ("u1", "Electronics", "2023-12-03 09:30:00"),
    ("u1", "Groceries", "2023-12-05 11:00:00"),
    ("u1", "Electronics", "2023-12-10 12:15:00"),
    ("u2", "Groceries", "2023-12-02 14:00:00"),
    ("u2", "Groceries", "2023-12-04 14:00:00"),
    ("u2", "Electronics", "2023-12-05 16:00:00"),
    ("u3", "Books", "2023-12-01 09:00:00"),
    ("u3", "Books", "2023-12-20 09:00:00"),
    ("u3", "Books", "2023-12-22 10:00:00"),
    ("u4", "Clothing", "2023-12-01 15:00:00"),
    ("u4", "Clothing", "2023-12-02 16:00:00"),
    ("u5", "Groceries", "2023-12-06 10:00:00"),
]

# Three nullable string columns; purchase_time is parsed after loading
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("user_id", "product_category", "purchase_time")
])

# Build the DataFrame and parse purchase_time with an explicit pattern
purchases_df = (
    spark.createDataFrame(data, schema)
    .withColumn("purchase_time", to_timestamp(col("purchase_time"), "yyyy-MM-dd HH:mm:ss"))
)

purchases_df.show(truncate=False)

In [0]:
class categoryRepeatPurchases:
    """Repeat-purchase analysis over (user_id, product_category, purchase_time) rows."""

    def read_data(self, path):
        """Read a headered CSV from `path` and type the purchase timestamp.

        CSV columns load as strings, so `purchase_time` is cast to timestamp
        here, mirroring how CustomMetricAggregation.read_data types its columns.
        Returns the DataFrame, or None (after printing the error) when the
        load fails.
        """
        try:
            df = spark.read.format('csv').option('header', True).load(path)
            df = df.withColumn("purchase_time", col("purchase_time").cast("timestamp"))
            return df
        except Exception as e:
            print(f"Error occured while file reading {e}")
            return None
        
    def daysDiffInPurchases(self, df):
        """Add the gap, in days, since the user's previous purchase in the same category.

        Adds `prev_pur_time` and `time_diff` (fractional days; null for the
        first purchase of a user/category pair) and returns the new DataFrame.
        """
        # Fixed: the window was partitioned by user only, so the "previous
        # purchase" could belong to a different category. The task is per user
        # and category.
        window_spec = Window.partitionBy("user_id", "product_category").orderBy("purchase_time")
        # Creating previous purchase time column by lag 
        prev_pur_df = df.withColumn(
            "prev_pur_time", 
            lag("purchase_time").over(window_spec)
        )
        # Fixed: the difference in seconds was divided by 3600, which yields
        # hours; 86400 seconds per day gives the days this method is named for.
        time_diff_df = prev_pur_df.withColumn(
            "time_diff",
            (unix_timestamp(col("purchase_time")) - unix_timestamp(col("prev_pur_time"))) / 86400
        )    
        return time_diff_df
    
    def flagingCustomers(self, df, max_gap_days=5):
        """Flag users who re-buy a category within a short span.

        Args:
            df: purchases DataFrame.
            max_gap_days: a repeat purchase counts as "short span" when it
                happens strictly fewer than this many calendar days after the
                previous purchase of the same category (default 5).

        Returns:
            One row per (user_id, product_category) that has at least one
            short-span repeat, with `total_rows` (number of such repeats) and
            `is_repeat_buyer` ('Yes' when there is more than one).
        """
        # Window spec to get the previous purchase time within the same user and category
        window = Window.partitionBy("user_id", "product_category").orderBy("purchase_time")
        updated_df = df.withColumn(
            "prev_pur_time",
            lag("purchase_time").over(window)
        )
        # Calendar days between the current and the previous purchase
        diff_df = updated_df.withColumn(
            "day_diff",
            datediff("purchase_time", "prev_pur_time")
        )
        # Fixed: the task defines a short span as "< 5 days", but the filter
        # used "<= 5". First purchases have a null day_diff and are dropped here.
        filtered_df = diff_df.filter(
            col("day_diff") < max_gap_days
        )
        # Counting short-span repeat purchases per user and category
        aggregated_df = filtered_df.groupBy("user_id", "product_category").agg(
            count("*").alias("total_rows")
        )
        # Flagging users with more than one short-span repeat
        flagging_df = aggregated_df.withColumn(
            "is_repeat_buyer",
            when(col("total_rows") > 1, 'Yes').otherwise('No')
        )
        return flagging_df
    
    def top5Categories(self, df):
        """Rank categories by their number of repeat buyers and keep the top 5 ranks.

        A repeat buyer is a user with more than one purchase row in a category.
        dense_rank is used, so tied categories share a rank and more than five
        rows can be returned.
        """
        # Purchases per user and category
        user_agg_df = df.groupBy("user_id", "product_category").agg(
            count("*").alias("total_rows")
        )
        # Keeping users who bought the category more than once
        filtered_df = user_agg_df.filter(col("total_rows") > 1)
        # Distinct repeat buyers per category
        cat_agg_df = filtered_df.groupBy("product_category").agg(
            countDistinct("user_id").alias("repeat_buyers")
        )
        # Ranking categories by repeat buyers; the window has no partitionBy
        # because the ranking is global across categories
        window_rank = Window.orderBy(desc("repeat_buyers"))
        ranking_df = cat_agg_df.withColumn(
            "rank",
            dense_rank().over(window_rank)
        )
        # Filtering top 5 categories
        top5_categories_df = ranking_df.filter(col("rank") <= 5)
        return top5_categories_df
    
# Instantiating class
user_beha_inst = categoryRepeatPurchases()

# Reading file
# NOTE(review): DBFS URIs are normally written "dbfs:/FileStore/..." with a
# single slash — confirm this path before relying on the file-based load.
df = user_beha_inst.read_data("dbfs://FileStore/raw/user_purchases")

# read_data() returns None when the load fails. Passing None on would fail
# inside the first method with an unrelated AttributeError, so fall back to
# the sample DataFrame built earlier in this notebook.
if df is None:
    print("Could not load the purchases file; using the in-notebook purchases_df sample instead")
    df = purchases_df

# Running the three analyses
time_diff_df = user_beha_inst.daysDiffInPurchases(df)
flagging_df = user_beha_inst.flagingCustomers(df)
top5_categories_df = user_beha_inst.top5Categories(df)


In [0]:
"""
 Course Completion & Dropout Pattern (Education Platform)
Dataset:
learning_events: user_id, course_id, module_number, event_type, event_time

🛠️ Tasks:
For each course:
Calculate average module completion rate
Identify modules with highest drop-off
Flag users who skipped modules (missing module_number in sequence)
"""

In [0]:
# A Spark session named `spark` is expected to exist already; outside such an
# environment one could be created with:
# spark = SparkSession.builder.appName("CourseCompletion").getOrCreate()

# Column layout of the learning events sample (all fields nullable);
# event_time is loaded as a string and parsed afterwards
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("user_id", StringType()),
        ("course_id", StringType()),
        ("module_number", IntegerType()),
        ("event_type", StringType()),
        ("event_time", StringType()),
    )
])

# Sample rows: (user_id, course_id, module_number, event_type, event_time)
data = [
    ("U1", "C1", 1, "view", "2025-05-01 10:00:00"),
    ("U1", "C1", 2, "complete", "2025-05-01 10:30:00"),
    ("U1", "C1", 4, "view", "2025-05-01 11:00:00"),  # skipped module 3
    ("U2", "C1", 1, "view", "2025-05-02 09:00:00"),
    ("U2", "C1", 2, "drop", "2025-05-02 09:20:00"),
    ("U3", "C1", 1, "complete", "2025-05-03 08:00:00"),
    ("U3", "C1", 2, "complete", "2025-05-03 08:30:00"),
    ("U3", "C1", 3, "complete", "2025-05-03 09:00:00"),
    ("U3", "C1", 4, "complete", "2025-05-03 09:30:00"),
    ("U1", "C2", 1, "view", "2025-05-04 10:00:00"),
    ("U1", "C2", 2, "drop", "2025-05-04 10:15:00"),
    ("U2", "C2", 1, "complete", "2025-05-05 11:00:00"),
    ("U2", "C2", 2, "complete", "2025-05-05 11:30:00"),
    ("U2", "C2", 3, "view", "2025-05-05 12:00:00")
]

# Build the DataFrame and parse event_time with an explicit pattern
learning_events_df = (
    spark.createDataFrame(data, schema)
    .withColumn("event_time", to_timestamp(col("event_time"), "yyyy-MM-dd HH:mm:ss"))
)

# Show the DataFrame; event_time is already a timestamp column at this point
learning_events_df.show(truncate=False)


In [0]:
# Flag users who skipped modules (a missing module_number in their sequence)

# For every user/course, look back at the module number of the preceding row
window_spec = Window.partitionBy("user_id", "course_id").orderBy(col("module_number"))
prev_modules_df = learning_events_df.withColumn(
    "prev_module", lag(col("module_number")).over(window_spec)
)

# Distance to the preceding module (null for the first module of a user/course)
module_gaps_df = prev_modules_df.withColumn(
    "module_gap", col("module_number") - col("prev_module")
)

# A distance above 1 means at least one module number is missing in between
skipped_users_df = module_gaps_df.withColumn(
    "is_skipped", when(col("module_gap") > 1, 'Yes').otherwise('No')
)

# Keep the de-duplicated rows where a skip was detected
filtered_df = skipped_users_df.where(col("is_skipped") == 'Yes').distinct()
filtered_df.show()

In [0]:
# Identify modules with highest drop-off

# Distinct users with a "drop" event per course and module; when() without
# otherwise() is null for other event types, so those users are not counted
dropped_user = when(col("event_type") == "drop", col("user_id"))
agg_df = (
    learning_events_df
    .groupBy("course_id", "module_number")
    .agg(countDistinct(dropped_user).alias("users_dropped"))
)

# Rank the modules of each course, most drop-offs first
window_rank = Window.partitionBy("course_id").orderBy(col("users_dropped").desc())
ranking_modules_df = agg_df.withColumn("rank", dense_rank().over(window_rank))

# Keep the top-ranked module(s) of every course
filtered_df = ranking_modules_df.where(col("rank") == 1)
filtered_df.show()

In [0]:
# Calculate average module completion rate

# Per course and module: distinct users who completed it and distinct users
# who have any event on it
agg_df = learning_events_df.groupBy("course_id", "module_number").agg(
    countDistinct(when(col("event_type") == "complete", col("user_id"))).alias("users_completed"),
    countDistinct("user_id").alias("users_total")
)

# Calculating completion rate
# Fixed: the ratio referenced "total_com_rows" / "total_rows", columns that do
# not exist; the aggregation above names them "users_completed" / "users_total".
comp_rate_df = agg_df.withColumn(
    "completion_rate",
    (col("users_completed") / col("users_total"))
)

# Aggregating average completion rate for each course
avg_com_rate_df = comp_rate_df.groupBy("course_id").agg(
    avg("completion_rate").alias("avg_comp_rate")
)
avg_com_rate_df.show()

In [0]:
"""
Detect Salary Leaks in Departments
Dataset:
employees: emp_id, name, department, salary, joining_date

🛠️ Tasks:
For each department:
Calculate average salary
Identify employees with salary 3x more than average (potential anomaly)
Flag departments with such anomalies and report total impact
"""

In [0]:
# Define schema with joining_date as string
schema = StructType([
    StructField("emp_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", DoubleType(), True),
    StructField("joining_date", StringType(), True)
])

# Create sample data with dates as string
# Fixed: salaries are written as float literals. createDataFrame verifies each
# row against the schema and a DoubleType field does not accept a Python int,
# so the integer literals used before made DataFrame creation fail.
data = [
    ("E001", "Alice",   "HR",        50000.0, "2020-01-15"),
    ("E002", "Bob",     "HR",        52000.0, "2021-03-12"),
    ("E003", "Carol",   "HR",       180000.0, "2019-07-23"),  # anomaly
    ("E004", "David",   "IT",        70000.0, "2018-10-01"),
    ("E005", "Eve",     "IT",        73000.0, "2020-06-20"),
    ("E006", "Frank",   "IT",       210000.0, "2021-08-30"),  # anomaly
    ("E007", "Grace",   "Finance",   60000.0, "2019-01-10"),
    ("E008", "Hank",    "Finance",   62000.0, "2020-11-11"),
    ("E009", "Ivy",     "Finance",   61000.0, "2022-01-05"),
    ("E010", "John",    "Finance",   65000.0, "2023-04-19"),
]

# Create DataFrame
employees_df = spark.createDataFrame(data, schema)

# Convert joining_date string to timestamp
employees_df = employees_df.withColumn("joining_date", to_timestamp("joining_date", "yyyy-MM-dd"))

employees_df.show(truncate=False)


In [0]:
# Calculate average salary per department
avg_dep_sal_df = employees_df.groupBy("department").agg(
    avg("salary").alias("avg_dept_sal")
)
avg_dep_sal_df.show()

# Identify employees with salary 3x more than average (potential anomaly)

# Attach the department average to every employee row
joined_df = employees_df.join(avg_dep_sal_df, on="department", how="inner")

# Keeping employees whose salary exceeds three times their department average.
# The average includes the employee's own salary.
filtered_df = joined_df.filter(col("salary") > 3 * col("avg_dept_sal"))
filtered_df.show()

# Flagging departments which have anomalies
# Fixed: the flag was written as 'yes' but every filter below compares against
# 'Yes', so no row could ever match. The literal now uses the same casing.
flaging_df = joined_df.withColumn(
    "is_anomalied",
    when(col("salary") > 3 * col("avg_dept_sal"), 'Yes').otherwise('No')
)
anomalied_dept_df = flaging_df.select("department", "is_anomalied").filter(col("is_anomalied") == 'Yes')

# Keeping only the anomalous records
flaging_df = flaging_df.filter(col("is_anomalied") == 'Yes')

# Total impact over departments with anomalies
# Fixed: the employee key column is "emp_id"; "employee_id" does not exist.
dept_analysis_df = flaging_df.groupBy("department").agg(
    sum("salary").alias("total_anomalies_amount"),
    count("emp_id").alias("total_employees")
)

# Joining with the department averages: the expected cost is the average salary
# times the number of anomalous employees, and the impact is what is paid above that
dept_analysis_with_avg_df = dept_analysis_df.join(avg_dep_sal_df, on="department", how="inner")
dept_analysis_with_avg_df = dept_analysis_with_avg_df.withColumn(
    "ExpectedSalary",
    (col("avg_dept_sal") * col("total_employees"))
)
total_impact_df = dept_analysis_with_avg_df.withColumn(
    "TotalImpact",
    (col("total_anomalies_amount") - col("ExpectedSalary"))
)
total_impact_df.show()

In [0]:
"""
Employee Promotion Path Tracking
Dataset:
employee_roles: emp_id, role, start_date, end_date

🛠️ Tasks:
For each employee:
Track complete promotion path in order.
Calculate how long they stayed in each role.
Find employees with more than 2 promotions in 3 years.
"""

In [0]:
# Role history: one row per role an employee held, with its start and end date
data = [
    ("E001", "Junior Developer", "2018-01-01", "2019-06-30"),
    ("E001", "Developer",        "2019-07-01", "2020-12-31"),
    ("E001", "Senior Developer", "2021-01-01", "2022-06-30"),
    ("E001", "Tech Lead",        "2022-07-01", "2024-01-01"),

    ("E002", "Analyst",          "2020-01-01", "2022-12-31"),
    ("E002", "Senior Analyst",   "2023-01-01", "2024-12-31"),

    ("E003", "Intern",           "2021-06-01", "2022-05-31"),
    ("E003", "Junior Dev",       "2022-06-01", "2023-05-31"),
    ("E003", "Developer",        "2023-06-01", "2024-06-01"),
    ("E003", "Senior Dev",       "2024-06-02", "2025-05-31"),
]

# Every column is loaded as a nullable string; the date columns are parsed below
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("emp_id", "role", "start_date", "end_date")
])

# Create the DataFrame and convert both date strings to DateType in one chain
employee_roles_df = (
    spark.createDataFrame(data, schema=schema)
    .withColumn("start_date", to_date(col("start_date")))
    .withColumn("end_date", to_date(col("end_date")))
)

employee_roles_df.show(truncate=False)
employee_roles_df.printSchema()


In [0]:
# Track complete promotion path in order.

# Per-employee window sorted by role start date, so roles are numbered
# chronologically (rn = 1 is the first role the employee held).
window_spec = Window.partitionBy("emp_id").orderBy(col("start_date"))
emp_roles_order_df = employee_roles_df.withColumn("rn", row_number().over(window_spec))
emp_roles_order_df.show()

In [0]:
# Calculate how long they stayed in each role.
# role_duration is the number of days from start_date to end_date.
role_durations_df = employee_roles_df.withColumn(
    "role_duration", datediff(col("end_date"), col("start_date"))
)
role_durations_df.show()

In [0]:
# Find employees with more than 2 promotions in 3 years.

# Per-employee window used to find each employee's first role start date.
# NOTE: the name `window` hides pyspark.sql.functions.window brought in by the
# star import at the top of the notebook; it is kept for compatibility.
window = Window.partitionBy("emp_id")
min_start_date_df = employee_roles_df.withColumn(
    "min_start_date",
    min("start_date").over(window)
)

# A promotion is every role AFTER the entry role: the first role an employee
# holds is a hire, not a promotion, so it must not be counted.
promotions_df = min_start_date_df.filter(
    col("start_date") > col("min_start_date")
).select("emp_id", col("start_date").alias("promotion_date"))

# Every promotion is tried as the start of a 3-year window, so a burst of
# promotions late in a career is detected too (not only the first 3 years).
window_starts_df = promotions_df.select(
    "emp_id", col("promotion_date").alias("window_start")
)

# Pair each window start with the promotions of the same employee that fall
# inside [window_start, window_start + 36 months] (both ends inclusive)
filtered_df = promotions_df.join(window_starts_df, on="emp_id", how="inner").filter(
    (col("promotion_date") >= col("window_start")) &
    (col("promotion_date") <= add_months(col("window_start"), 36))
)

# Promotions per candidate window, then the best (largest) window per employee
promotions_per_window_df = filtered_df.groupBy("emp_id", "window_start").agg(
    count("promotion_date").alias("promotions_in_window")
)
emp_total_promotions_df = promotions_per_window_df.groupBy("emp_id").agg(
    max("promotions_in_window").alias("total_promotions")
)

# Filtering employees who have more than 2 promotions within some 3-year span
final_df = emp_total_promotions_df.filter(col("total_promotions") > 2)
final_df.show()

In [0]:
"""
Customer Repeat Purchase Frequency
Dataset:
orders: order_id, customer_id, order_date

🛠️ Tasks:
For each customer:
Calculate number of days between purchases.

Label customers:
frequent: buys every <10 days
infrequent: every 10–30 days
dormant: >30 days
Find average frequency per segment.
"""

In [0]:
# Schema for the raw orders: integer ids plus order_date kept as a string
# until it is parsed below.
schema = (
    StructType()
    .add("order_id", IntegerType(), True)
    .add("customer_id", IntegerType(), True)
    .add("order_date", StringType(), True)
)

# Sample orders as (order_id, customer_id, order_date) with yyyy-MM-dd dates
data = [
    (1, 101, "2024-01-01"),
    (2, 101, "2024-01-08"),
    (3, 101, "2024-01-17"),
    (4, 102, "2024-02-01"),
    (5, 102, "2024-03-05"),
    (6, 102, "2024-06-10"),
    (7, 103, "2024-01-01"),
    (8, 103, "2024-01-20"),
    (9, 103, "2024-02-25"),
    (10, 104, "2024-05-01"),
]

# Build the DataFrame and convert order_date from string to DateType
orders_df = (
    spark.createDataFrame(data, schema)
    .withColumn("order_date", to_date("order_date", "yyyy-MM-dd"))
)

orders_df.show()

In [0]:
# Calculate number of days between purchases.

# Per-customer chronological window: lets each order look back at the
# customer's previous order date (null for the customer's first order).
window_spec = Window.partitionBy("customer_id").orderBy(col("order_date"))
prev_purc_df = orders_df.withColumn("prev_order_date", lag("order_date").over(window_spec))

# Gap in days between an order and the one before it
day_diff_df = prev_purc_df.withColumn("day_diff", datediff("order_date", "prev_order_date"))

# Label each purchase by its gap: <10 days frequent, 10-30 days infrequent,
# anything else dormant. First orders have no previous purchase and are
# dropped after labelling.
labelled_df = (
    day_diff_df
    .withColumn(
        "label",
        when(col("day_diff") < 10, 'frequent')
        .when(col("day_diff").between(10, 30), 'infrequent')
        .otherwise('dormant'),
    )
    .filter(col("prev_order_date").isNotNull())
)
labelled_df.show()

# Average gap (in days) for each label
aggregated_df = labelled_df.groupBy("label").agg(avg(col("day_diff")).alias("avg_frequency"))
aggregated_df.show()