In [None]:
import math
import os
import uuid
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import SparkSession


StatementMeta(, d5d9418a-8ce4-462d-a0a6-cac3cb8c591f, 3, Finished, Available, Finished)

In [None]:
# Load the Customer Importance CSV from the Bronze lakehouse for a quick preview.
# NOTE(review): this cell was previously labelled "Read transactions from Bronze", but the
# path is CustomerImportance.csv -- the same file that is read into `importance_df` further
# down. If a transactions preview was intended, point this at the transactions file instead.
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("abfss://DevDolphins_Assignment@onelake.dfs.fabric.microsoft.com/Bronze_Assignment.Lakehouse/Files/raw_data/CustomerImportance.csv")
)

StatementMeta(, d5d9418a-8ce4-462d-a0a6-cac3cb8c591f, 4, Finished, Available, Finished)

In [None]:
# Preview the DataFrame loaded above using the notebook's rich `display` (interactive table widget).
display(df)

StatementMeta(, d5d9418a-8ce4-462d-a0a6-cac3cb8c591f, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 74662868-9d03-4b25-8a2e-c1029b7f2528)

In [None]:
import math
from datetime import datetime

from pyspark.sql.functions import (
    col, count, avg, expr, lit, date_format, current_timestamp,
    percentile_approx
)
from pyspark.sql.utils import AnalysisException

# ---------------------
# Load Customer Importance Reference
# ---------------------
# Reference table supplying `weight` per (customer, merchant, category). It is
# left-joined onto every chunk in the detection loop.
importance_df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("abfss://DevDolphins_Assignment@onelake.dfs.fabric.microsoft.com/Bronze_Assignment.Lakehouse/Files/raw_data/CustomerImportance.csv")
    # Rename the raw columns to the names used by the chunk data.
    .selectExpr(
        "Source as customer",
        "Target as merchant",
        "Weight as weight",
        "typeTrans as category",
    )
    .withColumn("weight", col("weight").cast("double"))
    # Collapse to one row per join key. If the file holds several rows for the same
    # (customer, merchant, category), a plain left join would repeat every matching
    # transaction once per row and inflate transaction counts; averaging the weight
    # keeps the join one-to-one. When keys are already unique the values are unchanged.
    .groupBy("customer", "merchant", "category")
    .avg("weight")
    .withColumnRenamed("avg(weight)", "weight")
)

# ---------------------
# Define Paths
# ---------------------
# Input: Silver lakehouse folder holding Chunk_Part_NN subfolders.
# Output: Gold lakehouse folder receiving Detect_Part_NN subfolders.
chunk_base_path, detection_output_path = (
    "abfss://DevDolphins_Assignment@onelake.dfs.fabric.microsoft.com/Chunks_Silver_Data.Lakehouse/Files/ChunkData",
    "abfss://DevDolphins_Assignment@onelake.dfs.fabric.microsoft.com/Detections_Gold_Data.Lakehouse/Files/DetectionsData",
)

# ---------------------
# Function to get the next available detection batch number
# ---------------------
def get_next_batch_num(base_path, session=None):
    """Return the next unused ``detection_<N>.csv`` batch number under ``base_path``.

    Spark's CSV writer creates ``detection_<N>.csv`` as a *directory* containing
    ``part-*.csv`` files, so the listing returns paths such as
    ``.../detection_3.csv/part-00000-....csv``. The batch number is therefore looked
    for anywhere in the path (as a whole path component), not only in the final
    component -- matching just the last component never found a number.

    Args:
        base_path: Folder holding the ``detection_<N>.csv`` outputs.
        session: SparkSession used for the listing; defaults to the notebook's
            global ``spark``.

    Returns:
        ``max(N) + 1`` over all existing batches, or ``1`` if there are none or the
        folder cannot be listed (e.g. it does not exist yet).
    """
    import re

    # A path component exactly of the form detection_<digits>.csv.
    batch_pattern = re.compile(r"(?:^|/)detection_(\d+)\.csv(?=/|$)")
    session = spark if session is None else session

    try:
        listing = session.read.format("binaryFile").load(base_path.rstrip("/") + "/*")
        # Only the path column is needed; don't pull file contents to the driver.
        paths = [row.path for row in listing.select("path").collect()]
    except Exception as e:  # best effort: an unlistable/missing folder means "start at 1"
        print(f"Failed to check existing files: {e}")
        return 1

    existing_nums = [int(num) for path in paths for num in batch_pattern.findall(path)]
    return max(existing_nums, default=0) + 1

