In [0]:
import os

# Publish the AWS credentials through the process environment so that the
# later cells (boto3 session, S3A Hadoop settings) can read them back.
# NOTE(review): the values below are redacted placeholders; real keys should
# come from a secret store rather than be typed into the notebook.
os.environ.update({
    'AWS_ACCESS_KEY_ID': '***************',
    'AWS_SECRET_ACCESS_KEY': '**************',
})

In [0]:
import boto3

# Explicit boto3 session pinned to ap-south-1, using the key pair that was
# exported into the environment.
session = boto3.Session(
    region_name='ap-south-1',
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
)
print('successfully aws session created')

In [0]:
# Credentials and S3 locations used by the Spark S3A connector.
aws_access_key = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
bucket_name = "pattern-detection-pyspark"
s3_folder = "streaming/input/"

# Push the S3A settings into the Hadoop configuration of the running Spark
# context. `spark` is not created in this cell; it must already exist.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", aws_access_key)
hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)
hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Column layout of the transaction CSV files. Every field is nullable
# (the StructField default), matching the builder-style definition.
transaction_schema = StructType([
    StructField("step", IntegerType()),
    StructField("customer", StringType()),
    StructField("age", IntegerType()),
    StructField("gender", StringType()),
    StructField("zipcodeOri", StringType()),
    StructField("merchant", StringType()),
    StructField("zipMerchant", StringType()),
    StructField("category", StringType()),
    StructField("amount", DoubleType()),
    StructField("fraud", IntegerType()),
    StructField("weight", DoubleType()),
])

In [0]:
import os
import time

import boto3
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, avg, count, current_timestamp, row_number
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.window import Window

# ------------------ Spark Initialization ------------------
spark = SparkSession.builder.appName("Pattern-Detection-Pipeline").getOrCreate()

# ------------------ Configuration ------------------
bucket_name = "pattern-detection-pyspark"
input_prefix = "streaming/input/"
output_prefix = "streaming/output/"
# Local wall-clock time captured once when this cell runs; it is written into
# the YStartTime column of every detection row.
y_start_time = time.strftime("%Y-%m-%d %H:%M:%S")

s3_client = boto3.client("s3")

# PostgreSQL connection settings are read from the environment (using the
# standard libpq variable names). Host, database and user keep their previous
# values as defaults; the password has no default so that no secret lives in
# the notebook source.
pg_password = os.environ.get("PGPASSWORD")
if not pg_password:
    raise RuntimeError(
        "PGPASSWORD is not set: export the PostgreSQL password in the "
        "environment before running this cell."
    )

pg_conn = psycopg2.connect(
    host=os.environ.get("PGHOST", "database-1.cl80waky6nkw.ap-south-1.rds.amazonaws.com"),
    database=os.environ.get("PGDATABASE", "pattern_detection_db"),
    user=os.environ.get("PGUSER", "postgres"),
    password=pg_password,
)
# A single cursor on this connection is shared by every helper below.
pg_cursor = pg_conn.cursor()

# ------------------ Transaction Schema ------------------
# Explicit schema applied when reading each chunk's CSV files; all eleven
# columns are declared nullable.
transaction_schema = (
    StructType()
    .add("step", IntegerType(), True)
    .add("customer", StringType(), True)
    .add("age", IntegerType(), True)
    .add("gender", StringType(), True)
    .add("zipcodeOri", StringType(), True)
    .add("merchant", StringType(), True)
    .add("zipMerchant", StringType(), True)
    .add("category", StringType(), True)
    .add("amount", DoubleType(), True)
    .add("fraud", IntegerType(), True)
    .add("weight", DoubleType(), True)
)


