In [0]:
pip install sqlalchemy

Python interpreter will be restarted.
Collecting sqlalchemy
  Downloading sqlalchemy-2.0.41-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
Collecting typing-extensions>=4.6.0
  Downloading typing_extensions-4.14.0-py3-none-any.whl (43 kB)
Collecting greenlet>=1
  Downloading greenlet-3.2.2-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (579 kB)
Installing collected packages: typing-extensions, greenlet, sqlalchemy
  Attempting uninstall: typing-extensions
    Found existing installation: typing-extensions 4.1.1
    Not uninstalling typing-extensions at /databricks/python3/lib/python3.9/site-packages, outside environment /local_disk0/.ephemeral_nfs/envs/pythonEnv-afe2b8f3-2b1c-4d20-88bb-00701520766e
    Can't uninstall 'typing-extensions'. No files were found to uninstall.
Successfully installed greenlet-3.2.2 sqlalchemy-2.0.41 typing-extensions-4.14.0
Python interpreter will be restarted.


In [0]:
# transaction_pattern_detection.py
import io
import os
import time
import threading
import pandas as pd
import boto3
import psycopg2
import logging
from datetime import datetime
from sqlalchemy import create_engine
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window




In [0]:
from pyspark.sql.functions import col, percent_rank


#### The notebook run below (`aws_postgres_keys`) defines the AWS and PostgreSQL credentials (secrets) this pipeline needs

In [0]:
%run ./aws_postgres_keys

In [0]:
# CONFIG
from urllib.parse import quote_plus  # escapes DB credentials embedded in POSTGRES_URI

S3_BUCKET = 'my-databricks-bucket45'
S3_INPUT_PREFIX = 'input_chunks/'
S3_DETECTION_PREFIX = 'detections/'
TRANSACTIONS_S3_PATH = f's3a://{S3_BUCKET}/data/transactions.csv'
IMPORTANCE_S3_PATH = f's3a://{S3_BUCKET}/data/CustomerImportance.csv'
CHUNK_SIZE = 10000
CHUNK_INTERVAL = 1  # seconds

# AWS RDS PostgreSQL credentials (RDS_USER / RDS_PASSWORD / RDS_HOST come from
# the credentials notebook executed with %run above).
RDS_PORT = '5432'
RDS_DBNAME = 'spark'
# Percent-encode user and password: characters such as '@', ':' or '/' in a
# raw password would otherwise be parsed as URI delimiters.
POSTGRES_URI = (
    f'postgresql://{quote_plus(RDS_USER)}:{quote_plus(RDS_PASSWORD)}'
    f'@{RDS_HOST}:{RDS_PORT}/{RDS_DBNAME}'
)

# INIT
spark_builder = (
    SparkSession.builder
    .appName("TransactionPatternDetection")
    .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
    .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)
# Only pass a session token when one exists (same rule as the boto3 client
# below): the builder stringifies values, so None would be sent as "None".
if AWS_SESSION_TOKEN:
    spark_builder = spark_builder.config("spark.hadoop.fs.s3a.session.token", AWS_SESSION_TOKEN)
spark = spark_builder.getOrCreate()

s3 = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    aws_session_token=AWS_SESSION_TOKEN if AWS_SESSION_TOKEN else None
)
pg_engine = create_engine(POSTGRES_URI)




