In [0]:
%run ./env-setup


In [0]:
import uuid
import time
import boto3
import pandas as pd
import psycopg2
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import os

In [0]:
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.environ.get("AWS_REGION")
S3_BUCKET = os.environ.get("S3_BUCKET")
S3_FOLDER = os.environ.get("S3_FOLDER")
S3_PREFIX = os.environ.get("S3_FOLDER")
S3_DESTINATION = os.environ.get("S3_DESTINATION")

PG_HOST = os.environ.get("PG_HOST")
PG_PORT = int(os.environ.get("PG_PORT"))
PG_DATABASE = os.environ.get("PG_DATABASE")
PG_USER = os.environ.get("PG_USER")
PG_PASSWORD = os.environ.get("PG_PASSWORD")


In [0]:
S3_INPUT_PATH = f"s3a://{S3_BUCKET}/{S3_PREFIX}"
S3_OUTPUT_BASE = f"s3a://{S3_BUCKET}/{S3_DESTINATION}"

In [0]:
# --- Spark Session --- #
spark = SparkSession.builder \
    .appName("OptimizedPatternDetection") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("fs.s3a.access.key", AWS_ACCESS_KEY_ID) \
    .config("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) \
    .getOrCreate()

# --- Boto3 S3 Client --- #
s3 = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_REGION
)