# Bookkeeping and per-pattern statistics tables. Each statement uses
# CREATE TABLE IF NOT EXISTS, so re-running this cell leaves existing tables
# untouched. Statements run in the order listed, then are committed together.
_TABLE_DDL = (
    # One row per chunk folder that has been fully handled.
    """
CREATE TABLE IF NOT EXISTS processed_chunks (
    chunk_folder TEXT PRIMARY KEY
);
""",
    # Pattern 1: running transaction count per (customer, merchant).
    """
CREATE TABLE IF NOT EXISTS pattern1_customer_merchant_counts (
    customer_name TEXT,
    merchant_id TEXT,
    txn_count BIGINT,
    detected BOOLEAN DEFAULT FALSE,
    PRIMARY KEY (customer_name, merchant_id)
);
""",
    # Pattern 2: running count / total / average amount per (customer, merchant).
    """
CREATE TABLE IF NOT EXISTS pattern2_customer_merchant_stats (
    customer_name TEXT,
    merchant_id TEXT,
    txn_count BIGINT,
    total_amount DOUBLE PRECISION,
    avg_amount DOUBLE PRECISION,
    detected BOOLEAN DEFAULT FALSE,
    PRIMARY KEY (customer_name, merchant_id)
);
""",
    # Pattern 3: running male / female transaction counts per merchant.
    """
CREATE TABLE IF NOT EXISTS pattern3_merchant_gender_stats (
    merchant_id TEXT PRIMARY KEY,
    male_count BIGINT DEFAULT 0,
    female_count BIGINT DEFAULT 0,
    detected BOOLEAN DEFAULT FALSE
);
""",
)

for _ddl in _TABLE_DDL:
    pg_cursor.execute(_ddl)

pg_conn.commit()


def list_available_chunks():
    """Return the key prefix of every chunk folder directly under ``input_prefix``.

    The listing uses ``Delimiter="/"`` so S3 groups keys into "folders"
    (``CommonPrefixes``). A paginator is used because one ``list_objects_v2``
    call returns a single page of results only; without it, folders beyond
    the first page would never be processed.

    Returns:
        list[str]: folder prefixes in the order S3 returns them; empty when
        nothing exists under the input prefix.
    """
    print("Listing available chunks from S3...")
    paginator = s3_client.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket_name, Prefix=input_prefix, Delimiter="/")
    return [
        common_prefix["Prefix"]
        for page in pages
        # A page without sub-folders simply has no "CommonPrefixes" key.
        for common_prefix in page.get("CommonPrefixes", [])
    ]

def is_chunk_processed(folder_key):
    """Tell whether ``folder_key`` is already recorded in ``processed_chunks``.

    Returns:
        bool: True when a matching row exists, False otherwise.
    """
    lookup_sql = "SELECT 1 FROM processed_chunks WHERE chunk_folder = %s"
    pg_cursor.execute(lookup_sql, (folder_key,))
    existing = pg_cursor.fetchone()
    return existing is not None

def mark_chunk_processed(folder_key):
    """Record ``folder_key`` in ``processed_chunks`` and commit.

    The insert is a no-op when the folder is already recorded
    (``ON CONFLICT DO NOTHING``).
    """
    insert_sql = "INSERT INTO processed_chunks (chunk_folder) VALUES (%s) ON CONFLICT DO NOTHING"
    params = (folder_key,)
    pg_cursor.execute(insert_sql, params)
    pg_conn.commit()

# ------------------ Pattern 1 ------------------
def update_stats_pattern1(df):
    """Add this chunk's per-(customer, merchant) transaction counts to PostgreSQL.

    Args:
        df: Spark DataFrame of transactions with ``merchant`` and ``customer``
            columns.

    Counts are computed in Spark, streamed to the driver one row at a time,
    and upserted: an existing pair has the new count added to its running
    total. All upserts are committed together at the end.
    """
    print("Updating Pattern 1 stats...")
    upsert_sql = """
            INSERT INTO pattern1_customer_merchant_counts (customer_name, merchant_id, txn_count)
            VALUES (%s, %s, %s)
            ON CONFLICT (customer_name, merchant_id) DO UPDATE SET
                txn_count = pattern1_customer_merchant_counts.txn_count + EXCLUDED.txn_count
        """
    pair_counts = df.groupBy("merchant", "customer").agg(count("*").alias("txn_count"))
    for pair in pair_counts.toLocalIterator():
        params = (pair["customer"], pair["merchant"], pair["txn_count"])
        pg_cursor.execute(upsert_sql, params)
    pg_conn.commit()