In [0]:
class ChunkUploader:
    """Replays ``transactions.csv`` from S3 back into S3 as fixed-size CSV chunks.

    The whole file is read from ``S3_BUCKET`` once, at construction time. Each
    call to :meth:`upload_chunk` uploads the next ``CHUNK_SIZE`` rows under
    ``S3_INPUT_PREFIX`` and then re-schedules itself on a ``threading.Timer``
    after ``CHUNK_INTERVAL`` seconds, until every row has been uploaded.
    """

    def __init__(self):
        self.offset = 0  # index of the first row not yet uploaded
        # One lock shared by every call on this instance. A Lock created inside
        # upload_chunk would be new on each call and exclude nothing.
        self._lock = threading.Lock()
        response = s3.get_object(Bucket=S3_BUCKET, Key="transactions.csv")
        self.df = pd.read_csv(io.BytesIO(response['Body'].read()))

    def upload_chunk(self):
        """Upload the next chunk to S3 and schedule the following call.

        Returns without scheduling another call once all rows are uploaded.
        """
        with self._lock:
            if self.offset >= len(self.df):
                return

            chunk = self.df.iloc[self.offset:self.offset + CHUNK_SIZE]
            # Microsecond resolution keeps keys unique even if CHUNK_INTERVAL is
            # set below one second (a seconds-only stamp would overwrite chunks).
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
            local_path = f"/tmp/transactions_chunk_{timestamp}.csv"
            s3_key = f"{S3_INPUT_PREFIX}transactions_chunk_{timestamp}.csv"
            chunk.to_csv(local_path, index=False)
            try:
                s3.upload_file(local_path, S3_BUCKET, s3_key)
            finally:
                os.remove(local_path)  # don't accumulate temp files in /tmp
            print(f"Uploaded chunk {self.offset} to S3: {s3_key}")

            self.offset += CHUNK_SIZE
        threading.Timer(CHUNK_INTERVAL, self.upload_chunk).start()


# MECHANISM Y

def detect_patterns(transactions_df):
    """Apply the PatId1/PatId2/PatId3 rules to one batch of transactions.

    Side effect: replaces the PostgreSQL table ``merchant_txn_counts`` with the
    per-merchant transaction counts of this batch.

    Args:
        transactions_df: Spark DataFrame with at least the columns ``customer``,
            ``merchant``, ``gender`` and ``amount``.

    Returns:
        List of ``(YStartTime, detectionTime, patternId, ActionType,
        customerName, MerchantId)`` tuples. Both timestamps are the time this
        call started. ``customerName`` is ``""`` for PatId3.
    """
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Cast amount once and run every rule on this typed frame.
    df = transactions_df.withColumn("amount", col("amount").cast("double"))

    # Store merchant tx count to PostgreSQL
    merchant_counts = df.groupBy("merchant").count()
    merchant_counts.toPandas().to_sql("merchant_txn_counts", pg_engine, if_exists='replace', index=False)

    # PatId1 (UPGRADE): only merchants with > 50000 transactions in this batch.
    # Per merchant, keep the customers in the top 1% by transaction count, then,
    # within that group, the bottom 1% by total_weight.
    # NOTE(review): total_weight is the summed transaction amount. The
    # CustomerImportance data loaded in process_stream is not used here; confirm
    # which "weight" is intended.
    active_merchants = (
        merchant_counts
        .withColumnRenamed("count", "merchant_txn_count")
        .filter(col("merchant_txn_count") > 50000)
    )
    customer_stats = (
        df.join(active_merchants, on="merchant", how="inner")
        .groupBy("merchant", "customer")
        .agg(count("*").alias("txn_count"), sum("amount").alias("total_weight"))
    )
    by_txn_count_desc = Window.partitionBy("merchant").orderBy(col("txn_count").desc())
    by_weight_asc = Window.partitionBy("merchant").orderBy(col("total_weight").asc())
    upgrade_rows = (
        customer_stats
        .withColumn("txn_rank", percent_rank().over(by_txn_count_desc))
        .filter(col("txn_rank") <= 0.01)
        .withColumn("weight_rank", percent_rank().over(by_weight_asc))
        .filter(col("weight_rank") <= 0.01)
        .select("customer", "merchant")
        .distinct()
        .collect()
    )
    detections = [
        (current_time, current_time, "PatId1", "UPGRADE", row['customer'], row['merchant'])
        for row in upgrade_rows
    ]

    # PatId2 (CHILD): customer/merchant pairs with average amount < 23 and at
    # least 80 transactions.
    child_rows = (
        df.groupBy("customer", "merchant")
        .agg(avg("amount").alias("avg_amt"), count("*").alias("txn_count"))
        .filter((col("avg_amt") < 23) & (col("txn_count") >= 80))
        .collect()
    )
    detections.extend(
        (current_time, current_time, "PatId2", "CHILD", row['customer'], row['merchant'])
        for row in child_rows
    )

    # PatId3 (DEI-NEEDED): merchants with fewer 'F' than 'M' transactions and
    # more than 100 'F' transactions. Listing the pivot values explicitly
    # guarantees that the F and M columns exist, even for a batch that lacks one
    # gender, and spares Spark the job that discovers distinct pivot values.
    # NOTE(review): this counts transactions (rows), not distinct customers;
    # confirm intent.
    gender_counts = (
        df.groupBy("merchant", "gender").count()
        .groupBy("merchant").pivot("gender", ["F", "M"]).sum("count")
        .na.fill(0)
    )
    dei_rows = (
        gender_counts
        .filter((col("F") < col("M")) & (col("F") > 100))
        .select("merchant")
        .collect()
    )
    detections.extend(
        (current_time, current_time, "PatId3", "DEI-NEEDED", "", row['merchant'])
        for row in dei_rows
    )

    return detections


