In [0]:
# Install required packages
%pip install psycopg2-binary boto3

Python interpreter will be restarted.
Collecting psycopg2-binary
  Using cached psycopg2_binary-2.9.10-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10
Python interpreter will be restarted.


In [0]:
# CELL 3: Test PostgreSQL connection
try:
    conn = psycopg2.connect(**PG_CONFIG)
    cur = conn.cursor()
    cur.execute("SELECT version();")
    version = cur.fetchone()
    print(f"✅ PostgreSQL connection successful: {version[0]}")
    cur.close()
    conn.close()
except Exception as e:
    print(f"❌ PostgreSQL connection failed: {e}")

✅ PostgreSQL connection successful: PostgreSQL 17.4 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 12.4.0, 64-bit


In [0]:
# CELL 4: Create the bankdata database (if needed)
try:
    conn = psycopg2.connect(**PG_CONFIG)
    conn.autocommit = True
    cur = conn.cursor()
    
    cur.execute("CREATE DATABASE bankdata;")
    print("✅ Database 'bankdata' created successfully")
    
    cur.close()
    conn.close()
except psycopg2.errors.DuplicateDatabase:
    print("⚠️ Database 'bankdata' already exists")
except Exception as e:
    print(f"❌ Error creating database: {e}")

⚠️ Database 'bankdata' already exists


In [0]:
# CELL 5: Connect to your bankdata database and create tables
PG_CONFIG['database'] = 'bankdata'  # Switch to your target database

try:
    conn = psycopg2.connect(**PG_CONFIG)
    cur = conn.cursor()
    
    # Create your tracking table
    cur.execute("""
        CREATE TABLE IF NOT EXISTS processed_chunks (
            chunk_id TEXT PRIMARY KEY,
            processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """)
    conn.commit()
    print("✅ Tables created successfully")
    
    cur.close()
    conn.close()
except Exception as e:
    print(f"❌ Error creating tables: {e}")


✅ Tables created successfully


In [0]:
# CELL 6: Set up S3 configuration (only if you need S3 access)
if 'AWS_ACCESS_KEY_ID' in os.environ:
    # Configure Spark for S3 access
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")
    
    # Set up boto3 S3 client
    session = boto3.Session(
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"]
    )
    s3 = session.client('s3', config=Config(signature_version='s3v4'))
    print("✅ S3 configuration complete")

✅ S3 configuration complete


In [0]:
# CELL 7: Your utility functions
def get_processed_chunks():
    conn = psycopg2.connect(**PG_CONFIG)
    cur = conn.cursor()
    cur.execute("SELECT chunk_id FROM processed_chunks")
    chunks = {row[0] for row in cur.fetchall()}
    cur.close()
    conn.close()
    return chunks

def list_s3_chunk_folders():
    response = s3.list_objects_v2(Bucket='banktransactionskrnl1', Prefix='chunks/', Delimiter='/')
    folders = [prefix['Prefix'].split('/')[-2] for prefix in response.get('CommonPrefixes', [])]
    return folders

def get_new_chunks():
    processed = get_processed_chunks()
    available = list_s3_chunk_folders()
    new_chunks = [chunk for chunk in available if chunk not in processed]
    return new_chunks

print("✅ All functions defined")

✅ All functions defined


In [0]:
# CELL 8: Test your pipeline
try:
    new_chunks = get_new_chunks()
    if new_chunks:
        print("🟢 New chunks found:")
        for chunk in new_chunks:
            print(f" - {chunk}")
    else:
        print("🟡 No new chunks found to process")
except Exception as e:
    print(f"❌ Error checking chunks: {e}")

🟢 New chunks found:
 - chunk_0_20250605035607
 - chunk_10_20250605040201
 - chunk_11_20250605040236
 - chunk_12_20250605040310
 - chunk_13_20250605040344
 - chunk_14_20250605040419
 - chunk_15_20250605040453
 - chunk_16_20250605040528
 - chunk_17_20250605040602
 - chunk_18_20250605040636
 - chunk_19_20250605040710
 - chunk_1_20250605035643
 - chunk_20_20250605040744
 - chunk_21_20250605040819
 - chunk_22_20250605040853
 - chunk_23_20250605040927
 - chunk_24_20250605041001
 - chunk_25_20250605041035
 - chunk_26_20250605041109
 - chunk_27_20250605041144
 - chunk_28_20250605041219
 - chunk_29_20250605041253
 - chunk_2_20250605035718
 - chunk_30_20250605041328
 - chunk_31_20250605041401
 - chunk_32_20250605041435
 - chunk_33_20250605041510
 - chunk_34_20250605041544
 - chunk_35_20250605041618
 - chunk_36_20250605041653
 - chunk_37_20250605041727
 - chunk_38_20250605041802
 - chunk_39_20250605041836
 - chunk_3_20250605035754
 - chunk_40_20250605041910
 - chunk_41_20250605041944
 - chunk_42_