def detect_pattern1():
    """Build the Pattern 1 ("UPGRADE") detections from the stored counts.

    For every merchant whose summed transaction count exceeds 500, the
    not-yet-detected customers are ranked by transaction count (descending)
    and the leading tenth of that list (at least one customer) is reported.

    Returns:
        Spark DataFrame produced by ``format_detection_rows`` with pattern id
        ``PatId1``.
    """
    print("Detecting Pattern 1...")
    busy_merchants_sql = """
        SELECT merchant_id, SUM(txn_count) as total_txns
        FROM pattern1_customer_merchant_counts
        GROUP BY merchant_id
        HAVING SUM(txn_count) > 500
    """
    ranked_customers_sql = """
            SELECT customer_name, txn_count FROM pattern1_customer_merchant_counts
            WHERE merchant_id = %s AND detected = FALSE
            ORDER BY txn_count DESC
        """
    pg_cursor.execute(busy_merchants_sql)
    flagged_pairs = []
    # fetchall() runs once, before the cursor is reused inside the loop.
    for merchant_id, _total in pg_cursor.fetchall():
        pg_cursor.execute(ranked_customers_sql, (merchant_id,))
        ranked = pg_cursor.fetchall()
        if ranked:
            # Integer tenth of the ranked list, but never fewer than one row.
            cutoff = max(1, len(ranked) // 10)
            flagged_pairs.extend((entry[0], merchant_id) for entry in ranked[:cutoff])
    return format_detection_rows(flagged_pairs, "PatId1", "UPGRADE")

# ------------------ Pattern 2 ------------------
def update_stats_pattern2(df):
    """Add this chunk's per-(customer, merchant) amount statistics to PostgreSQL.

    Args:
        df: Spark DataFrame of transactions with ``merchant``, ``customer``
            and ``amount`` columns.

    Rows whose ``amount`` is null are excluded before aggregating. ``avg``
    ignores nulls while ``count("*")`` does not, so without the filter the
    derived total (``avg * count``) would be overstated, and a group made up
    only of null amounts would yield a null average and fail on the
    multiplication. After filtering, every group has a non-null average and
    ``avg * count`` equals the sum of its amounts.

    Existing pairs have the new count and total added to their running
    values, and the stored average is recomputed from the combined figures.
    All upserts are committed together at the end.
    """
    print("Updating Pattern 2 stats...")
    priced_df = df.filter(col("amount").isNotNull())
    stats_df = priced_df.groupBy("merchant", "customer").agg(
        count("*").alias("txn_count"),
        avg("amount").alias("avg_amount")
    )
    upsert_sql = """
            INSERT INTO pattern2_customer_merchant_stats (customer_name, merchant_id, txn_count, total_amount, avg_amount)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (customer_name, merchant_id) DO UPDATE SET
                txn_count = pattern2_customer_merchant_stats.txn_count + EXCLUDED.txn_count,
                total_amount = pattern2_customer_merchant_stats.total_amount + EXCLUDED.total_amount,
                avg_amount = (pattern2_customer_merchant_stats.total_amount + EXCLUDED.total_amount) /
                             (pattern2_customer_merchant_stats.txn_count + EXCLUDED.txn_count)
        """
    for row in stats_df.toLocalIterator():
        # Reconstruct the chunk-level total from the average and the count.
        total_amt = row["avg_amount"] * row["txn_count"]
        pg_cursor.execute(
            upsert_sql,
            (row["customer"], row["merchant"], row["txn_count"], total_amt, row["avg_amount"]),
        )
    pg_conn.commit()

def detect_pattern2():
    """Build the Pattern 2 ("CHILD") detections from the stored statistics.

    Selects not-yet-detected (customer, merchant) pairs with more than one
    transaction and an average amount (total / count) below 800.

    Returns:
        Spark DataFrame produced by ``format_detection_rows`` with pattern id
        ``PatId2``.
    """
    print("Detecting Pattern 2...")
    candidates_sql = """
        SELECT customer_name, merchant_id
        FROM pattern2_customer_merchant_stats
        WHERE txn_count > 1 AND (total_amount / txn_count) < 800 AND detected = FALSE
    """
    pg_cursor.execute(candidates_sql)
    matching_pairs = pg_cursor.fetchall()
    return format_detection_rows(matching_pairs, "PatId2", "CHILD")

# ------------------ Pattern 3 ------------------
def update_stats_pattern3(df):
    """Add this chunk's per-merchant gender counts to PostgreSQL.

    Args:
        df: Spark DataFrame of transactions with ``merchant`` and ``gender``
            columns.

    Only gender values ``"F"`` and ``"M"`` are stored; any other value
    (including null) is skipped. Existing merchants have the new count added
    to the matching column. All upserts are committed together at the end.
    """
    print("Updating Pattern 3 stats...")
    # One upsert statement per recognised gender value.
    upsert_by_gender = {
        "F": """
                INSERT INTO pattern3_merchant_gender_stats (merchant_id, female_count)
                VALUES (%s, %s)
                ON CONFLICT (merchant_id) DO UPDATE
                SET female_count = pattern3_merchant_gender_stats.female_count + EXCLUDED.female_count
            """,
        "M": """
                INSERT INTO pattern3_merchant_gender_stats (merchant_id, male_count)
                VALUES (%s, %s)
                ON CONFLICT (merchant_id) DO UPDATE
                SET male_count = pattern3_merchant_gender_stats.male_count + EXCLUDED.male_count
            """,
    }
    for tally in df.groupBy("merchant", "gender").count().toLocalIterator():
        upsert_sql = upsert_by_gender.get(tally["gender"])
        if upsert_sql is None:
            continue
        pg_cursor.execute(upsert_sql, (tally["merchant"], tally["count"]))
    pg_conn.commit()

def detect_pattern3():
    """Build the Pattern 3 ("DEI-NEEDED") detections from the gender counts.

    Selects not-yet-detected merchants with more than 10 female transactions
    and fewer female than male transactions. The pattern is merchant-level,
    so the customer slot of every detection is left as ``None``.

    Returns:
        Spark DataFrame produced by ``format_detection_rows`` with pattern id
        ``PatId3``.
    """
    print("Detecting Pattern 3...")
    merchants_sql = """
        SELECT merchant_id FROM pattern3_merchant_gender_stats
        WHERE female_count > 10 AND female_count < male_count AND detected = FALSE
    """
    pg_cursor.execute(merchants_sql)
    merchant_only_pairs = []
    for record in pg_cursor.fetchall():
        merchant_only_pairs.append((None, record[0]))
    return format_detection_rows(merchant_only_pairs, "PatId3", "DEI-NEEDED")


def format_detection_rows(rows, pattern_id, action_type):
    """Turn (customer, merchant) pairs into a detections DataFrame.

    Args:
        rows: sequence of indexable pairs; item 0 is the customer name and
            item 1 the merchant id. Falsy values (e.g. ``None``) become "".
        pattern_id: value for the ``patternId`` column.
        action_type: value for the ``actionType`` column.

    Returns:
        Spark DataFrame with columns YStartTime, detectionTime, patternId,
        actionType, customerName, merchantId. ``detectionTime`` is filled
        with ``current_timestamp()``; an empty DataFrame with the same schema
        is returned when ``rows`` is empty.
    """
    column_types = (
        ("YStartTime", StringType()),
        ("detectionTime", TimestampType()),
        ("patternId", StringType()),
        ("actionType", StringType()),
        ("customerName", StringType()),
        ("merchantId", StringType()),
    )
    detection_schema = StructType(
        [StructField(name, dtype, True) for name, dtype in column_types]
    )

    if rows:
        records = []
        for pair in rows:
            customer_name = pair[0] or ""
            merchant_id = pair[1] or ""
            # detectionTime is a placeholder here; it is overwritten below.
            records.append((y_start_time, None, pattern_id, action_type, customer_name, merchant_id))
        detections_df = spark.createDataFrame(records, schema=detection_schema)
        return detections_df.withColumn("detectionTime", current_timestamp())

    print(f">>> No rows to format for {pattern_id}")
    return spark.createDataFrame([], schema=detection_schema)


def process_chunk(folder_key, batch_size=50):
    """Ingest one chunk folder, update pattern statistics and emit detections.

    Steps:
      1. Skip the folder if it is already recorded as processed.
      2. Read the folder's CSV files with ``transaction_schema``.
      3. Update the three pattern statistics tables.
      4. Run the three detectors and drop rows with empty identifiers.
      5. Write the detections to S3 in batches of ``batch_size`` rows and set
         the ``detected`` flag for every row written.
      6. Record the folder as processed.

    Args:
        folder_key: S3 key prefix of the chunk folder (relative to the bucket).
        batch_size: maximum number of detection rows per output file
            (default 50). The same value drives both the number of batches
            and the row slice of each batch, so no detection is left out.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.

    Any other error is reported with ``print`` and not re-raised, so the
    caller can continue with the next chunk. The open database transaction is
    rolled back in that case so the shared connection stays usable.

    NOTE(review): the ``update_stats_*`` helpers commit on their own, so a
    failure after them leaves the chunk unmarked while its statistics are
    already stored; a retry would add them again.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    print(f"\n Processing chunk: {folder_key} ---")

    if is_chunk_processed(folder_key):
        print(f"Already processed: {folder_key}")
        return

    try:
        df_chunk = (
            spark.read
            .option("header", True)
            .schema(transaction_schema)
            .csv(f"s3a://{bucket_name}/{folder_key}")
        )

        if df_chunk.rdd.isEmpty():
            print(f"Chunk {folder_key} is empty. Skipping.")
            mark_chunk_processed(folder_key)
            return

        print("Updating pattern statistics...")
        update_stats_pattern1(df_chunk)
        update_stats_pattern2(df_chunk)
        update_stats_pattern3(df_chunk)

        # Detect patterns
        detections1 = detect_pattern1().filter(col("customerName") != "").filter(col("merchantId") != "")
        detections2 = detect_pattern2().filter(col("customerName") != "").filter(col("merchantId") != "")
        detections3 = detect_pattern3().filter(col("merchantId") != "")  # PatId3 may not have customerName

        # For Debugging
        labelled = (
            ("Pattern 1", detections1),
            ("Pattern 2", detections2),
            ("Pattern 3", detections3),
        )
        for label, found in labelled:
            if not found.rdd.isEmpty():
                print(f">>> {label} detections (up to 5 rows):")
                found.show(5, truncate=False)
            else:
                print(f">>> {label}: No detections found.")

        detections = detections1.unionByName(detections2).unionByName(detections3)

        # Count once; the result drives both the empty check and the batching.
        total = detections.count()
        if total == 0:
            print(f"No patterns detected in {folder_key}")
            mark_chunk_processed(folder_key)
            return

        print(f"Total detections in {folder_key}: {total}")

        # Batch and write detections to S3.
        # patternId is part of the ordering so that row ids form a total order:
        # the same (customer, merchant) pair can be reported by two patterns,
        # and batch membership must not change between evaluations.
        detections = detections.withColumn(
            "row_id",
            row_number().over(Window.orderBy("customerName", "merchantId", "patternId")),
        )
        batch_count = (total + batch_size - 1) // batch_size

        for batch_num in range(batch_count):
            first_excluded = batch_num * batch_size
            last_included = (batch_num + 1) * batch_size
            batch_df = detections.filter(
                (col("row_id") > first_excluded) & (col("row_id") <= last_included)
            ).drop("row_id")

            timestamp = int(time.time())
            output_path = f"s3a://{bucket_name}/{output_prefix}pattern_detected_{timestamp}_{batch_num}.csv"
            print(f"Writing batch {batch_num + 1}/{batch_count} to {output_path}")
            batch_df.coalesce(1).write.mode("overwrite").option("header", True).csv(output_path)

            # --- Update detected flags in DB ---
            for row in batch_df.select("patternId", "customerName", "merchantId").toLocalIterator():
                if row["patternId"] == "PatId1":
                    pg_cursor.execute("""
                        UPDATE pattern1_customer_merchant_counts
                        SET detected = TRUE
                        WHERE customer_name = %s AND merchant_id = %s
                    """, (row["customerName"], row["merchantId"]))
                elif row["patternId"] == "PatId2":
                    pg_cursor.execute("""
                        UPDATE pattern2_customer_merchant_stats
                        SET detected = TRUE
                        WHERE customer_name = %s AND merchant_id = %s
                    """, (row["customerName"], row["merchantId"]))
                elif row["patternId"] == "PatId3":
                    pg_cursor.execute("""
                        UPDATE pattern3_merchant_gender_stats
                        SET detected = TRUE
                        WHERE merchant_id = %s
                    """, (row["merchantId"],))

            # Flags are committed per batch, right after its file is written.
            pg_conn.commit()

        mark_chunk_processed(folder_key)
        print(f"Finished processing chunk: {folder_key}\n")

    except Exception as e:
        print(f"Error processing chunk {folder_key}: {e}")
        # A failed statement leaves the PostgreSQL transaction aborted; without
        # a rollback every later query on this shared connection would fail.
        try:
            pg_conn.rollback()
        except Exception as rollback_error:
            print(f"Rollback failed after error in chunk {folder_key}: {rollback_error}")


In [0]:
def main():
    """Process every chunk folder currently listed in S3, in listing order."""
    for chunk_key in list_available_chunks():
        process_chunk(chunk_key)


main()