In [0]:
import os
import time
from datetime import datetime
import psycopg2
from pyspark.sql.functions import col, count, avg, lower, when,row_number,percent_rank,lit
from pyspark.sql import Window
import requests
import uuid


In [0]:
chunk_input_path = "dbfs:/mnt/s3mount/chunk/"
detection_output_path = "dbfs:/mnt/s3mount/detections/"
customer_importance_path = "dbfs:/mnt/s3mount/Customer/CustomerImportance.csv"

In [0]:
pg_host = <host>
pg_port = "5432"
pg_db = "ass"
pg_user = "postgres"
pg_password = <pg_password>

In [0]:

def write_s3():
    file_id='1abe9EkM_uf2F2hjEkbhMBG9Mf2dFE4Wo'
    download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
    local_path = f"/dbfs/temp/CustomerImportance.csv"
    r = requests.get(download_url)
    with open(local_path, 'wb') as f:
        f.write(r.content)
    dbutils.fs.cp(f'file:{local_path}', customer_importance_path)

write_s3()

In [0]:
def get_processed_files():
    conn = psycopg2.connect(host=pg_host, dbname=pg_db, user=pg_user, password=pg_password, port=pg_port)
    cur = conn.cursor()
    cur.execute("SELECT file_name FROM processed_chunks")
    files = [r[0] for r in cur.fetchall()]
    conn.close()
    return set(files)

In [0]:
def mark_file_processed(file_name):
    conn = psycopg2.connect(host=pg_host, dbname=pg_db, user=pg_user, password=pg_password, port=pg_port)
    cur = conn.cursor()
    cur.execute("INSERT INTO processed_chunks (file_name) VALUES (%s) ON CONFLICT DO NOTHING", (file_name,))
    conn.commit()
    conn.close()

In [0]:
def PatId1(trans_df,cust_importance_df,YStartTime):
    merchant_chunk_counts = (
        trans_df.groupBy("merchant")
        .count()
        .withColumnRenamed("count", "chunk_txn_count")
    )

    merchant_chunk_counts_pd = merchant_chunk_counts.toPandas()
    conn = psycopg2.connect(
        host=pg_host, dbname=pg_db, user=pg_user, password=pg_password, port=pg_port
    )
    cur = conn.cursor()
    for index, row in merchant_chunk_counts_pd.iterrows():
        cur.execute("""
            INSERT INTO merchant_transaction_totals (merchant, total_txns)
            VALUES (%s, %s)
            ON CONFLICT (merchant) DO UPDATE 
            SET total_txns = merchant_transaction_totals.total_txns + EXCLUDED.total_txns;
        """, (row['merchant'], int(row['chunk_txn_count'])))
    conn.commit()

    cur.execute("SELECT merchant FROM merchant_transaction_totals WHERE total_txns >= 50000")
    eligible_merchants = [r[0] for r in cur.fetchall()]
    conn.close()

    if not eligible_merchants:
        print("No merchants reached 50K threshold yet. Skipping PatId1.")
        return None

    eligible_df = spark.createDataFrame(eligible_merchants, "string").withColumnRenamed("value", "merchant")
    eligible_txns_df = trans_df.join(eligible_df, on="merchant", how="inner")

    customer_counts = (
        eligible_txns_df.groupBy("merchant", "customer")
        .agg(count("*").alias("txn_count"))
    )

    window_spec = Window.partitionBy("merchant").orderBy(col("txn_count").desc())
    ranked = customer_counts.withColumn("percentile", percent_rank().over(window_spec))
    top_1_percent = ranked.filter(col("percentile") <= 0.01)

    combined = top_1_percent.join(
        cust_importance_df,
        top_1_percent["customer"] == cust_importance_df["Source"],
        how="inner"
    ).withColumn("weight", col("weight").cast("float"))
    temp=combined.count()

    if temp > 0:
        weight_thresholds = combined.stat.approxQuantile("weight", [0.01], 0.01)
        if weight_thresholds:
            low_weight = weight_thresholds[0]
            pat1_df = combined.filter(col("weight") <= low_weight).withColumn("YStartTime", lit(YStartTime)) \
                .selectExpr(
                "YStartTime",
                "current_timestamp() as detectionTime",
                "'PatId1' as patternId",
                "'UPGRADE' as actionType",
                "customer as customerName",
                "merchant as MerchantId"
            )
            return pat1_df
    return None