### Detecting Patterns

In [0]:
from pyspark.sql.functions import col, count, avg, lit, current_timestamp, percent_rank
from pyspark.sql.functions import regexp_replace
from pyspark.sql.window import Window
from datetime import datetime
import uuid
import psycopg2

def mark_chunk_processed(chunk_id):
    conn = psycopg2.connect(**PG_CONFIG)
    cur = conn.cursor()
    cur.execute("INSERT INTO processed_chunks (chunk_id, processed_at) VALUES (%s, %s) ON CONFLICT DO NOTHING", 
                (chunk_id, datetime.now()))
    conn.commit()
    cur.close()
    conn.close()

def process_chunk(chunk_id):
    s3_path = f"s3a://banktransactionskrnl1/chunks/{chunk_id}/"
    print(f"📦 Processing chunk: {chunk_id}")
    
    # Load chunk
    df_chunk = spark.read.option("header", True).csv(s3_path)
    df_chunk = df_chunk.withColumn("amount", col("amount").cast("float"))
    
    ### -------- Pattern 1: UPGRADE -------- ###
    trans_df = df_chunk.groupBy("merchant", "customer").agg(count("*").alias("txn_count"))
    
    enriched_df = trans_df.join(cdf, on=["merchant", "customer"], how="left") \
                          .withColumn("Weight", col("Weight").cast("float")) \
                          .withColumn("txn_count", col("txn_count").cast("int"))
    
    merchant_total_txns = df_chunk.groupBy("merchant").agg(count("*").alias("total_txns")) \
                                  .filter(col("total_txns") > 50000)

    percentile_joined = enriched_df.join(merchant_total_txns, on="merchant", how="inner")

    window_spec = Window.partitionBy("merchant").orderBy(col("txn_count").desc())
    percent_ranked = percentile_joined.withColumn("rank", percent_rank().over(window_spec))

    pattern1_output = percent_ranked.filter((col("rank") <= 0.01) & (col("Weight") <= 0.01)) \
        .withColumn("YStartTime", lit(datetime.now().isoformat())) \
        .withColumn("detectionTime", current_timestamp()) \
        .withColumn("patternId", lit("PatId1")) \
        .withColumn("ActionType", lit("UPGRADE")) \
        .withColumnRenamed("customer", "customerName") \
        .withColumnRenamed("merchant", "MerchantId") \
        .select("YStartTime", "detectionTime", "patternId", "ActionType", "customerName", "MerchantId")

    if pattern1_output.count() > 0:
        print(f"✅ {pattern1_output.count()} UPGRADE detections found in chunk {chunk_id}")
        for i, batch in enumerate(pattern1_output.randomSplit([1.0] * ((pattern1_output.count() // 50) + 1))):
            if batch.count() > 0:
                output_path = f"s3a://banktransactionskrnl1/detections/test_detection_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}.csv"
                batch.write.mode("overwrite").option("header", True).csv(output_path)
                print(f"💾 Saved UPGRADE batch {i+1} to {output_path}")
    else:
        print(f"⚪ No Pattern 1 detections for chunk {chunk_id}")
    
    ### -------- Pattern 2: CHILD -------- ###
    grouped_df = df_chunk.groupBy("customer", "merchant") \
                         .agg(count("*").alias("txn_count"), avg("amount").alias("avg_amt"))

    pattern2_output = grouped_df.filter((col("avg_amt") < 23) & (col("txn_count") >= 80)) \
        .withColumn("YStartTime", lit(datetime.now().isoformat())) \
        .withColumn("detectionTime", current_timestamp()) \
        .withColumn("patternId", lit("PatId2")) \
        .withColumn("ActionType", lit("CHILD")) \
        .withColumnRenamed("customer", "customerName") \
        .withColumnRenamed("merchant", "MerchantId") \
        .select("YStartTime", "detectionTime", "patternId", "ActionType", "customerName", "MerchantId")

    if pattern2_output.count() > 0:
        print(f"✅ {pattern2_output.count()} CHILD detections found in chunk {chunk_id}")
        for i, batch in enumerate(pattern2_output.randomSplit([1.0] * ((pattern2_output.count() // 50) + 1))):
            if batch.count() > 0:
                output_path = f"s3a://banktransactionskrnl1/detections/test_detection_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}.csv"
                batch.write.mode("overwrite").option("header", True).csv(output_path)
                print(f"💾 Saved CHILD batch {i+1} to {output_path}")
    else:
        print(f"⚪ No Pattern 2 detections for chunk {chunk_id}")
    

    ### -------- Pattern 3: DEI-NEEDED -------- ###  

    cleans_df = df_chunk.withColumn("gender_clean", regexp_replace(col("gender"), "'", ""))

    gender_df = cleans_df.select("merchant", "gender_clean") \
                        .filter(col("gender_clean").isin("F", "M"))

    # 🛠️ Use pivot with fixed values, fill missing with 0
    gender_counts = gender_df.groupBy("merchant") \
        .pivot("gender_clean", ["F", "M"]) \
        .count() \
        .na.fill(0)

    # Now F and M columns are guaranteed to exist
    pattern3_matches = gender_counts.filter((col("F") > 100) & (col("F") < col("M")))

    pattern3_output = pattern3_matches.withColumn("YStartTime", lit(datetime.now().isoformat())) \
        .withColumn("detectionTime", current_timestamp()) \
        .withColumn("patternId", lit("PatId3")) \
        .withColumn("ActionType", lit("DEI-NEEDED")) \
        .withColumnRenamed("merchant", "MerchantId") \
        .withColumn("customerName", lit("")) \
        .select("YStartTime", "detectionTime", "patternId", "ActionType", "customerName", "MerchantId")

    if pattern3_output.count() > 0:
        print(f"✅ {pattern3_output.count()} DEI-NEEDED detections found in chunk {chunk_id}")
        for i, batch in enumerate(pattern3_output.randomSplit([1.0] * ((pattern3_output.count() // 50) + 1))):
            if batch.count() > 0:
                output_path = f"s3a://banktransactionskrnl1/detections/test_detection_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}.csv"
                batch.write.mode("overwrite").option("header", True).csv(output_path)
                print(f"💾 Saved DEI-NEEDED batch {i+1} to {output_path}")
    else:
        print(f"⚪ No Pattern 3 detections for chunk {chunk_id}")
    
    # ✅ Mark chunk as processed
    mark_chunk_processed(chunk_id)
    print(f"✅ Marked chunk {chunk_id} as processed\n")


In [0]:
cdf = spark.read.option("header", True).csv("dbfs:/FileStore/tables/CustomerImportance.csv")
cdf = cdf.withColumnRenamed("Source", "customer") \
                                               .withColumnRenamed("Target", "merchant") \
                                               .withColumnRenamed("typeTrans", "category") \
                                               .withColumn("Weight", col("Weight").cast("float"))


In [0]:
grouped_df = df_chunk.groupBy("customer", "merchant") \
                         .agg(count("*").alias("txn_count"), avg("amount").alias("avg_amt"))
grouped_df.orderBy("txn_count", ascending=False).show()
grouped_df.filter(col("txn_count") >= 80).orderBy("avg_amt").show()



In [0]:
from pyspark.sql.functions import col, regexp_replace

# 🧼 Step 1: Clean gender values by removing apostrophes
cleans_df = df_chunk.withColumn("gender_clean", regexp_replace(col("gender"), "'", ""))

# 🔍 Step 2: Filter only valid gender values ("F", "M")
gender_df = cleans_df.select("merchant", "gender_clean") \
                     .filter(col("gender_clean").isin("F", "M"))

# ✅ Optional: View unique cleaned gender values to verify
gender_df.select("gender_clean").distinct().show()

# 📊 Step 3: Pivot table to count F and M by merchant
gender_counts = gender_df.groupBy("merchant") \
    .pivot("gender_clean", ["F", "M"]) \
    .count() \
    .na.fill(0)

# 👀 Step 4: Display top merchants with most female customers
gender_counts.orderBy("F", ascending=False).show()




In [0]:
df_chunk.select("gender").distinct().show()




#### Testing a sample chunk

In [0]:
process_chunk("chunk_0_20250604172038/")



In [0]:
import time
import boto3
import re
import psycopg2

# AWS S3 setup
s3 = boto3.client('s3')
bucket = 'banktransactionskrnl1'
prefix = 'chunks/'

# Function to detect unprocessed chunks
def detect_unprocessed_chunks():
    result = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    all_chunks = set()
    for obj in result.get('Contents', []):
        match = re.match(r'chunks/(.+?)/', obj['Key'])
        if match:
            all_chunks.add(match.group(1))
    
    # Fetch already-processed chunks from PostgreSQL
    conn = psycopg2.connect(**PG_CONFIG)
    cur = conn.cursor()
    cur.execute("SELECT chunk_id FROM processed_chunks")
    processed = {row[0] for row in cur.fetchall()}
    cur.close()
    conn.close()

    return sorted(all_chunks - processed)

# 🔁 Polling loop with auto-exit after 5 minutes
MAX_DURATION_SECONDS = 5 * 60  
start_time = time.time()
idle_counter = 0

print("🔄 Starting Mechanism Y polling (5-minute session)...")

while time.time() - start_time < MAX_DURATION_SECONDS:
    unprocessed_chunks = detect_unprocessed_chunks()

    if unprocessed_chunks:
        if idle_counter > 0:
            print(f"\n🟢 New chunks found after {idle_counter} idle checks!")
            idle_counter = 0
        for chunk_id in unprocessed_chunks:
            process_chunk(chunk_id)
    else:
        if idle_counter == 0:
            print("⏳ Waiting for new chunks", end="", flush=True)
        else:
            print(".", end="", flush=True)
        idle_counter += 1

    time.sleep(1)

print("\n✅ 5 minutes passed. Stopping Mechanism Y polling.")



🔄 Starting Mechanism Y polling (5-minute session)...
📦 Processing chunk: chunk_0_20250605035607
⚪ No Pattern 1 detections for chunk chunk_0_20250605035607
⚪ No Pattern 2 detections for chunk chunk_0_20250605035607
⚪ No Pattern 3 detections for chunk chunk_0_20250605035607
✅ Marked chunk chunk_0_20250605035607 as processed

📦 Processing chunk: chunk_10_20250605040201
⚪ No Pattern 1 detections for chunk chunk_10_20250605040201
⚪ No Pattern 2 detections for chunk chunk_10_20250605040201
⚪ No Pattern 3 detections for chunk chunk_10_20250605040201
✅ Marked chunk chunk_10_20250605040201 as processed

📦 Processing chunk: chunk_11_20250605040236
⚪ No Pattern 1 detections for chunk chunk_11_20250605040236
⚪ No Pattern 2 detections for chunk chunk_11_20250605040236
⚪ No Pattern 3 detections for chunk chunk_11_20250605040236
✅ Marked chunk chunk_11_20250605040236 as processed

📦 Processing chunk: chunk_12_20250605040310
⚪ No Pattern 1 detections for chunk chunk_12_20250605040310
⚪ No Pattern 2 de

#### Logs in Postgres within Databricks itself

In [0]:
rds_url = "jdbc:postgresql://bankdb-1.cvw2g6mimvrd.ap-south-1.rds.amazonaws.com:5432/bankdata"
rds_properties = {
    "user": "postgres",
    "password": "Bappa143",
    "driver": "org.postgresql.Driver"
}

# Read from RDS table
processed_df = spark.read.jdbc(
    url=rds_url,
    table="processed_chunks",
    properties=rds_properties
)

# Show latest 10 entries
processed_df.orderBy("processed_at", ascending=False).show(10, truncate=False)


+-----------------------+--------------------------+
|chunk_id               |processed_at              |
+-----------------------+--------------------------+
|chunk_9_20250605040127 |2025-06-05 05:01:33.271735|
|chunk_8_20250605040051 |2025-06-05 05:01:16.478478|
|chunk_7_20250605040015 |2025-06-05 05:00:58.260752|
|chunk_6_20250605035939 |2025-06-05 05:00:41.630211|
|chunk_5_20250605035904 |2025-06-05 05:00:20.175719|
|chunk_59_20250605043002|2025-06-05 04:59:53.54403 |
|chunk_58_20250605042925|2025-06-05 04:59:25.364176|
|chunk_57_20250605042851|2025-06-05 04:59:08.071549|
|chunk_56_20250605042817|2025-06-05 04:58:50.877979|
|chunk_55_20250605042744|2025-06-05 04:58:27.70714 |
+-----------------------+--------------------------+
only showing top 10 rows