In [0]:
def setup_postgres_tracking():
    """Create tracking table with only required columns."""
    try:
        conn = psycopg2.connect(
            host=PG_HOST, port=PG_PORT,
            database=PG_DATABASE, user=PG_USER, password=PG_PASSWORD
        )
        with conn.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS processed_files (
                    file_key TEXT PRIMARY KEY,
                    processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    status TEXT DEFAULT 'pending'
                )
            """)
            conn.commit()
            print("PostgreSQL tracking table ready")
    except Exception as e:
        print(f"PostgreSQL setup error: {e}")
    finally:
        if conn:
            conn.close()

In [0]:
def detect_patterns(transactions_df):
    transactions_clean = transactions_df.withColumn("amount", F.col("amount").cast("double"))
    for column in ["customer", "merchant", "gender", "category"]:
        transactions_clean = transactions_clean.withColumn(
            column, F.regexp_replace(F.col(column), "'", "")
        )

    customer_imp_url = "https://drive.google.com/uc?export=download&id=1abe9EkM_uf2F2hjEkbhMBG9Mf2dFE4Wo"
    customer_imp_pd = pd.read_csv(customer_imp_url, on_bad_lines='skip').drop("fraud", axis=1)
    customer_imp_pd.columns = customer_imp_pd.columns.str.strip().str.replace("'", "")
    for col in customer_imp_pd.select_dtypes(include=['object']):
        customer_imp_pd[col] = customer_imp_pd[col].str.replace("'", "")
    customer_imp_spark = spark.createDataFrame(customer_imp_pd)

    merged_df = transactions_clean.join(
        F.broadcast(customer_imp_spark),
        (transactions_clean.customer == customer_imp_spark.Source) & 
        (transactions_clean.merchant == customer_imp_spark.Target),
        "left"
    )
    merged_df.createOrReplaceTempView("merged_transactions")

    pattern1 = spark.sql("""
        WITH merchant_stats AS (
            SELECT merchant FROM merged_transactions 
            GROUP BY merchant HAVING COUNT(*) > 50000
        ),
        customer_metrics AS (
            SELECT 
                customer,
                merchant,
                PERCENT_RANK() OVER (PARTITION BY merchant ORDER BY COUNT(*)) as txn_percentile,
                PERCENT_RANK() OVER (PARTITION BY merchant ORDER BY AVG(Weight)) as weight_percentile
            FROM merged_transactions
            WHERE merchant IN (SELECT merchant FROM merchant_stats)
            GROUP BY customer, merchant
        )
        SELECT 
            CURRENT_TIMESTAMP() as detectionTime,
            'PatId1' as patternId,
            'UPGRADE' as ActionType,
            customer as customerName,
            merchant as MerchantId
        FROM customer_metrics
        WHERE txn_percentile >= 0.99 AND weight_percentile <= 0.01
    """)

    pattern2 = spark.sql("""
        SELECT 
            CURRENT_TIMESTAMP() as detectionTime,
            'PatId2' as patternId,
            'CHILD' as ActionType,
            customer as customerName,
            merchant as MerchantId
        FROM merged_transactions
        GROUP BY customer, merchant
        HAVING COUNT(*) >= 80 AND AVG(amount) < 23
    """)

    pattern3 = spark.sql("""
        SELECT 
            CURRENT_TIMESTAMP() as detectionTime,
            'PatId3' as patternId,
            'DEI-NEEDED' as ActionType,
            '' as customerName,
            merchant as MerchantId
        FROM merged_transactions
        WHERE gender IN ('M', 'F')
        GROUP BY merchant
        HAVING 
            SUM(CASE WHEN gender = 'F' THEN 1 ELSE 0 END) > 100 AND
            SUM(CASE WHEN gender = 'M' THEN 1 ELSE 0 END) > 
            SUM(CASE WHEN gender = 'F' THEN 1 ELSE 0 END)
    """)

    return pattern1.unionByName(pattern2).unionByName(pattern3)

In [0]:
def repartition_into_50row_chunks():
    S3_RESULT_PATH= f"s3a://{S3_BUCKET}/result/"

    combined_df = spark.read.parquet(f"{S3_OUTPUT_BASE}/*/*.parquet")
    df_with_batches = combined_df.withColumn(
        "batch_id", F.floor(F.row_number().over(Window.orderBy(F.monotonically_increasing_id())) / 50)
    ).cache()

    total_rows = df_with_batches.count()
    num_batches = (total_rows // 50) + (1 if total_rows % 50 else 0)

    df_with_batches.repartition(num_batches, "batch_id") \
        .write.partitionBy("batch_id") \
        .option("maxRecordsPerFile", 50) \
        .mode("overwrite") \
        .parquet(f"{S3_RESULT_PATH}")

    print(f"Saved {total_rows} rows in {num_batches} batches of 50 rows each")
    df_with_batches.unpersist()

In [0]:
def process_all_chunks():
    setup_postgres_tracking()

    spark.conf.set("spark.hadoop.fs.s3a.connection.timeout", "600000")
    spark.conf.set("spark.hadoop.fs.s3a.attempts.maximum", "5")
    spark.conf.set("fs.s3a.committer.name", "directory")

    response = s3.list_objects_v2(Bucket=S3_BUCKET, Prefix=S3_PREFIX)

    processed_files = set()
    try:
        conn = psycopg2.connect(
            host=PG_HOST, port=PG_PORT,
            database=PG_DATABASE, user=PG_USER, password=PG_PASSWORD
        )
        with conn.cursor() as cursor:
            cursor.execute("SELECT file_key FROM processed_files WHERE status = 'completed'")
            processed_files = {row[0] for row in cursor.fetchall()}
    except Exception as e:
        print(f"Warning: Could not fetch processed files from PostgreSQL: {e}")
    finally:
        if 'conn' in locals():
            conn.close()

    for i, obj in enumerate(response.get("Contents", [])):
        file_key = obj["Key"]

        if not file_key.endswith(".csv") or file_key in processed_files:
            continue

        processing_start = time.time()
        conn = None

        try:
            print(f"\nProcessing file {i}: {file_key}")

            conn = psycopg2.connect(
                host=PG_HOST, port=PG_PORT,
                database=PG_DATABASE, user=PG_USER, password=PG_PASSWORD
            )
            with conn.cursor() as cursor:
                cursor.execute("""
                    INSERT INTO processed_files (file_key, status)
                    VALUES (%s, 'processing')
                    ON CONFLICT (file_key) DO UPDATE SET
                        status = 'processing',
                        processed_at = CURRENT_TIMESTAMP
                """, (file_key,))
                conn.commit()

            df = spark.read.csv(f"s3a://{S3_BUCKET}/{file_key}", header=True)
            detections = detect_patterns(df)
            rows_processed = df.count()
            detection_count = detections.count()

            save_path = f"{S3_OUTPUT_BASE}/chunk_{i}_{int(time.time())}"
            detections.repartition(8).write.mode("append").option("compression", "snappy").parquet(save_path)

            with conn.cursor() as cursor:
                cursor.execute("""
                    UPDATE processed_files SET
                        status = 'completed',
                        processed_at = CURRENT_TIMESTAMP
                    WHERE file_key = %s
                """, (file_key,))
                conn.commit()

            print(f"✅ Processed {rows_processed} rows, found {detection_count} detections")

        except Exception as e:
            print(f"❌ Failed processing {file_key}: {e}")
            if conn:
                try:
                    with conn.cursor() as cursor:
                        cursor.execute("""
                            UPDATE processed_files SET
                                status = 'failed',
                                processed_at = CURRENT_TIMESTAMP
                            WHERE file_key = %s
                        """, (file_key,))
                        conn.commit()
                except Exception as db_error:
                    print(f"⚠️ Could not record failure: {db_error}")
        finally:
            if conn:
                conn.close()

    print("\n✅ All file processing complete.")
    repartition_into_50row_chunks()

In [0]:
process_all_chunks()

PostgreSQL tracking table ready

✅ All file processing complete.
Saved 74437 rows in 1489 batches of 50 rows each


In [0]:
test_file=spark.read.parquet(f's3a://{S3_BUCKET}/result/batch_id=1014/*.parquet')

In [0]:
import boto3
import os

# --- AWS Credentials (ensure these are loaded via .env or environment vars) --- #
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.environ.get("AWS_REGION")

# --- S3 Setup --- #
S3_BUCKET = "bank-transactions-vishak"
S3_PREFIX = "result/"  # the folder inside the bucket

# --- Local download folder --- #
local_dir = "/tmp/s3_result_downloads"
os.makedirs(local_dir, exist_ok=True)

# --- Initialize S3 client --- #
s3 = boto3.client(
    "s3",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

# --- Paginate through all objects in the folder --- #
paginator = s3.get_paginator("list_objects_v2")
page_iterator = paginator.paginate(Bucket=S3_BUCKET, Prefix=S3_PREFIX)

downloaded = 0

for page in page_iterator:
    for obj in page.get("Contents", []):
        key = obj["Key"]

        # Skip any folders
        if key.endswith("/"):
            continue

        filename = os.path.basename(key)
        local_path = os.path.join(local_dir, filename)

        print(f"⬇️ Downloading {key} → {local_path}")
        s3.download_file(S3_BUCKET, key, local_path)
        downloaded += 1

print(f"✅ Download complete: {downloaded} files saved to {local_dir}")


⬇️ Downloading result/_SUCCESS → /tmp/s3_result_downloads/_SUCCESS
⬇️ Downloading result/_committed_7780928008968029570 → /tmp/s3_result_downloads/_committed_7780928008968029570
⬇️ Downloading result/batch_id=0/_SUCCESS → /tmp/s3_result_downloads/_SUCCESS
⬇️ Downloading result/batch_id=0/_committed_7780928008968029570 → /tmp/s3_result_downloads/_committed_7780928008968029570
⬇️ Downloading result/batch_id=0/_committed_9094343648041532070 → /tmp/s3_result_downloads/_committed_9094343648041532070
⬇️ Downloading result/batch_id=0/_started_7780928008968029570 → /tmp/s3_result_downloads/_started_7780928008968029570
⬇️ Downloading result/batch_id=0/_started_9094343648041532070 → /tmp/s3_result_downloads/_started_9094343648041532070
⬇️ Downloading result/batch_id=0/part-00336-tid-7780928008968029570-6ecf0772-0486-474c-a19e-24c5b00da5ff-7430-1.c000.snappy.parquet → /tmp/s3_result_downloads/part-00336-tid-7780928008968029570-6ecf0772-0486-474c-a19e-24c5b00da5ff-7430-1.c000.snappy.parquet
⬇️ Dow