In [0]:
def PatId2(trans_df,YStartTime):
    pat2_df = (
        trans_df.groupBy("merchant", "customer")
        .agg(avg("amount").alias("avg_txn"), count("*").alias("txn_count"))
        .filter((col("avg_txn") < 23) & (col("txn_count") >= 80))
        .withColumn("YStartTime", lit(YStartTime)) \
        .selectExpr(
                "YStartTime",
            "current_timestamp() as detectionTime",
            "'PatId2' as patternId",
            "'CHILD' as actionType",
            "customer as customerName",
            "merchant as MerchantId"
        )
    )
    return pat2_df

In [0]:
from pyspark.sql.functions import col, lower, current_timestamp

def PatId3(trans_df, YStartTime):
    

    gender_df = trans_df.withColumn("gender", lower(col("gender")))\
                        .filter(col("gender").isin("f", "m"))
    gender_count = gender_df.groupBy("merchant", "gender").count()
    gender_pivot = gender_count.groupBy("merchant")\
                               .pivot("gender", ["f", "m"])\
                               .sum("count")\
                               .fillna(0)  
    pat3_df = gender_pivot.filter((col("f") < col("m")) & (col("f") > 100))\
                          .withColumn("YStartTime", lit(YStartTime)) \
                          .selectExpr(
                              "YStartTime",
                              "current_timestamp() as detectionTime",
                              "'PatId3' as patternId",
                              "'DEI-NEEDED' as actionType",
                              "'' as customerName",
                              "merchant as MerchantId"
                          )
    
    return pat3_df

In [0]:
def detect_patterns(trans_df, cust_importance_df,YStartTime):
    pat1_df = PatId1(trans_df,cust_importance_df,YStartTime)
    pat2_df = PatId2(trans_df,YStartTime)
    pat3_df = PatId3(trans_df,YStartTime)
    final_df =pat2_df.unionByName(pat3_df)
    if pat1_df is not None:
       final_df = final_df.unionByName(pat1_df)
    return final_df

In [0]:
schema_trans = """step INT,
    customer STRING,
    age INT,           
    gender STRING,
    zipcodeOri STRING,
    merchant STRING,
    zipMerchant STRING,
    category STRING,
    amount DOUBLE,
    fraud INT"""

schema_cust='Source string, Target string, Weight float, typeTrans string, fraud int'

schema_detection='YStartTime string, detectionTime timestamp, patternId string, actionType string, customerName string, MerchantId string'

In [0]:
def write_detection_batch(batch, schema, output_path):
    if not batch:
        return
    batch_df = spark.createDataFrame(batch, schema=schema)
    batch_df.coalesce(1).write.mode("append").option("header", "true").csv(output_path)
    print(f"Wrote {len(batch)} detections to {output_path}")