# ---------------------
# Loop through Chunks
# ---------------------
# For every Silver folder Chunk_Part_NN: skip it if the Gold output folder already has
# files; otherwise run the three pattern detectors and write the hits to
# Detect_Part_NN/detection_<batch>.csv, DETECTION_BATCH_SIZE rows per output.
# The loop stops at the first chunk folder that does not exist, or on the first error.
from datetime import datetime, timedelta, timezone

MAX_CHUNK_NUM = 999              # highest chunk number probed (Chunk_Part_01 .. Chunk_Part_999)
DETECTION_BATCH_SIZE = 50        # rows per detection_<N>.csv output
PAT1_MIN_MERCHANT_TXNS = 50_000  # PatId1: merchant needs at least this many txns (within the chunk)
PAT2_MIN_TXNS = 80               # PatId2: at least this many txns with the merchant ...
PAT2_MAX_AVG_AMOUNT = 23         # ... and an average amount strictly below this
PAT3_MIN_FEMALE = 100            # PatId3: more than this many distinct female customers
IST = timezone(timedelta(hours=5, minutes=30), "IST")

# Every output value is a string; an explicit schema avoids type inference failing on a
# batch where some column happens to be entirely null.
DETECTION_OUTPUT_SCHEMA = (
    "`YStartTime(IST)` STRING, `detectionTime(IST)` STRING, patternId STRING, "
    "ActionType STRING, customerName STRING, MerchantId STRING"
)


def _now_ist():
    """Return the current wall-clock time in IST formatted as ``yyyy-MM-dd HH:mm:ss``."""
    return datetime.now(IST).strftime("%Y-%m-%d %H:%M:%S")


def _output_exists(folder):
    """Return True if ``folder`` already contains at least one file.

    Best effort: a folder that is missing or cannot be listed counts as "no output".
    """
    try:
        listing = spark.read.format("binaryFile").load(folder + "/*")
        # take(1) on the path column: enough to know the folder is non-empty.
        return len(listing.select("path").take(1)) > 0
    except Exception:
        return False


def _detect_pat1(chunk_df, joined_df):
    """PatId1 / UPGRADE.

    Among merchants with at least PAT1_MIN_MERCHANT_TXNS transactions, flag customers who
    are, for that merchant, both in the top 10% by transaction count and in the bottom
    10% by their average importance weight. Both thresholds are computed per merchant.

    NOTE(review): eligibility is evaluated per chunk; if chunks are small, no merchant
    may ever reach the threshold -- confirm this is intended.
    """
    eligible_merchants = (
        chunk_df.groupBy("merchant").count()
        .filter(col("count") >= PAT1_MIN_MERCHANT_TXNS)
        .select("merchant")
    )
    # Transaction counts come from the raw chunk so the importance join cannot inflate them.
    txn_counts = (
        chunk_df.join(eligible_merchants, "merchant")
        .groupBy("merchant", "customer")
        .agg(count("*").alias("txn_count"))
    )
    avg_weights = (
        joined_df.join(eligible_merchants, "merchant")
        .groupBy("merchant", "customer")
        .agg(avg("weight").alias("avg_weight"))
    )
    customer_stats = txn_counts.join(avg_weights, ["merchant", "customer"], "left")
    # percentile_approx ignores nulls, and a null avg_weight fails the filter below, so
    # customers without an importance weight are never flagged.
    thresholds = customer_stats.groupBy("merchant").agg(
        percentile_approx("txn_count", 0.9).alias("p90_txn_count"),
        percentile_approx("avg_weight", 0.1).alias("p10_avg_weight"),
    )
    return (
        customer_stats.join(thresholds, "merchant")
        .filter(
            (col("txn_count") >= col("p90_txn_count"))
            & (col("avg_weight") <= col("p10_avg_weight"))
        )
        .selectExpr(
            "'PatId1' as patternId",
            "'UPGRADE' as ActionType",
            "customer as customerName",
            "merchant as MerchantId",
        )
        .dropDuplicates()
    )


def _detect_pat2(chunk_df):
    """PatId2 / CHILD: customers with at least PAT2_MIN_TXNS transactions at a merchant
    whose average amount there is below PAT2_MAX_AVG_AMOUNT."""
    return (
        chunk_df.groupBy("merchant", "customer")
        .agg(count("*").alias("txn_count"), avg("amount").alias("avg_amount"))
        .filter(
            (col("txn_count") >= PAT2_MIN_TXNS)
            & (col("avg_amount") < PAT2_MAX_AVG_AMOUNT)
        )
        .selectExpr(
            "'PatId2' as patternId",
            "'CHILD' as ActionType",
            "customer as customerName",
            "merchant as MerchantId",
        )
    )