def process_stream():
    """Consume transaction chunks from S3 and publish pattern detections.

    Loads ``CustomerImportance.csv`` once, then repeatedly lists
    ``S3_INPUT_PREFIX``. All chunk files not processed yet are combined into one
    Spark DataFrame and run through :func:`detect_patterns`. The detections are
    written to ``S3_DETECTION_PREFIX`` as CSV files of at most 50 rows each.
    Returns as soon as a listing contains no unprocessed files.

    Any error inside an iteration is logged and the iteration is retried after
    10 seconds. Files are marked processed only after all their detections
    were uploaded, so a failure part-way through re-processes (and re-uploads)
    the whole batch.
    """
    obj = s3.get_object(Bucket=S3_BUCKET, Key='CustomerImportance.csv')
    importance_pd_df = pd.read_csv(io.BytesIO(obj['Body'].read()))
    importance_df = spark.createDataFrame(importance_pd_df)

    # Clean quotes and rename columns
    # NOTE(review): importance_df is prepared but never passed to
    # detect_patterns; confirm whether it is still needed.
    for col_name in ['Source', 'Target', 'typeTrans']:
        importance_df = importance_df.withColumn(col_name, regexp_replace(col_name, "'", ""))
    importance_df = importance_df.withColumnRenamed("Source", "customer") \
                                 .withColumnRenamed("Target", "merchant") \
                                 .withColumnRenamed("typeTrans", "category")

    processed_files = set()
    paginator = s3.get_paginator('list_objects_v2')

    while True:
        try:
            # Paginate: one list_objects_v2 call returns at most 1000 keys.
            new_files = [
                o['Key']
                for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=S3_INPUT_PREFIX)
                for o in page.get('Contents', [])
                if not o['Key'].endswith('/')  # skip "folder" placeholder keys
            ]
            unprocessed_files = [key for key in new_files if key not in processed_files]

            # NOTE: this exits as soon as it has caught up. If a ChunkUploader is
            # still uploading on its timer thread, later chunks are not processed.
            if not unprocessed_files:
                print("All files processed. Exiting.")
                break

            # Combine all unprocessed files into one Spark DataFrame
            dfs = []
            for key in unprocessed_files:
                obj = s3.get_object(Bucket=S3_BUCKET, Key=key)
                df_pd = pd.read_csv(io.BytesIO(obj['Body'].read()))
                dfs.append(spark.createDataFrame(df_pd))
            combined_df = dfs[0]
            for df_part in dfs[1:]:
                combined_df = combined_df.unionByName(df_part)

            # Remove single quotes from the string columns
            for col_name in ['customer', 'age', 'gender','zipcodeOri','merchant','zipMerchant','category']:
                combined_df = combined_df.withColumn(col_name, regexp_replace(col_name, "'", ""))

            detections = detect_patterns(combined_df)

            # Publish detections in files of at most 50 rows each
            for i in range(0, len(detections), 50):
                batch = detections[i:i + 50]
                out_df = pd.DataFrame(batch, columns=["YStartTime", "detectionTime", "patternId", "ActionType", "customerName", "MerchantId"])
                timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
                local_file = f"/tmp/detections_{timestamp}.csv"
                out_df.to_csv(local_file, index=False)
                try:
                    s3.upload_file(local_file, S3_BUCKET, f"{S3_DETECTION_PREFIX}detections_{timestamp}.csv")
                finally:
                    os.remove(local_file)  # don't accumulate temp files in /tmp
                print(f"Uploaded detections_{timestamp}.csv")

            # Mark all files as processed
            processed_files.update(unprocessed_files)
            print(f"Processed {len(unprocessed_files)} files together.")

        except Exception:
            logging.exception("Error in stream processing")
            time.sleep(10)