In [0]:
wait_rounds=0
detection_buffer = []
while True:
    YStartTime=datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")[:-3] + "Z"
    input_files = dbutils.fs.ls(chunk_input_path)
    processed_files = get_processed_files()

    new_files = [
        f.path for f in input_files
        if f.path.endswith(".csv") and os.path.basename(f.path) not in processed_files
    ]

    if not new_files:
        print("No new files. Waiting...")
        time.sleep(10)
        wait_rounds += 1
        if wait_rounds >= 5:
            if detection_buffer:
                batch = detection_buffer[:50]
                write_detection_batch(batch, schema_detection, detection_output_path)
            print("Exiting loop after waiting.")
            break

        continue

    for file_path in new_files:
        print(f"Processing: {file_path}")
        
        df = spark.read.option("header", "true").schema(schema_trans).csv(file_path)

        cust_importance_df = (
            spark.read.option("header", "true")
            .option("quote", "'")
            .schema(schema_cust)
            .csv(customer_importance_path)
            .withColumn("weight", col("weight").cast("float"))
        )

        detections_df = detect_patterns(df, cust_importance_df,YStartTime)

        if detections_df is None or detections_df.count() == 0:
            print("No detections.")
            mark_file_processed(os.path.basename(file_path))
            continue

        new_detections = detections_df.collect()
        detection_buffer.extend(new_detections)

        while len(detection_buffer) >= 50:
            batch = detection_buffer[:50]
            detection_buffer = detection_buffer[50:]
            write_detection_batch(batch, schema_detection, detection_output_path)

        mark_file_processed(os.path.basename(file_path))
        time.sleep(1)
    wait_rounds = 0

Processing: dbfs:/mnt/s3mount/chunk/part-00000-tid-3977065896322773474-20987141-cf7d-40d4-9b6c-d91640f9e9d8-64-1-c000.csv
No merchants reached 50K threshold yet. Skipping PatId1.
No detections.
Processing: dbfs:/mnt/s3mount/chunk/part-00000-tid-5270464475915362499-6a54e8ab-5ff6-4abf-8532-1faf2a0ee54a-63-1-c000.csv
No merchants reached 50K threshold yet. Skipping PatId1.
No detections.
Processing: dbfs:/mnt/s3mount/chunk/part-00000-tid-6624530746572677371-8b756a4d-09fb-46ce-9dfd-877f730a4173-62-1-c000.csv
No merchants reached 50K threshold yet. Skipping PatId1.
No detections.
Processing: dbfs:/mnt/s3mount/chunk/part-00000-tid-6964740517520853725-53c06f2a-bb82-4483-8d3f-cc6d51b0fa86-59-1-c000.csv
No merchants reached 50K threshold yet. Skipping PatId1.
No detections.
Processing: dbfs:/mnt/s3mount/chunk/part-00000-tid-7147507964213923869-8e36e423-7511-44fd-8ea3-c5deac91635e-60-1-c000.csv
No merchants reached 50K threshold yet. Skipping PatId1.
No detections.
Processing: dbfs:/mnt/s3mount/

In [0]:
spark.read.csv(detection_output_path,header=True).show(truncate=False)

+-------------------------+------------------------+---------+----------+------------+-----------+
|YStartTime               |detectionTime           |patternId|actionType|customerName|MerchantId |
+-------------------------+------------------------+---------+----------+------------+-----------+
|2025-06-01T15:51:31.4245Z|2025-06-01T15:51:57.145Z|PatId1   |UPGRADE   |C1018653381 |M1823072687|
|2025-06-01T15:51:31.4245Z|2025-06-01T15:51:57.145Z|PatId1   |UPGRADE   |C1710930627 |M1823072687|
|2025-06-01T15:51:31.4245Z|2025-06-01T15:52:07.846Z|PatId1   |UPGRADE   |C833126375  |M1823072687|
|2025-06-01T15:51:31.4245Z|2025-06-01T15:52:18.957Z|PatId1   |UPGRADE   |C876944738  |M1823072687|
|2025-06-01T15:51:31.4245Z|2025-06-01T15:52:18.957Z|PatId1   |UPGRADE   |C1918953803 |M1823072687|
|2025-06-01T15:51:31.4245Z|2025-06-01T15:52:18.957Z|PatId1   |UPGRADE   |C1814870538 |M1823072687|
|2025-06-01T15:51:31.4245Z|2025-06-01T15:52:18.957Z|PatId1   |UPGRADE   |C782199851  |M1823072687|
|2025-06-0