# Setting up Databricks access to ADLS


In [0]:
# Configure OAuth (service principal) access to the devdol1 ADLS Gen2 account.
#
# The client secret is read from a Databricks secret scope instead of being
# embedded in the notebook, where it leaks through source control, exports and
# shared workspaces. The secret that used to be hardcoded here must be treated
# as compromised and rotated in Azure AD.
#
# NOTE(review): `secret_scope` and `client_secret_key` are placeholder names;
# create them (or point these at an existing scope/key) before running.
adls_host = "devdol1.dfs.core.windows.net"
secret_scope = "adls-devdol1"
client_secret_key = "sp-client-secret"

oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "2324ad62-ef2b-4a4f-8d05-871ebc6180b2",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope=secret_scope, key=client_secret_key),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/c6b02a8c-4e30-4e38-86b4-117f4f2150a8/oauth2/token",
}

# Each setting is scoped to the storage account by suffixing its host name.
for setting, value in oauth_settings.items():
    spark.conf.set(f"{setting}.{adls_host}", value)

# Loading the transactions and customerimportance data from ADLS

In [0]:
# Container roots in the devdol1 storage account
main_path = "abfss://main-files@devdol1.dfs.core.windows.net"
raw_path = "abfss://raw@devdol1.dfs.core.windows.net"
processed_path = "abfss://processed@devdol1.dfs.core.windows.net"

# Input files and output directories derived from the container roots
transactions_path = main_path + "/transactions.csv"
importance_path = main_path + "/customerimportance.csv"
chunks_output_dir = raw_path + "/chunks"
detections_output_dir = processed_path + "/detections"

# ===== STEP 2: Load input files =====
# Both CSVs have a header row; column types are inferred from the data.
df_t = spark.read.csv(transactions_path, header="true", inferSchema="true")
df_cust = spark.read.csv(importance_path, header="true", inferSchema="true")

In [0]:
# Print the column names and inferred types of the transactions DataFrame.
df_t.printSchema()

root
 |-- step: integer (nullable = true)
 |-- customer: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- zipcodeOri: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- zipMerchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- fraud: integer (nullable = true)



In [0]:
# Print the column names and inferred types of the customer-importance DataFrame.
df_cust.printSchema()

root
 |-- Source: string (nullable = true)
 |-- Target: string (nullable = true)
 |-- Weight: double (nullable = true)
 |-- typeTrans: string (nullable = true)
 |-- fraud: integer (nullable = true)



# Necessary modules for transformation

In [0]:
from pyspark.sql.functions import col, count, avg, lit, lower, trim, coalesce, current_timestamp, row_number, monotonically_increasing_id
from pyspark.sql.window import Window
from datetime import datetime
from pytz import timezone


#Mechanism X Architecture


In [0]:
# Mechanism X: split the full transactions file into fixed-size CSV chunks that
# simulate a stream of incoming batches.

# Load full transactions dataset
df_t = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("abfss://main-files@devdol1.dfs.core.windows.net/transactions.csv")
)

# Add a 1-based row number for chunking.
# The window has no partitionBy, so Spark sorts the whole dataset in a single
# partition. Cache the result: without it, that global sort is recomputed for
# the count below and again for every chunk filter in the loop.
window = Window.orderBy(monotonically_increasing_id())
df_t_indexed = df_t.withColumn("row_id", row_number().over(window)).cache()

try:
    # Chunk config
    chunk_size = 10000
    total_rows = df_t_indexed.count()  # also materializes the cache
    num_chunks = (total_rows + chunk_size - 1) // chunk_size  # ceiling division
    chunks_output_dir = "abfss://main-files@devdol1.dfs.core.windows.net/stream_simulation/bronze/chunks"

    # Write chunk files into a bronze directory in ADLS.
    # Chunk i (0-based) holds row_ids in [i*chunk_size + 1, (i+1)*chunk_size].
    for i in range(num_chunks):
        lower_bound = i * chunk_size + 1
        upper_bound = (i + 1) * chunk_size + 1
        chunk_df = df_t_indexed.filter(
            (col("row_id") >= lower_bound) & (col("row_id") < upper_bound)
        ).drop("row_id")
        chunk_df.write.mode("overwrite").option("header", True).csv(f"{chunks_output_dir}/chunk_{i+1}")