# MAIN
# In a notebook kernel __name__ is '__main__', so this always runs.
# upload_chunk() uploads the first chunk synchronously and then re-schedules
# itself on a threading.Timer, so the remaining chunks upload in the background
# while process_stream() runs in the foreground.
# NOTE(review): process_stream() returns as soon as it finds no unprocessed
# files, which may happen before the background uploader has finished.
if __name__ == '__main__':
    uploader = ChunkUploader()
    uploader.upload_chunk()
    process_stream()

Uploaded chunk 0 to S3: input_chunks/transactions_chunk_20250605165232.csv
Uploaded chunk 10000 to S3: input_chunks/transactions_chunk_20250605165234.csv
Uploaded chunk 20000 to S3: input_chunks/transactions_chunk_20250605165235.csv
Uploaded chunk 30000 to S3: input_chunks/transactions_chunk_20250605165237.csv
Uploaded chunk 40000 to S3: input_chunks/transactions_chunk_20250605165238.csv
Uploaded chunk 50000 to S3: input_chunks/transactions_chunk_20250605165240.csv
Uploaded chunk 60000 to S3: input_chunks/transactions_chunk_20250605165241.csv
Uploaded chunk 70000 to S3: input_chunks/transactions_chunk_20250605165243.csv
Uploaded chunk 80000 to S3: input_chunks/transactions_chunk_20250605165244.csv
Uploaded chunk 90000 to S3: input_chunks/transactions_chunk_20250605165246.csv
Uploaded chunk 100000 to S3: input_chunks/transactions_chunk_20250605165247.csv
Uploaded chunk 110000 to S3: input_chunks/transactions_chunk_20250605165249.csv
Uploaded chunk 120000 to S3: input_chunks/transactions

In [0]:
class ChunkUploader:
    """Replays ``transactions.csv`` from S3 back into S3 as fixed-size CSV chunks.

    The whole file is read from ``S3_BUCKET`` once, at construction time. Each
    call to :meth:`upload_chunk` uploads the next ``CHUNK_SIZE`` rows under
    ``S3_INPUT_PREFIX`` and then re-schedules itself on a ``threading.Timer``
    after ``CHUNK_INTERVAL`` seconds, until every row has been uploaded.
    """

    def __init__(self):
        self.offset = 0  # index of the first row not yet uploaded
        # One lock shared by every call on this instance. A Lock created inside
        # upload_chunk would be new on each call and exclude nothing.
        self._lock = threading.Lock()
        response = s3.get_object(Bucket=S3_BUCKET, Key="transactions.csv")
        self.df = pd.read_csv(io.BytesIO(response['Body'].read()))

    def upload_chunk(self):
        """Upload the next chunk to S3 and schedule the following call.

        Returns without scheduling another call once all rows are uploaded.
        """
        with self._lock:
            if self.offset >= len(self.df):
                return

            chunk = self.df.iloc[self.offset:self.offset + CHUNK_SIZE]
            # Microsecond resolution keeps keys unique even if CHUNK_INTERVAL is
            # set below one second (a seconds-only stamp would overwrite chunks).
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
            local_path = f"/tmp/transactions_chunk_{timestamp}.csv"
            s3_key = f"{S3_INPUT_PREFIX}transactions_chunk_{timestamp}.csv"
            chunk.to_csv(local_path, index=False)
            try:
                s3.upload_file(local_path, S3_BUCKET, s3_key)
            finally:
                os.remove(local_path)  # don't accumulate temp files in /tmp
            print(f"Uploaded chunk {self.offset} to S3: {s3_key}")

            self.offset += CHUNK_SIZE
        threading.Timer(CHUNK_INTERVAL, self.upload_chunk).start()


