# 💳 Financial Transaction Monitoring System – PySpark Edition

This PySpark notebook simulates 50,000 financial transactions and applies rule-based fraud detection: a geo-anomaly check (a location change within 10 minutes of the account's previous transaction), an amount threshold, a location rule, and an additive risk score from 0 to 100. It is built on Apache Spark so the same logic can run on larger, distributed datasets.


In [11]:
import os
import sys

# Set python executable path
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [12]:
import pyspark

# Initialize Spark

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lag, unix_timestamp, lit
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("FraudDetection").getOrCreate()

In [14]:
spark

# Generate Synthetic Data

In [15]:
import random
from datetime import datetime, timedelta
import pandas as pd

cities = ['Delhi', 'Mumbai', 'New York', 'Tokyo', 'Berlin']
merchants = ['Amazon', 'Walmart', 'Netflix', 'Uber', 'Zomato']

def generate_transaction():
    """Return one synthetic transaction as a dict."""
    return {
        # Random 6-digit suffix; IDs are not guaranteed to be unique
        'transaction_id': f'TXN{random.randint(100000, 999999)}',
        # Random time within the last 30 days (43,200 minutes)
        'timestamp': datetime.now() - timedelta(minutes=random.randint(0, 43200)),
        'account_id': f'ACC{random.randint(1000, 9999)}',
        'merchant': random.choice(merchants),
        'amount': round(random.uniform(10, 15000), 2),
        'location': random.choice(cities)
    }

df_pd = pd.DataFrame([generate_transaction() for _ in range(50000)])
df_spark = spark.createDataFrame(df_pd)
df_spark = df_spark.withColumn("timestamp", col("timestamp").cast("timestamp"))
df_spark.show(5)

+--------------+--------------------+----------+--------+--------+--------+
|transaction_id|           timestamp|account_id|merchant|  amount|location|
+--------------+--------------------+----------+--------+--------+--------+
|     TXN356652|2025-04-06 18:31:...|   ACC4277|  Amazon|10132.22|  Mumbai|
|     TXN786016|2025-04-25 04:55:...|   ACC9218|  Amazon|13814.53|  Berlin|
|     TXN614384|2025-04-27 10:31:...|   ACC2390| Netflix|   350.9|  Berlin|
|     TXN704476|2025-04-29 05:02:...|   ACC3600|  Zomato|11244.37|   Tokyo|
|     TXN189813|2025-04-23 11:36:...|   ACC5608| Netflix| 2410.97|  Berlin|
+--------------+--------------------+----------+--------+--------+--------+
only showing top 5 rows
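
Because `generate_transaction` draws from the global `random` state and from `datetime.now()`, every run produces rows different from the ones shown above. A minimal sketch of a repeatable variant follows, assuming a fixed seed and a fixed reference time are acceptable. The names `make_transactions`, `seed`, and `now`, and the reference date itself, are hypothetical and not part of the notebook.

```python
import random
from datetime import datetime, timedelta

cities = ['Delhi', 'Mumbai', 'New York', 'Tokyo', 'Berlin']
merchants = ['Amazon', 'Walmart', 'Netflix', 'Uber', 'Zomato']

def make_transactions(n, seed=42, now=datetime(2025, 5, 1)):
    """Hypothetical helper: same fields as generate_transaction, but repeatable."""
    rng = random.Random(seed)  # private generator, independent of global state
    rows = []
    for i in range(n):
        rows.append({
            'transaction_id': f'TXN{i:06d}',  # sequential, so IDs are unique
            'timestamp': now - timedelta(minutes=rng.randint(0, 43200)),
            'account_id': f'ACC{rng.randint(1000, 9999)}',
            'merchant': rng.choice(merchants),
            'amount': round(rng.uniform(10, 15000), 2),
            'location': rng.choice(cities),
        })
    return rows

sample = make_transactions(1000)
```

The resulting list of dicts can be passed to `pd.DataFrame(...)` in the same way as the list comprehension in the cell above.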



# Add Lag Columns for Previous Location + Time

In [16]:
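# Order each account's transactions by time so lag() returns the previous one.
# prev_location, prev_time and time_diff_min are null for an account's first transaction.
window = Window.partitionBy("account_id").orderBy("timestamp")

df_spark = df_spark \
    .withColumn("prev_location", lag("location").over(window)) \
    .withColumn("prev_time", lag("timestamp").over(window)) \
    .withColumn("time_diff_min", 
        (unix_timestamp("timestamp") - unix_timestamp("prev_time")) / 60
    )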
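
To make the window logic concrete, here is a plain-Python sketch of what `lag` computes over a window partitioned by `account_id` and ordered by `timestamp`. It illustrates the semantics on three made-up rows and is not how Spark executes the query; `add_lag_columns` is a hypothetical name.

```python
from collections import defaultdict
from datetime import datetime

def add_lag_columns(rows):
    """Mimic lag(...) over (partition by account_id order by timestamp)."""
    by_account = defaultdict(list)
    for row in rows:
        by_account[row["account_id"]].append(row)

    out = []
    for account_rows in by_account.values():
        account_rows.sort(key=lambda r: r["timestamp"])
        prev = None  # the first row of each account has no predecessor
        for row in account_rows:
            enriched = dict(row)
            enriched["prev_location"] = prev["location"] if prev else None
            enriched["prev_time"] = prev["timestamp"] if prev else None
            enriched["time_diff_min"] = (
                (row["timestamp"] - prev["timestamp"]).total_seconds() / 60
                if prev else None
            )
            out.append(enriched)
            prev = row
    return out

rows = [
    {"account_id": "ACC1", "timestamp": datetime(2025, 4, 1, 12, 6), "location": "Tokyo"},
    {"account_id": "ACC1", "timestamp": datetime(2025, 4, 1, 12, 0), "location": "Delhi"},
    {"account_id": "ACC2", "timestamp": datetime(2025, 4, 1, 9, 0), "location": "Berlin"},
]
result = add_lag_columns(rows)
```

In this sample, the Tokyo row for `ACC1` gets `prev_location` of `Delhi` and a `time_diff_min` of 6.0, while the first row of each account gets `None` in all three new fields.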

# Define Fraud Rules + Risk Score

In [17]:
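# geo_anomaly: the location differs from the account's previous transaction,
# which happened less than 10 minutes earlier. Rows without a previous
# transaction have null lag values and fall through to False.
# risk_score: +30 for amount > 5000, +30 for a location outside Delhi/Mumbai,
# +40 for a geo-anomaly.
df_spark = df_spark \
    .withColumn("geo_anomaly", 
        when(
            (col("location") != col("prev_location")) & (col("time_diff_min") < 10), lit(True)
        ).otherwise(lit(False))
    ) \
    .withColumn("risk_score", 
        when(col("amount") > 5000, 30).otherwise(0) +
        when(~col("location").isin("Delhi", "Mumbai"), 30).otherwise(0) +
        when(col("geo_anomaly"), 40).otherwise(0)
    )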
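
The score is a sum of three fixed weights, so only a handful of values can occur. A plain-Python mirror of the rule makes that explicit. This is a sketch for reasoning about thresholds and for unit tests; `risk_score` here is a hypothetical function, not a Spark API.

```python
from itertools import product

def risk_score(amount, location, geo_anomaly):
    """Plain-Python mirror of the three additive rules."""
    score = 0
    if amount > 5000:
        score += 30
    if location not in ("Delhi", "Mumbai"):
        score += 30
    if geo_anomaly:
        score += 40
    return score

# Enumerate every combination of rule outcomes to list the reachable scores.
possible = sorted({
    risk_score(amount, location, anomaly)
    for amount, location, anomaly in product(
        (100.0, 9000.0), ("Delhi", "Tokyo"), (False, True)
    )
})
print(possible)  # [0, 30, 40, 60, 70, 100]
```

With these weights, a filter of `risk_score > 70` is satisfied only by a score of 100, that is, when all three rules fire on the same transaction.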

# Show High-Risk Transactions

In [None]:
high_risk = df_spark.filter(col("risk_score") > 70).cache()
#print(f"🚨 High-Risk Transaction Count: {high_risk.count()}")
high_risk.select("transaction_id", "account_id", "location", "amount", "risk_score").show(5)

# Save to CSV (Simulate Redshift/Snowflake Ingestion)

In [None]:
high_risk.coalesce(1).write.csv("high_risk_transactions_output", header=True, mode="overwrite")
print("✅ High-risk transactions saved to 'high_risk_transactions_output/'")
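
Spark writes `high_risk_transactions_output` as a directory containing a `part-*.csv` data file plus marker files such as `_SUCCESS`, not as a single CSV file. A minimal sketch for locating the data file afterwards, assuming a local filesystem and the `coalesce(1)` write above; `find_part_file` is a hypothetical helper.

```python
import glob
import os

def find_part_file(output_dir):
    """Return the path of the single part file Spark wrote into output_dir."""
    matches = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
    if len(matches) != 1:
        raise FileNotFoundError(
            f"expected exactly one part-*.csv in {output_dir!r}, found {len(matches)}"
        )
    return matches[0]
```

Calling `find_part_file("high_risk_transactions_output")` after the write returns a path that `pd.read_csv` or a downstream loader can consume directly.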

# (Optional) Convert for Matplotlib Graphing (via Pandas)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# toPandas() collects the selected column to the driver; fine for 50,000 rows
df_plot = df_spark.select("risk_score").toPandas()

plt.figure(figsize=(10,5))
sns.histplot(df_plot["risk_score"], bins=20, color="red", kde=True)
plt.title("Fraud Risk Score Distribution")
plt.xlabel("Risk Score (0–100)")
plt.ylabel("Transaction Count")
plt.grid(True)
plt.show()

## ✅ Conclusion

- 50,000 transactions simulated in memory with pandas and converted to a Spark DataFrame
- Rule-based fraud scoring applied: high amount (> 5,000), location outside Delhi and Mumbai, and geo-anomaly (location change within 10 minutes of the previous transaction)
- Additive risk score (0–100) computed and filtered for scores above 70
- High-risk results saved as CSV for dashboarding or alert integration
- The same logic can be adapted to a scheduled pipeline on Spark platforms such as AWS Glue, EMR, or Databricks