finally:
    # Release the cached rows whether or not every chunk was written.
    df_t_indexed.unpersist()

#Mechanism Y Architecture

In [0]:
# Load customer importance data.
# - "fraud" is dropped; only Source, Target, Weight and typeTrans are kept.
# - Source/Target are lowercased and trimmed for use as join keys.
# - The file is read without schema inference, so every column arrives as a
#   string; Weight is cast to double explicitly so later numeric operations
#   (coalesce with 0.0, avg, quantiles) do not rely on implicit string coercion.
imp_file = "abfss://main-files@devdol1.dfs.core.windows.net/customerimportance.csv"
df_imp = spark.read.option("header", True).csv(imp_file) \
    .drop("fraud") \
    .withColumn("Source", lower(trim(col("Source")))) \
    .withColumn("Target", lower(trim(col("Target")))) \
    .withColumn("Weight", col("Weight").cast("double"))

# Initialize state shared across all chunks
merchant_txn_counts = {}        # merchant -> cumulative transaction count
merchant_gender_customers = {}  # merchant -> {"M": set(customers), "F": set(customers)}
detection_batch = []            # detections waiting to be written
batch_size = 50                 # detections per output file
batch_num = 1                   # suffix of the next output file
output_path = "abfss://main-files@devdol1.dfs.core.windows.net/stream_simulation/silver/detections"

#Pattern detection block

In [0]:
# Mechanism Y: read the chunks written by Mechanism X one at a time, update the
# running per-merchant state, detect patterns PatId1/2/3 and write detections to
# the silver directory in files of `batch_size` rows.


def _chunk_sort_key(path):
    """Sort key that orders chunk paths by their numeric suffix.

    Plain string sorting would put ``chunk_10`` before ``chunk_2``. Paths
    without a numeric suffix sort after all numbered chunks, by name.
    """
    suffix = path.rstrip("/").rsplit("_", 1)[-1]
    return (0, int(suffix), path) if suffix.isdecimal() else (1, 0, path)


# List chunks in numeric order. Order matters because merchant_txn_counts is
# cumulative and gates Pattern 1.
chunks = dbutils.fs.ls(chunks_output_dir)
chunk_files = sorted(
    (f.path for f in chunks if f.name.startswith("chunk_")),
    key=_chunk_sort_key,
)