def _detect_pat3(chunk_df):
    """PatId3 / DEI-NEEDED: merchants with more than PAT3_MIN_FEMALE distinct female
    customers but fewer female than male customers.

    Customers are de-duplicated per (merchant, customer, gender) before counting, so the
    counts are distinct customers rather than transactions. Single quotes are stripped
    from gender in case values are stored quoted (e.g. "'F'") -- NOTE(review): confirm
    the Silver format. Pivoting on the explicit values ["F", "M"] guarantees both columns
    exist even when a chunk lacks one gender.
    """
    customers_by_gender = (
        chunk_df.selectExpr(
            "merchant",
            "customer",
            "regexp_replace(gender, \"'\", '') AS gender",
        )
        .na.drop()
        .dropDuplicates()
    )
    gender_counts = (
        customers_by_gender.groupBy("merchant")
        .pivot("gender", ["F", "M"])
        .count()
        .na.fill(0)
    )
    return (
        gender_counts
        .filter((col("F") > PAT3_MIN_FEMALE) & (col("F") < col("M")))
        .selectExpr(
            "'PatId3' as patternId",
            "'DEI-NEEDED' as ActionType",
            "'' as customerName",
            "merchant as MerchantId",
        )
    )


def _write_in_batches(rows, output_folder, batch_size=DETECTION_BATCH_SIZE):
    """Write ``rows`` (tuples matching DETECTION_OUTPUT_SCHEMA) to
    ``output_folder/detection_<N>.csv``, at most ``batch_size`` rows per output.

    Numbering continues after the highest batch already present in ``output_folder``.
    Spark writes each ``detection_<N>.csv`` as a folder holding a single part file.
    """
    start_batch_num = get_next_batch_num(output_folder)
    for offset in range(0, len(rows), batch_size):
        batch = rows[offset:offset + batch_size]
        batch_num = start_batch_num + offset // batch_size
        output_file_path = f"{output_folder}/detection_{batch_num}.csv"
        (
            spark.createDataFrame(batch, schema=DETECTION_OUTPUT_SCHEMA)
            .coalesce(1)
            .write.mode("overwrite")
            .option("header", True)
            .csv(output_file_path)
        )
        print(f"Wrote: {output_file_path} with {len(batch)} rows")


for i in range(1, MAX_CHUNK_NUM + 1):
    chunk_label = f"Chunk_Part_{i:02d}"
    try:
        chunk_path = f"{chunk_base_path}/{chunk_label}"
        chunk_output_folder = f"{detection_output_path}/Detect_Part_{i:02d}"

        # Skip this chunk if its detection folder already has output.
        if _output_exists(chunk_output_folder):
            print(f"⏭️ Skipping {chunk_label} — output already exists.")
            continue

        # Presumably "YStartTime" = when processing of chunk Y began -- NOTE(review): confirm.
        y_start_time = _now_ist()
        print(f"🔍 Processing: {chunk_path}")

        try:
            chunk_df = spark.read.format("csv").option("header", True).load(chunk_path)
        except AnalysisException as ae:
            if "Path does not exist" in str(ae):
                print(f"Reached end of available chunks at {chunk_label}.")
                break
            raise

        # A dedicated name keeps the preview `df` from the earlier cell intact.
        chunk_df = chunk_df.withColumn("amount", col("amount").cast("double"))
        joined_df = chunk_df.join(importance_df, on=["customer", "merchant", "category"], how="left")

        detections_df = (
            _detect_pat1(chunk_df, joined_df)
            .unionByName(_detect_pat2(chunk_df))
            .unionByName(_detect_pat3(chunk_df))
        )
        # collect() is what actually runs the detection queries (Spark is lazy), so the
        # detection timestamp is taken right after it, as a fixed IST string.
        detected = detections_df.collect()
        detection_time = _now_ist()

        if not detected:
            print(f"No detections found in chunk {i:02d}.")
            continue

        output_rows = [
            (y_start_time, detection_time, r.patternId, r.ActionType, r.customerName, r.MerchantId)
            for r in detected
        ]
        _write_in_batches(output_rows, chunk_output_folder)

    except Exception as e:
        print(f"Failed to process chunk {i:02d}: {str(e)}")
        break


StatementMeta(, 6fcff57a-3d6a-45d1-ae38-38b621c79f9a, 10, Finished, Available, Finished)

⏭️ Skipping Chunk_Part_01 — output already exists.


⏭️ Skipping Chunk_Part_02 — output already exists.
⏭️ Skipping Chunk_Part_03 — output already exists.
🔍 Processing: abfss://DevDolphins_Assignment@onelake.dfs.fabric.microsoft.com/Chunks_Silver_Data.Lakehouse/Files/ChunkData/Chunk_Part_04
✅ Reached end of available chunks at Chunk_Part_04.