# MECHANISM Y

def detect_patterns(transactions_df):
    """Apply the PatId1/PatId2/PatId3 rules to one batch of transactions.

    Side effect: replaces the PostgreSQL table ``merchant_txn_counts`` with the
    per-merchant transaction counts of this batch.

    Args:
        transactions_df: Spark DataFrame with at least the columns ``customer``,
            ``merchant``, ``gender`` and ``amount``.

    Returns:
        List of ``(YStartTime, detectionTime, patternId, ActionType,
        customerName, MerchantId)`` tuples. Both timestamps are the time this
        call started. ``customerName`` is ``""`` for PatId3.
    """
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Cast amount once and run every rule on this typed frame.
    df = transactions_df.withColumn("amount", col("amount").cast("double"))

    # Store merchant tx count to PostgreSQL
    merchant_counts = df.groupBy("merchant").count()
    merchant_counts.toPandas().to_sql("merchant_txn_counts", pg_engine, if_exists='replace', index=False)

    # PatId1 (UPGRADE): only merchants with > 50000 transactions in this batch.
    # Per merchant, keep the customers in the top 1% by transaction count, then,
    # within that group, the bottom 1% by total_weight.
    # NOTE(review): total_weight is the summed transaction amount. The
    # CustomerImportance data loaded in process_stream is not used here; confirm
    # which "weight" is intended.
    active_merchants = (
        merchant_counts
        .withColumnRenamed("count", "merchant_txn_count")
        .filter(col("merchant_txn_count") > 50000)
    )
    customer_stats = (
        df.join(active_merchants, on="merchant", how="inner")
        .groupBy("merchant", "customer")
        .agg(count("*").alias("txn_count"), sum("amount").alias("total_weight"))
    )
    by_txn_count_desc = Window.partitionBy("merchant").orderBy(col("txn_count").desc())
    by_weight_asc = Window.partitionBy("merchant").orderBy(col("total_weight").asc())
    upgrade_rows = (
        customer_stats
        .withColumn("txn_rank", percent_rank().over(by_txn_count_desc))
        .filter(col("txn_rank") <= 0.01)
        .withColumn("weight_rank", percent_rank().over(by_weight_asc))
        .filter(col("weight_rank") <= 0.01)
        .select("customer", "merchant")
        .distinct()
        .collect()
    )
    detections = [
        (current_time, current_time, "PatId1", "UPGRADE", row['customer'], row['merchant'])
        for row in upgrade_rows
    ]

    # PatId2 (CHILD): customer/merchant pairs with average amount < 23 and at
    # least 80 transactions.
    child_rows = (
        df.groupBy("customer", "merchant")
        .agg(avg("amount").alias("avg_amt"), count("*").alias("txn_count"))
        .filter((col("avg_amt") < 23) & (col("txn_count") >= 80))
        .collect()
    )
    detections.extend(
        (current_time, current_time, "PatId2", "CHILD", row['customer'], row['merchant'])
        for row in child_rows
    )

    # PatId3 (DEI-NEEDED): merchants with fewer 'F' than 'M' transactions and
    # more than 100 'F' transactions. Listing the pivot values explicitly
    # guarantees that the F and M columns exist, even for a batch that lacks one
    # gender, and spares Spark the job that discovers distinct pivot values.
    # NOTE(review): this counts transactions (rows), not distinct customers;
    # confirm intent.
    gender_counts = (
        df.groupBy("merchant", "gender").count()
        .groupBy("merchant").pivot("gender", ["F", "M"]).sum("count")
        .na.fill(0)
    )
    dei_rows = (
        gender_counts
        .filter((col("F") < col("M")) & (col("F") > 100))
        .select("merchant")
        .collect()
    )
    detections.extend(
        (current_time, current_time, "PatId3", "DEI-NEEDED", "", row['merchant'])
        for row in dei_rows
    )

    return detections