# Process each chunk
for chunk_file in chunk_files:
    print(f"Processing {chunk_file}")
    # Chunks are read without schema inference, so amount is cast explicitly.
    df_txn = spark.read.option("header", True).csv(chunk_file) \
        .withColumn("amount", col("amount").cast("double")) \
        .withColumn("customer", lower(trim(col("customer"))))

    # Update merchant txn count (cumulative across chunks)
    txn_counts_df = df_txn.groupBy("merchant").agg(count("*").alias("txn_count"))
    for row in txn_counts_df.collect():
        m, cnt = row["merchant"], row["txn_count"]
        merchant_txn_counts[m] = merchant_txn_counts.get(m, 0) + cnt

    # Update gender-customer sets (cumulative across chunks); only rows whose
    # gender is exactly "M" or "F" are tracked.
    for row in df_txn.select("merchant", "customer", "gender").distinct().collect():
        m, c, g = row["merchant"], row["customer"], row["gender"]
        if g in ("M", "F"):
            if m not in merchant_gender_customers:
                merchant_gender_customers[m] = {"M": set(), "F": set()}
            merchant_gender_customers[m][g].add(c)

    # Pattern 2: CHILD
    # Within this chunk: customer/merchant pairs with at least 80 transactions
    # whose average amount is below 23.
    df_pat2 = df_txn.groupBy("customer", "merchant") \
        .agg(avg("amount").alias("avg_txn"), count("*").alias("txn_count")) \
        .filter((col("avg_txn") < 23) & (col("txn_count") >= 80)) \
        .withColumn("YStartTime", lit(datetime.now(timezone("Asia/Kolkata")).strftime("%Y-%m-%d %H:%M:%S"))) \
        .withColumn("detectionTime", current_timestamp()) \
        .withColumn("patternId", lit("PatId2")) \
        .withColumn("ActionType", lit("CHILD")) \
        .select("YStartTime", "detectionTime", "patternId", "ActionType", "customer", "merchant")

    detection_batch += [row.asDict() for row in df_pat2.collect()]

    # Pattern 1: UPGRADE
    # df_imp.Target is lowercased and trimmed when loaded, so the merchant side
    # of the join is normalized the same way; comparing the raw merchant value
    # would never match IDs that contain uppercase letters or padding.
    # Unmatched rows get weight 0.0. The cast keeps Weight numeric even when
    # df_imp was loaded with string columns.
    # NOTE(review): if df_imp has several rows per (Source, Target, typeTrans),
    # this left join duplicates transactions and inflates txn_count; confirm.
    df_txn_joined = df_txn.join(
        df_imp,
        (df_txn["customer"] == df_imp["Source"]) &
        (lower(trim(df_txn["merchant"])) == df_imp["Target"]) &
        (df_txn["category"] == df_imp["typeTrans"]),
        "left"
    ).withColumn("Weight", coalesce(col("Weight").cast("double"), lit(0.0)))

    # Cached because it is reused for the merchant list and then for a filter
    # and two quantile computations per eligible merchant.
    cust_agg = df_txn_joined.groupBy("customer", "merchant") \
        .agg(count("step").alias("txn_count"), avg("Weight").alias("avg_weight")) \
        .cache()

    merchant_list = [r["merchant"] for r in cust_agg.select("merchant").distinct().collect()]
    for m in merchant_list:
        # Only merchants with at least 50,000 cumulative transactions qualify.
        if merchant_txn_counts.get(m, 0) < 50000:
            continue
        dfm = cust_agg.filter(col("merchant") == m)
        # 90th percentile of txn_count and 10th percentile of avg_weight,
        # each with 5% relative error.
        txn90 = dfm.approxQuantile("txn_count", [0.9], 0.05)[0]
        w10 = dfm.approxQuantile("avg_weight", [0.1], 0.05)[0]

        df_pat1 = dfm.filter((col("txn_count") >= txn90) & (col("avg_weight") <= w10)) \
            .withColumn("YStartTime", lit(datetime.now(timezone("Asia/Kolkata")).strftime("%Y-%m-%d %H:%M:%S"))) \
            .withColumn("detectionTime", current_timestamp()) \
            .withColumn("patternId", lit("PatId1")) \
            .withColumn("ActionType", lit("UPGRADE")) \
            .select("YStartTime", "detectionTime", "patternId", "ActionType", "customer", "merchant")

        detection_batch += [row.asDict() for row in df_pat1.collect()]

    cust_agg.unpersist()

    # Pattern 3: DEI-NEEDED
    # Merchants with more than 100 distinct female customers and more male
    # than female customers, based on the cumulative sets.
    # NOTE(review): a merchant that satisfies this is emitted again on every
    # subsequent chunk; confirm whether repeated detections are intended.
    for m, gender_map in merchant_gender_customers.items():
        f_count = len(gender_map["F"])
        m_count = len(gender_map["M"])
        if f_count > 100 and m_count > f_count:
            detection_batch.append({
                "YStartTime": datetime.now(timezone("Asia/Kolkata")).strftime("%Y-%m-%d %H:%M:%S"),
                "detectionTime": datetime.now(),
                "patternId": "PatId3",
                "ActionType": "DEI-NEEDED",
                "customer": "",
                "merchant": m
            })

    # Storing the batches in a silver directory

    # Write batches of 50
    while len(detection_batch) >= batch_size:
        dfout = spark.createDataFrame(detection_batch[:batch_size])
        outfile = f"{output_path}/detection_batch_{batch_num}"
        dfout.write.mode("overwrite").option("header", True).csv(outfile)
        print(f"Wrote {outfile}")
        detection_batch = detection_batch[batch_size:]
        batch_num += 1

# Final flush: write any remaining detections (fewer than batch_size)
if detection_batch:
    dfout = spark.createDataFrame(detection_batch)
    outfile = f"{output_path}/detection_batch_{batch_num}"
    dfout.write.mode("overwrite").option("header", True).csv(outfile)
    print(f"Wrote final {outfile}")

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can