def process_stream(expected_files=60, poll_interval=1.0):
    """Wait for the transaction chunks in S3, then detect patterns on them in one batch.

    Loads ``CustomerImportance.csv`` once, then polls ``S3_INPUT_PREFIX`` until
    it holds at least ``expected_files`` chunk files. All chunk files not
    processed yet are combined into one Spark DataFrame and run through
    :func:`detect_patterns`. The detections are written to
    ``S3_DETECTION_PREFIX`` as CSV files of at most 50 rows each. Returns once a
    listing with enough files contains nothing unprocessed.

    Args:
        expected_files: minimum number of chunk files to wait for (default 60).
            Presumably the number of chunks ChunkUploader produces; confirm if
            CHUNK_SIZE or the input file changes.
            NOTE(review): leftover chunk files from earlier runs also count.
        poll_interval: seconds to sleep between S3 listings while waiting, so
            the loop does not hammer the S3 API.

    Any error inside an iteration is logged and the iteration is retried after
    10 seconds. Files are marked processed only after all their detections
    were uploaded.
    """
    obj = s3.get_object(Bucket=S3_BUCKET, Key='CustomerImportance.csv')
    importance_pd_df = pd.read_csv(io.BytesIO(obj['Body'].read()))
    importance_df = spark.createDataFrame(importance_pd_df)

    # Clean quotes and rename columns
    # NOTE(review): importance_df is prepared but never passed to
    # detect_patterns; confirm whether it is still needed.
    for col_name in ['Source', 'Target', 'typeTrans']:
        importance_df = importance_df.withColumn(col_name, regexp_replace(col_name, "'", ""))
    importance_df = importance_df.withColumnRenamed("Source", "customer") \
                                 .withColumnRenamed("Target", "merchant") \
                                 .withColumnRenamed("typeTrans", "category")

    processed_files = set()
    paginator = s3.get_paginator('list_objects_v2')

    while True:
        try:
            # Paginate: one list_objects_v2 call returns at most 1000 keys.
            new_files = [
                o['Key']
                for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=S3_INPUT_PREFIX)
                for o in page.get('Contents', [])
                if not o['Key'].endswith('/')  # skip "folder" placeholder keys
            ]

            # Not all chunks are in S3 yet: wait, then list again. ">=" (rather
            # than an exact count) so extra files cannot make this loop forever.
            if len(new_files) < expected_files:
                time.sleep(poll_interval)
                continue

            unprocessed_files = [key for key in new_files if key not in processed_files]

            if not unprocessed_files:
                print("All files processed. Exiting.")
                break

            # Combine all unprocessed files into one Spark DataFrame
            dfs = []
            for key in unprocessed_files:
                obj = s3.get_object(Bucket=S3_BUCKET, Key=key)
                df_pd = pd.read_csv(io.BytesIO(obj['Body'].read()))
                dfs.append(spark.createDataFrame(df_pd))
            combined_df = dfs[0]
            for df_part in dfs[1:]:
                combined_df = combined_df.unionByName(df_part)

            # Remove single quotes from the string columns
            for col_name in ['customer', 'age', 'gender','zipcodeOri','merchant','zipMerchant','category']:
                combined_df = combined_df.withColumn(col_name, regexp_replace(col_name, "'", ""))

            detections = detect_patterns(combined_df)

            # Publish detections in files of at most 50 rows each
            for i in range(0, len(detections), 50):
                batch = detections[i:i + 50]
                out_df = pd.DataFrame(batch, columns=["YStartTime", "detectionTime", "patternId", "ActionType", "customerName", "MerchantId"])
                timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
                local_file = f"/tmp/detections_{timestamp}.csv"
                out_df.to_csv(local_file, index=False)
                try:
                    s3.upload_file(local_file, S3_BUCKET, f"{S3_DETECTION_PREFIX}detections_{timestamp}.csv")
                finally:
                    os.remove(local_file)  # don't accumulate temp files in /tmp
                print(f"Uploaded detections_{timestamp}.csv")

            # Mark all files as processed
            processed_files.update(unprocessed_files)
            print(f"Processed {len(unprocessed_files)} files together.")

        except Exception:
            logging.exception("Error in stream processing")
            time.sleep(10)

# MAIN
# In a notebook kernel __name__ is '__main__', so this always runs.
# The uploader lines are commented out, so no chunks are uploaded here.
# process_stream() works on whatever chunk files already exist under
# S3_INPUT_PREFIX (presumably those uploaded by the previous cell's run).
if __name__ == '__main__':
    # uploader = ChunkUploader()
    # uploader.upload_chunk()
    process_stream()

Uploaded detections_20250605170507240693.csv
Processed 60 files together.
All files processed. Exiting.


In [0]:
import zipfile

# S3 prefixes to archive. Reuse the pipeline's configured prefixes so the zip
# always covers the same locations the chunks and detections were written to.
folder1 = S3_INPUT_PREFIX
folder2 = S3_DETECTION_PREFIX
output_file = 'output.zip'
OUTPUT_S3_KEY = f'zipped_output/{output_file}'

# Use /tmp for temporary storage
LOCAL_TMP_DIR = '/tmp/s3_zip_temp'
ZIP_PATH = f'{LOCAL_TMP_DIR}/{output_file}'

# Create temp folder (no error if it already exists from an earlier run)
os.makedirs(LOCAL_TMP_DIR, exist_ok=True)


def download_folder(prefix):
    """Download every object under ``prefix`` in ``S3_BUCKET`` into ``LOCAL_TMP_DIR``.

    Files are stored flat under their base name (the key's directory part is
    dropped), so two keys with the same base name overwrite each other locally.

    Args:
        prefix: S3 key prefix to download, e.g. ``'detections/'``.

    Returns:
        Local paths of the downloaded files, in listing order. The list is empty
        when nothing matched, in which case a message is printed.
    """
    # Paginate: one list_objects_v2 call returns at most 1000 keys, which the
    # accumulating detections/ prefix can exceed.
    paginator = s3.get_paginator('list_objects_v2')
    keys = [
        obj['Key']
        for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=prefix)
        for obj in page.get('Contents', [])
    ]
    if not keys:
        print(f"No objects found in {prefix}")
        return []

    downloaded_files = []
    for key in keys:
        if key.endswith('/'):  # Skip folder "keys"
            continue

        local_file_path = os.path.join(LOCAL_TMP_DIR, key.split('/')[-1])
        s3.download_file(S3_BUCKET, key, local_file_path)
        downloaded_files.append(local_file_path)

    return downloaded_files

# Download both folders
files1 = download_folder(folder1)
files2 = download_folder(folder2)

# Combine and zip. ZIP_DEFLATED actually compresses the CSVs; ZipFile's
# default (ZIP_STORED) would only archive them uncompressed.
with zipfile.ZipFile(ZIP_PATH, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
    for file_path in files1 + files2:
        arcname = os.path.basename(file_path)  # Store just filename
        zipf.write(file_path, arcname)

print(f"Created ZIP: {ZIP_PATH}")

# Upload zip back to S3
s3.upload_file(ZIP_PATH, S3_BUCKET, OUTPUT_S3_KEY)
print(f"Uploaded ZIP to s3://{S3_BUCKET}/{OUTPUT_S3_KEY}")

Created ZIP: /tmp/s3_zip_temp/output.zip
Uploaded ZIP to s3://my-databricks-bucket45/zipped_output/output.zip
