# Project: Bank Transaction Monitoring and Fraud Detection Pipeline

## Project Details

In the modern digital banking ecosystem, millions of transactions occur daily through ATMs, online
portals, and physical branches. These transactions need to be analyzed for operational efficiency,
compliance reporting, and fraud detection.

As part of the Data Engineering team at a leading bank, your goal is to design and implement a robust
ETL pipeline that performs both:
    
- Batch Processing for historical transaction analysis and reporting.
- Real-time Stream Processing to identify potentially fraudulent transactions as they occur.

You will build this solution using a Big Data stack including HDFS, Hive, HBase, PySpark, SparkSQL
(DataFrame API), Spark Streaming, and Kafka.


### Project Goals

- Build a robust batch processing pipeline to clean, process, and analyze historical transactions.
- Develop a real-time streaming pipeline to detect suspicious activities and store alerts with low latency.
- Generate actionable analytical reports and insights using SparkSQL.
- Use industry-standard tools across data ingestion, storage, processing, and output layers.

### Dataset

- Batch input file: `banking_transaction.csv`
- Real-time input: the same schema, simulated with a Kafka producer.

### Dataset Fields

|       Column     |                 Description                  |
| :--------------: | -------------------------------------------- |
| transaction_id   | Unique transaction identifier                |
| customer_id      | ID of the customer                           |
| timestamp        | Date and time of the transaction (ISO 8601)  |
| amount           | Transaction amount in INR                    |
| transaction_type | Type of transaction: deposit / withdrawal    |
| channel          | Source of transaction: ATM / Online / Branch |
| status           | Transaction result: success / failed         |

---

## Setting environment variables for Kafka support

In [5]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'
os.environ


---

## Data Ingestion (Batch)

- Upload the sample CSV file into HDFS

In [1]:
!hdfs dfs -mkdir -p '/user/shamithna75gedu/banking/raw/'

In [2]:
!hdfs dfs -copyFromLocal 'dataset/banking_transaction.csv' '/user/shamithna75gedu/banking/raw'

copyFromLocal: `/user/shamithna75gedu/banking/raw/banking_transaction.csv': File exists


In [3]:
!hdfs dfs -ls '/user/shamithna75gedu/banking/raw/banking_transaction.csv'

-rw-r--r--   3 shamithna75gedu shamithna75gedu     173847 2025-06-26 06:39 /user/shamithna75gedu/banking/raw/banking_transaction.csv


- Validate schema and handle malformed or missing records

In [4]:
!hdfs dfs -head '/user/shamithna75gedu/banking/raw/banking_transaction.csv'

transaction_id,customer_id,timestamp,amount,transaction_type,channel,status
1,1082,2025-05-30T22:50:12,200.2,withdrawal,ATM,success
2,1095,2025-05-26T18:15:58,192.25,deposit,Branch,success
3,1004,2025-06-07T21:33:32,184.33,deposit,Branch,success
4,1072,2025-06-06T10:26:46,278.95,withdrawal,ATM,success
5,1036,2025-06-05T09:11:09,828.49,deposit,ATM,success
6,1044,2025-06-01T07:42:00,350.08,deposit,Online,success
7,1049,2025-05-30T12:32:31,187.04,withdrawal,Branch,success
8,1006,2025-06-21T00:31:29,756.76,deposit,Online,success
9,1038,2025-05-29T18:25:03,846.46,withdrawal,Branch,success
10,1009,2025-06-23T05:06:01,141.24,deposit,Online,failed
11,1030,2025-06-15T00:42:04,879.84,withdrawal,Online,success
12,1047,2025-05-27T02:09:22,246.39,withdrawal,ATM,success
13,1090,2025-05-26T23:50:34,942.99,deposit,Branch,success
14,1069,2025-05-30T02:17:22,756.21,deposit,Online,success
15,1082,2025-06-05T03:51:30,719.35,deposit,Branch,success
16,1099,2025-06-05T20:11:46,798.4,deposit,ATM,success
17,10
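
The `-head` command above only previews the file. As a minimal sketch of what row-level validation could look like, the plain-Python example below checks each row for blank fields and for values that do not match the types Spark later infers (integer IDs, numeric amount, ISO timestamp). The function names and the two malformed sample rows are hypothetical and are not part of the notebook; the first sample row is copied from the file preview.

```python
import csv
import io
from datetime import datetime

EXPECTED_COLUMNS = [
    "transaction_id", "customer_id", "timestamp", "amount",
    "transaction_type", "channel", "status",
]

def validate_row(row):
    """Return a list of problems found in one CSV row (empty list means valid)."""
    problems = []
    # Missing values: any expected column that is absent or blank.
    for name in EXPECTED_COLUMNS:
        if not (row.get(name) or "").strip():
            problems.append(f"missing {name}")
    if problems:
        return problems
    # Type checks: integer IDs, numeric amount, ISO timestamp.
    for name in ("transaction_id", "customer_id"):
        if not row[name].strip().isdigit():
            problems.append(f"{name} is not an integer")
    try:
        float(row["amount"])
    except ValueError:
        problems.append("amount is not numeric")
    try:
        datetime.strptime(row["timestamp"].strip(), "%Y-%m-%dT%H:%M:%S")
    except ValueError:
        problems.append("timestamp is not ISO formatted")
    return problems

def split_valid_invalid(csv_text):
    """Split CSV text into (valid_rows, rejected_rows_with_reasons)."""
    valid, rejected = [], []
    for row in csv.DictReader(io.StringIO(csv_text)):
        problems = validate_row(row)
        if problems:
            rejected.append((row, problems))
        else:
            valid.append(row)
    return valid, rejected

# Row 1 is copied from the file preview; rows 2 and 3 are hypothetical
# malformed variants invented for this illustration.
sample = """transaction_id,customer_id,timestamp,amount,transaction_type,channel,status
1,1082,2025-05-30T22:50:12,200.2,withdrawal,ATM,success
2,1095,not-a-date,192.25,deposit,Branch,success
3,,2025-06-07T21:33:32,184.33,deposit,Branch,success
"""
valid, rejected = split_valid_invalid(sample)
print(len(valid), len(rejected))
for row, problems in rejected:
    print(row["transaction_id"], problems)
```

Keeping the rejected rows together with their reasons, rather than silently dropping them, makes it possible to report how many records were excluded and why.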

---

## Data Transformation with PySpark

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, when, dayofmonth, hour

In [7]:
spark = SparkSession \
            .builder \
            .master('local[5]') \
            .appName('data-transformation-with-pyspark') \
            .enableHiveSupport() \
            .getOrCreate()

spark

:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/shamithna75gedu/.ivy2/cache
The jars for the packages stored in: /home/shamithna75gedu/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5c6bee15-d243-4929-83b1-cb089390eb1f;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 817ms :: artifacts dl 30ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;

- Load the HDFS data into a PySpark DataFrame

In [8]:
bank_transaction_df = spark \
                        .read \
                        .csv(
                            path='/user/shamithna75gedu/banking/raw/banking_transaction.csv',
                            inferSchema=True,
                            header=True)

bank_transaction_df.show()

                                                                                

+--------------+-----------+-------------------+------+----------------+-------+-------+
|transaction_id|customer_id|          timestamp|amount|transaction_type|channel| status|
+--------------+-----------+-------------------+------+----------------+-------+-------+
|             1|       1082|2025-05-30 22:50:12| 200.2|      withdrawal|    ATM|success|
|             2|       1095|2025-05-26 18:15:58|192.25|         deposit| Branch|success|
|             3|       1004|2025-06-07 21:33:32|184.33|         deposit| Branch|success|
|             4|       1072|2025-06-06 10:26:46|278.95|      withdrawal|    ATM|success|
|             5|       1036|2025-06-05 09:11:09|828.49|         deposit|    ATM|success|
|             6|       1044|2025-06-01 07:42:00|350.08|         deposit| Online|success|
|             7|       1049|2025-05-30 12:32:31|187.04|      withdrawal| Branch|success|
|             8|       1006|2025-06-21 00:31:29|756.76|         deposit| Online|success|
|             9|     

In [9]:
bank_transaction_df.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- status: string (nullable = true)



In [10]:
bank_transaction_df = bank_transaction_df \
                            .withColumnRenamed('timestamp', 'txn_timestamp')

bank_transaction_df.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- txn_timestamp: timestamp (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- status: string (nullable = true)



In [11]:
bank_transaction_df.describe().show()


+-------+-----------------+------------------+-----------------+----------------+-------+-------+
|summary|   transaction_id|       customer_id|           amount|transaction_type|channel| status|
+-------+-----------------+------------------+-----------------+----------------+-------+-------+
|  count|             2900|              2900|             2900|            2900|   2900|   2900|
|   mean|           1450.5|1051.0475862068965|552.2341793103421|            null|   null|   null|
| stddev|837.3022154515058| 28.75166016835395|258.8857565238447|            null|   null|   null|
|    min|                1|              1001|           100.08|         deposit|    ATM| failed|
|    max|             2900|              1100|           999.49|      withdrawal| Online|success|
+-------+-----------------+------------------+-----------------+----------------+-------+-------+



                                                                                

Perform the following:

- Normalize text fields (`transaction_type`, `channel`) to lowercase.

In [12]:
bank_transaction_df = bank_transaction_df \
                        .withColumn('transaction_type', lower(col('transaction_type'))) \
                        .withColumn('channel', lower(col('channel')))
bank_transaction_df.show()

+--------------+-----------+-------------------+------+----------------+-------+-------+
|transaction_id|customer_id|      txn_timestamp|amount|transaction_type|channel| status|
+--------------+-----------+-------------------+------+----------------+-------+-------+
|             1|       1082|2025-05-30 22:50:12| 200.2|      withdrawal|    atm|success|
|             2|       1095|2025-05-26 18:15:58|192.25|         deposit| branch|success|
|             3|       1004|2025-06-07 21:33:32|184.33|         deposit| branch|success|
|             4|       1072|2025-06-06 10:26:46|278.95|      withdrawal|    atm|success|
|             5|       1036|2025-06-05 09:11:09|828.49|         deposit|    atm|success|
|             6|       1044|2025-06-01 07:42:00|350.08|         deposit| online|success|
|             7|       1049|2025-05-30 12:32:31|187.04|      withdrawal| branch|success|
|             8|       1006|2025-06-21 00:31:29|756.76|         deposit| online|success|
|             9|     

- Filter out failed transactions

In [13]:
cleaned_bank_transaction = bank_transaction_df \
                            .filter(col('status') != 'failed')

cleaned_bank_transaction.show()

+--------------+-----------+-------------------+------+----------------+-------+-------+
|transaction_id|customer_id|      txn_timestamp|amount|transaction_type|channel| status|
+--------------+-----------+-------------------+------+----------------+-------+-------+
|             1|       1082|2025-05-30 22:50:12| 200.2|      withdrawal|    atm|success|
|             2|       1095|2025-05-26 18:15:58|192.25|         deposit| branch|success|
|             3|       1004|2025-06-07 21:33:32|184.33|         deposit| branch|success|
|             4|       1072|2025-06-06 10:26:46|278.95|      withdrawal|    atm|success|
|             5|       1036|2025-06-05 09:11:09|828.49|         deposit|    atm|success|
|             6|       1044|2025-06-01 07:42:00|350.08|         deposit| online|success|
|             7|       1049|2025-05-30 12:32:31|187.04|      withdrawal| branch|success|
|             8|       1006|2025-06-21 00:31:29|756.76|         deposit| online|success|
|             9|     

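- Add derived columns:
    - `txn_day`: day of the month, taken from `txn_timestamp`
    - `txn_hour`: hour of the day (0-23), taken from `txn_timestamp`
    - `high_value_flag`: 1 if `amount` > ₹500, otherwise 0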

In [14]:
cleaned_bank_transaction = cleaned_bank_transaction \
                                .withColumn('txn_day', dayofmonth(col('txn_timestamp'))) \
                                .withColumn('txn_hour', hour(col('txn_timestamp'))) \
                                .withColumn('high_value_flag', when(col('amount') > 500, 1).otherwise(0))

cleaned_bank_transaction.show()

+--------------+-----------+-------------------+------+----------------+-------+-------+-------+--------+---------------+
|transaction_id|customer_id|      txn_timestamp|amount|transaction_type|channel| status|txn_day|txn_hour|high_value_flag|
+--------------+-----------+-------------------+------+----------------+-------+-------+-------+--------+---------------+
|             1|       1082|2025-05-30 22:50:12| 200.2|      withdrawal|    atm|success|     30|      22|              0|
|             2|       1095|2025-05-26 18:15:58|192.25|         deposit| branch|success|     26|      18|              0|
|             3|       1004|2025-06-07 21:33:32|184.33|         deposit| branch|success|      7|      21|              0|
|             4|       1072|2025-06-06 10:26:46|278.95|      withdrawal|    atm|success|      6|      10|              0|
|             5|       1036|2025-06-05 09:11:09|828.49|         deposit|    atm|success|      5|       9|              1|
|             6|       1
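
To make the derivation rules concrete, here is a small plain-Python sketch that reproduces the three derived values for a single row. It is an illustration of the logic only, not the Spark job itself. The `txn_date` field is a hypothetical extra that the notebook does not create; it is included because a day-of-month value alone cannot tell 30 May from 30 June, which may matter for any later "per day" grouping if the data spans more than one month.

```python
from datetime import datetime

HIGH_VALUE_THRESHOLD = 500  # INR, the same cut-off used for high_value_flag

def derive_columns(txn_timestamp, amount):
    """Compute the derived fields for one transaction."""
    ts = datetime.strptime(txn_timestamp, "%Y-%m-%d %H:%M:%S")
    return {
        "txn_day": ts.day,    # day of month, like dayofmonth()
        "txn_hour": ts.hour,  # hour of day (0-23), like hour()
        "high_value_flag": 1 if amount > HIGH_VALUE_THRESHOLD else 0,
        # Hypothetical extra column (not in the notebook): the full calendar date.
        "txn_date": ts.date().isoformat(),
    }

# Inputs taken from transactions 1 and 5 in the table above.
print(derive_columns("2025-05-30 22:50:12", 200.2))
print(derive_columns("2025-06-05 09:11:09", 828.49))
```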

---

## Hive-Based Reporting (Batch Analytics)

Create Hive external or managed tables for:
- Raw transactions
- Cleaned/processed data
- Report outputs

```sql
CREATE DATABASE IF NOT EXISTS banking_shamithna75gedu
LOCATION '/user/shamithna75gedu/banking';

USE banking_shamithna75gedu;

CREATE EXTERNAL TABLE IF NOT EXISTS raw_transactions
(
  transaction_id INT,
  customer_id INT,
  txn_timestamp TIMESTAMP,
  amount DOUBLE,
  transaction_type STRING,
  status STRING
)
PARTITIONED BY (channel STRING)
STORED AS PARQUET
LOCATION '/user/shamithna75gedu/banking/parquet/raw_transactions/';

CREATE EXTERNAL TABLE IF NOT EXISTS transactions
(
  transaction_id INT,
  customer_id INT,
  txn_timestamp TIMESTAMP,
  amount DOUBLE,
  transaction_type STRING,
  status STRING,
  txn_day INT,
  txn_hour INT,
  high_value_flag INT
)
PARTITIONED BY (channel STRING)
STORED AS PARQUET
LOCATION '/user/shamithna75gedu/banking/parquet/transactions/';
```

In [15]:
bank_transaction_df \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .partitionBy('channel') \
        .option('path', '/user/shamithna75gedu/banking/parquet/raw_transactions/') \
        .saveAsTable('banking_shamithna75gedu.raw_transactions')

25/06/26 12:48:10 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


In [16]:
cleaned_bank_transaction \
    .write \
    .mode('overwrite') \
    .format('parquet') \
    .partitionBy('channel') \
    .option('path', '/user/shamithna75gedu/banking/parquet/transactions/') \
    .saveAsTable('banking_shamithna75gedu.transactions')

                                                                                

In [17]:
spark.stop()

```sql
SHOW PARTITIONS raw_transactions;
SHOW PARTITIONS transactions;

SELECT * FROM raw_transactions LIMIT 10;
SELECT * FROM transactions LIMIT 10;
```
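
For orientation on what `SHOW PARTITIONS` reports: writing with `partitionBy('channel')` stores each channel's rows in its own subdirectory under the table location, named in the Hive `column=value` style. The helper below is a hypothetical illustration of that naming, assuming the three lowercase channel values produced by the normalization step. It builds path strings only and does not touch HDFS.

```python
import posixpath

def partition_dir(table_location, column, value):
    """Build the Hive-style partition directory for one partition value."""
    return posixpath.join(table_location, f"{column}={value}")

location = "/user/shamithna75gedu/banking/parquet/transactions/"
for channel in ("atm", "branch", "online"):
    print(partition_dir(location, "channel", channel))
```

Because the partition value lives in the directory name rather than in the data files, Spark appends `channel` as the last column when a partitioned table is read back.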

---

## Analytical Queries

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, month, dayofmonth, round, avg, window, collect_set, size, when

In [19]:
spark = SparkSession \
            .builder \
            .master('local[5]') \
            .appName('analytical-queries') \
            .enableHiveSupport() \
            .getOrCreate()

spark

25/06/26 12:48:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/06/26 12:48:11 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/06/26 12:48:11 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/06/26 12:48:11 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/06/26 12:48:11 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
25/06/26 12:48:11 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
25/06/26 12:48:11 WARN Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047.
25/06/26 12:48:11 WARN Utils: Service 'SparkUI' could not bind on port 4047. Attempting port 4048.


In [20]:
raw_bank_transaction_df = spark \
                        .read \
                        .table('banking_shamithna75gedu.raw_transactions')

raw_bank_transaction_df.show()

25/06/26 12:48:13 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


+--------------+-----------+-------------------+------+----------------+-------+-------+
|transaction_id|customer_id|      txn_timestamp|amount|transaction_type| status|channel|
+--------------+-----------+-------------------+------+----------------+-------+-------+
|             1|       1082|2025-05-30 22:50:12| 200.2|      withdrawal|success|    atm|
|             4|       1072|2025-06-06 10:26:46|278.95|      withdrawal|success|    atm|
|             5|       1036|2025-06-05 09:11:09|828.49|         deposit|success|    atm|
|            12|       1047|2025-05-27 02:09:22|246.39|      withdrawal|success|    atm|
|            16|       1099|2025-06-05 20:11:46| 798.4|         deposit|success|    atm|
|            19|       1083|2025-05-26 20:02:35|512.97|      withdrawal|success|    atm|
|            22|       1097|2025-06-21 00:01:46| 142.4|         deposit|success|    atm|
|            26|       1038|2025-06-23 03:40:15|491.29|      withdrawal| failed|    atm|
|            28|     

In [21]:
raw_bank_transaction_df.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- txn_timestamp: timestamp (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- status: string (nullable = true)
 |-- channel: string (nullable = true)



In [22]:
bank_transaction_df = spark \
                        .read \
                        .table('banking_shamithna75gedu.transactions')

bank_transaction_df.show()

+--------------+-----------+-------------------+------+----------------+-------+-------+--------+---------------+-------+
|transaction_id|customer_id|      txn_timestamp|amount|transaction_type| status|txn_day|txn_hour|high_value_flag|channel|
+--------------+-----------+-------------------+------+----------------+-------+-------+--------+---------------+-------+
|             1|       1082|2025-05-30 22:50:12| 200.2|      withdrawal|success|     30|      22|              0|    atm|
|             4|       1072|2025-06-06 10:26:46|278.95|      withdrawal|success|      6|      10|              0|    atm|
|             5|       1036|2025-06-05 09:11:09|828.49|         deposit|success|      5|       9|              1|    atm|
|            12|       1047|2025-05-27 02:09:22|246.39|      withdrawal|success|     27|       2|              0|    atm|
|            16|       1099|2025-06-05 20:11:46| 798.4|         deposit|success|      5|      20|              1|    atm|
|            19|       1

In [23]:
bank_transaction_df.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- txn_timestamp: timestamp (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- status: string (nullable = true)
 |-- txn_day: integer (nullable = true)
 |-- txn_hour: integer (nullable = true)
 |-- high_value_flag: integer (nullable = true)
 |-- channel: string (nullable = true)



> Store all final results in Hive tables and share sample query outputs

### Customer Behavior Insights

- Top 5 customers by number of transactions

In [24]:
top_five_customers = bank_transaction_df \
                        .groupBy(col('customer_id')) \
                        .agg(count(col('transaction_id')).alias('no_of_transactions')) \
                        .orderBy(col('no_of_transactions').desc()) \
                        .limit(5)

top_five_customers \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .option('path', '/user/shamithna75gedu/banking/parquet/top_five_customers/') \
        .saveAsTable('banking_shamithna75gedu.top_five_customers')

top_five_customers.show()

                                                                                

+-----------+------------------+
|customer_id|no_of_transactions|
+-----------+------------------+
|       1032|                42|
|       1041|                38|
|       1014|                38|
|       1084|                37|
|       1052|                37|
+-----------+------------------+



- Customer with the highest total withdrawal amount

In [25]:
customer_with_highest_total_withdrawal = bank_transaction_df \
                                                .where(col('transaction_type') == 'withdrawal') \
                                                .groupBy(col('customer_id')) \
                                                .agg(sum(col('amount')).alias('total_withdrawal_amount')) \
                                                .orderBy(col('total_withdrawal_amount').desc()) \
                                                .limit(1)
customer_with_highest_total_withdrawal \
            .write \
            .mode('overwrite') \
            .format('parquet') \
            .option('path', '/user/shamithna75gedu/banking/parquet/customer_with_highest_total_withdrawal/') \
            .saveAsTable('banking_shamithna75gedu.customer_with_highest_total_withdrawal')

customer_with_highest_total_withdrawal.show()

                                                                                

+-----------+-----------------------+
|customer_id|total_withdrawal_amount|
+-----------+-----------------------+
|       1032|               13811.57|
+-----------+-----------------------+



- Monthly transaction volume per customer

In [26]:
monthly_txns_volume = bank_transaction_df \
                        .withColumn('txn_month', month(col('txn_timestamp'))) \
                        .groupBy(col('customer_id'), col('txn_month')) \
                        .agg(
                            count(col('transaction_id')).alias('total_no_of_transactions'), 
                            round(sum(col('amount')), 2).alias('total_amount_transfered'))

monthly_txns_volume \
            .write \
            .mode('overwrite') \
            .format('parquet') \
            .option('path', '/user/shamithna75gedu/banking/parquet/monthly_txns_volume/') \
            .saveAsTable('banking_shamithna75gedu.monthly_txns_volume')

monthly_txns_volume.show()

                                                                                

+-----------+---------+------------------------+-----------------------+
|customer_id|txn_month|total_no_of_transactions|total_amount_transfered|
+-----------+---------+------------------------+-----------------------+
|       1070|        5|                       5|                3100.01|
|       1089|        6|                      22|                13063.4|
|       1031|        5|                       5|                3417.88|
|       1090|        5|                       6|                3605.19|
|       1016|        5|                       3|                2567.28|
|       1024|        5|                       6|                3010.31|
|       1064|        5|                       4|                1641.61|
|       1031|        6|                      21|               11986.97|
|       1062|        5|                       7|                5204.74|
|       1013|        6|                      22|               13408.37|
|       1061|        5|                       4|   

### Channel & Platform Analytics

- Average transaction amount by `channel`

In [27]:
avg_txn_amt_by_channel = bank_transaction_df \
                            .groupBy(col('channel')) \
                            .agg(round(avg(col('amount')), 2).alias('avg_amount'))

avg_txn_amt_by_channel \
            .write \
            .mode('overwrite') \
            .format('parquet') \
            .option('path', '/user/shamithna75gedu/banking/parquet/avg_txn_amt_by_channel/') \
            .saveAsTable('banking_shamithna75gedu.avg_txn_amt_by_channel')

avg_txn_amt_by_channel.show()

                                                                                

+-------+----------+
|channel|avg_amount|
+-------+----------+
| online|    550.99|
|    atm|    543.22|
| branch|    563.81|
+-------+----------+



- Success rate of transactions by `channel`

In [28]:
success_rate_by_channel = raw_bank_transaction_df \
                            .withColumn('success_flag', when(col('status') == 'success', 1).otherwise(0)) \
                            .groupBy(col('channel')) \
                            .agg(round(sum(col('success_flag')) * 100 / count(col('transaction_id')), 2).alias('success_rate'))

success_rate_by_channel \
            .write \
            .mode('overwrite') \
            .format('parquet') \
            .option('path', '/user/shamithna75gedu/banking/parquet/success_rate_by_channel/') \
            .saveAsTable('banking_shamithna75gedu.success_rate_by_channel')

success_rate_by_channel.show()

                                                                                

+-------+------------+
|channel|success_rate|
+-------+------------+
| online|       90.35|
|    atm|       90.66|
| branch|       90.43|
+-------+------------+



### Time-Based Analytics

- Hourly transaction trend (peak hours)

In [29]:
hourly_txn_trend = bank_transaction_df \
                        .groupBy(col('txn_hour')) \
                        .agg(
                            count(col('transaction_id')).alias('no_of_transaction'),
                            round(sum(col('amount')), 2).alias('total_amount_transfered')) \
                        .orderBy(col('txn_hour').desc()) \
                        .limit(10)

hourly_txn_trend \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .option('path', '/user/shamithna75gedu/banking/parquet/hourly_txn_trend/') \
        .saveAsTable('banking_shamithna75gedu.hourly_txn_trend')

hourly_txn_trend.show()



+--------+-----------------+-----------------------+
|txn_hour|no_of_transaction|total_amount_transfered|
+--------+-----------------+-----------------------+
|      23|              116|               64075.05|
|      22|              119|               65337.85|
|      21|              105|               60847.12|
|      20|              101|               56580.27|
|      19|              123|               63337.75|
|      18|              102|               56143.26|
|      17|              109|               61653.74|
|      16|               97|               53444.58|
|      15|               98|               52827.33|
|      14|              101|               56519.29|
+--------+-----------------+-----------------------+
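
The query above sorts by `txn_hour` in descending order before applying `limit(10)`, so the table lists hours 14 to 23 rather than the busiest hours. As a small sketch of ranking by volume instead, the plain-Python snippet below sorts the ten displayed rows by transaction count. It covers only the rows shown, so hours 0 to 13 are not considered. In the Spark job, the analogous change would be ordering by `col('no_of_transaction').desc()`.

```python
# (txn_hour, no_of_transaction) pairs copied from the ten rows displayed above.
displayed_rows = [
    (23, 116), (22, 119), (21, 105), (20, 101), (19, 123),
    (18, 102), (17, 109), (16, 97), (15, 98), (14, 101),
]

def peak_hours(rows, top_n=3):
    """Return the top_n rows ranked by transaction count, busiest first."""
    # Ties are broken by the earlier hour so the result is deterministic.
    return sorted(rows, key=lambda r: (-r[1], r[0]))[:top_n]

print(peak_hours(displayed_rows))  # [(19, 123), (22, 119), (23, 116)]
```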



                                                                                

- Day-wise total transaction volume and amount

In [30]:
day_wise_txn_volume = bank_transaction_df \
                            .groupBy(col('txn_day')) \
                            .agg(
                                count(col('transaction_id')).alias('no_of_transaction'),
                                round(sum(col('amount')), 2).alias('total_amount_transfered')) \
                            .orderBy(col('txn_day'))

day_wise_txn_volume \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .option('path', '/user/shamithna75gedu/banking/parquet/day_wise_txn_volume/') \
        .saveAsTable('banking_shamithna75gedu.day_wise_txn_volume')

day_wise_txn_volume.show()



+-------+-----------------+-----------------------+
|txn_day|no_of_transaction|total_amount_transfered|
+-------+-----------------+-----------------------+
|      1|               88|               48719.05|
|      2|               75|               39444.05|
|      3|               78|               43144.49|
|      4|               69|               35842.83|
|      5|               87|               47592.81|
|      6|              102|               59603.52|
|      7|               87|               45472.08|
|      8|               94|               53767.77|
|      9|               97|               49049.63|
|     10|               92|               47463.42|
|     11|               78|               44055.86|
|     12|               99|               56518.86|
|     13|               83|               50373.61|
|     14|               94|               54303.43|
|     15|               88|               51513.83|
|     16|              101|               55754.42|
|     17|   

                                                                                

### Fraud Pattern Indicators

- Number of high-value transactions (> ₹500) per customer per day

In [31]:
high_value_txn_per_day = bank_transaction_df \
                            .where(col('amount') > 500) \
                            .groupBy(col('customer_id'), col('txn_day')) \
                            .agg(count(col('transaction_id')).alias('no_of_transaction'))

high_value_txn_per_day \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .option('path', '/user/shamithna75gedu/banking/parquet/high_value_txn_per_day/') \
        .saveAsTable('banking_shamithna75gedu.high_value_txn_per_day')


high_value_txn_per_day.show()

                                                                                

+-----------+-------+-----------------+
|customer_id|txn_day|no_of_transaction|
+-----------+-------+-----------------+
|       1063|      2|                1|
|       1062|     27|                2|
|       1070|      5|                1|
|       1037|     22|                1|
|       1088|     31|                1|
|       1048|     12|                1|
|       1049|     18|                1|
|       1064|      2|                1|
|       1027|      8|                1|
|       1065|      2|                1|
|       1033|      1|                1|
|       1019|     27|                1|
|       1038|      3|                1|
|       1039|      4|                1|
|       1099|      3|                1|
|       1060|     27|                1|
|       1072|     14|                1|
|       1090|     19|                2|
|       1041|     21|                1|
|       1085|     25|                1|
+-----------+-------+-----------------+
only showing top 20 rows



- Customers with more than 3 failed transactions in a day

In [32]:
customers_with_more_failed_txn = raw_bank_transaction_df \
                                    .where(col('status') == 'failed') \
                                    .withColumn('txn_day', dayofmonth(col('txn_timestamp'))) \
                                    .groupBy(col('customer_id'), col('txn_day')) \
                                    .agg(count(col('transaction_id')).alias('no_of_failed_txn')) \
                                    .where(col('no_of_failed_txn') > 3) \
                                    .select(col('customer_id'))

customers_with_more_failed_txn \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .option('path', '/user/shamithna75gedu/banking/parquet/customers_with_more_failed_txn/') \
        .saveAsTable('banking_shamithna75gedu.customers_with_more_failed_txn')

customers_with_more_failed_txn.show()

                                                                                

+-----------+
|customer_id|
+-----------+
+-----------+



- Customers who used all three channels (ATM, Online, Branch) in the last 7 days

In [33]:
no_of_unique_channel = bank_transaction_df \
                            .select(col('channel')) \
                            .distinct() \
                            .count()

customers_used_all_channels_in_week = bank_transaction_df \
                                            .groupBy(window(col('txn_timestamp'), '7 day'), col('customer_id')) \
                                            .agg(collect_set(col('channel')).alias('channel_set')) \
                                            .where(size(col('channel_set')) == no_of_unique_channel) \
                                            .select(col('customer_id'))

customers_used_all_channels_in_week \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .option('path', '/user/shamithna75gedu/banking/parquet/customers_used_all_channels_in_week/') \
        .saveAsTable('banking_shamithna75gedu.customers_used_all_channels_in_week')

customers_used_all_channels_in_week.show()

                                                                                

+-----------+
|customer_id|
+-----------+
|       1063|
|       1072|
|       1083|
|       1066|
|       1082|
|       1014|
|       1087|
|       1092|
|       1087|
|       1029|
|       1013|
|       1045|
|       1052|
|       1093|
|       1010|
|       1038|
|       1019|
|       1071|
|       1084|
|       1097|
+-----------+
only showing top 20 rows
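
Note that `window(col('txn_timestamp'), '7 day')` groups rows into fixed (tumbling) 7-day buckets rather than a range ending at the most recent transaction, so a customer can be returned once per bucket, which may be why `1087` appears twice above. The sketch below uses plain Python, with no Spark dependency, to illustrate how such a bucket is derived for a single timestamp. It assumes buckets are aligned to the Unix epoch in UTC with no start offset; `tumbling_window` is a hypothetical helper, not a Spark API.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumbling_window(ts, size=timedelta(days=7)):
    """Return (start, end) of the epoch-aligned bucket containing ts; end is exclusive."""
    offset = (ts - EPOCH) % size      # time elapsed since the bucket opened
    start = ts - offset
    return start, start + size

start, end = tumbling_window(datetime(2025, 5, 30, 22, 50, 12, tzinfo=timezone.utc))
print(start.date(), end.date())       # 2025-05-29 2025-06-05
```

If the requirement is strictly "the last 7 days", filtering on a cutoff relative to the latest `txn_timestamp` before grouping would be one alternative to bucketing.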



In [34]:
spark.stop()

---

## Real-Time Fraud Detection Pipeline

- Simulate real-time transactions using Kafka producer (send events from the CSV file).
- Develop a Spark Streaming (PySpark) application to:
    - Consume Kafka stream
    - Parse and validate data
    - Apply real-time fraud logic:
        - **If a customer performs more than two withdrawals over ₹400 within a 1-minute window, flag the behavior as suspicious.**
- Output suspicious transaction records as alerts.
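
The fraud rule above can be sketched in plain Python before moving to Spark. This is a minimal illustration under stated assumptions: the 1-minute window is treated as a fixed (tumbling) clock minute, `flag_suspicious` is a hypothetical helper, and the sample records are invented for the example.

```python
from collections import defaultdict
from datetime import datetime

AMOUNT_THRESHOLD = 400.0   # INR, from the rule above
MAX_WITHDRAWALS = 2        # more than this many in one window is suspicious

def flag_suspicious(transactions):
    """Return the transactions in any (customer, minute) bucket that breaks the rule."""
    buckets = defaultdict(list)
    for txn in transactions:
        if txn['transaction_type'] != 'withdrawal' or txn['amount'] <= AMOUNT_THRESHOLD:
            continue
        ts = datetime.fromisoformat(txn['timestamp'])
        minute = ts.replace(second=0, microsecond=0)   # start of the 1-minute window
        buckets[(txn['customer_id'], minute)].append(txn)
    return [txn
            for group in buckets.values() if len(group) > MAX_WITHDRAWALS
            for txn in group]

# Hypothetical sample data for illustration only.
sample = [
    {'transaction_id': 1, 'customer_id': 1082, 'timestamp': '2025-05-30T22:50:12', 'amount': 400.2, 'transaction_type': 'withdrawal'},
    {'transaction_id': 2, 'customer_id': 1082, 'timestamp': '2025-05-30T22:50:30', 'amount': 419.2, 'transaction_type': 'withdrawal'},
    {'transaction_id': 3, 'customer_id': 1082, 'timestamp': '2025-05-30T22:50:59', 'amount': 450.0, 'transaction_type': 'withdrawal'},
    {'transaction_id': 4, 'customer_id': 1082, 'timestamp': '2025-05-30T22:51:05', 'amount': 500.0, 'transaction_type': 'withdrawal'},
    {'transaction_id': 5, 'customer_id': 1082, 'timestamp': '2025-05-30T22:50:40', 'amount': 900.0, 'transaction_type': 'deposit'},
    {'transaction_id': 6, 'customer_id': 1010, 'timestamp': '2025-05-30T22:50:20', 'amount': 300.0, 'transaction_type': 'withdrawal'},
]
flagged_ids = [t['transaction_id'] for t in flag_suspicious(sample)]
print(flagged_ids)   # [1, 2, 3]
```

Transaction 4 falls into the next minute and is therefore not flagged; a sliding window would treat that case differently.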

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_csv, window, count, collect_list, struct, explode, lit
from pyspark.sql.window import *
import happybase

In [36]:
spark = SparkSession \
            .builder \
            .master('local[5]') \
            .appName('real-time-fraud-detection') \
            .getOrCreate()

25/06/26 12:49:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/06/26 12:49:22 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/06/26 12:49:22 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/06/26 12:49:22 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/06/26 12:49:22 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
25/06/26 12:49:22 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
25/06/26 12:49:22 WARN Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047.
25/06/26 12:49:22 WARN Utils: Service 'SparkUI' could not bind on port 4047. Attempting port 4048.


In [37]:
bank_txn_schema = \
'''
transaction_id INT NOT NULL,
customer_id INT,
txn_timestamp TIMESTAMP NOT NULL,
amount DOUBLE,
transaction_type STRING,
status STRING,
txn_day INT NOT NULL,
txn_hour INT NOT NULL,
channel STRING
'''

print(bank_txn_schema)


transaction_id INT NOT NULL,
customer_id INT,
txn_timestamp TIMESTAMP NOT NULL,
amount DOUBLE,
transaction_type STRING,
status STRING,
txn_day INT NOT NULL,
txn_hour INT NOT NULL,
channel STRING



In [38]:
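# Note: 'checkpointLocation' only takes effect on writeStream. Set on readStream it is
# ignored, which is why the query below logs a temporary checkpoint location.
bank_txn_stream_df = spark \
                        .readStream \
                        .format('kafka') \
                        .option('kafka.bootstrap.servers', 'master:9092') \
                        .option('subscribe', 'bank_txn_shamithna75gedu') \
                        .option('checkpointLocation', 'chk_bank_txn_shamithna75gedu') \
                        .option('startingOffsets', 'latest') \
                        .load()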

bank_txn_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [39]:
bank_txn_stream_df = bank_txn_stream_df \
                        .select(
                            from_csv(
                                col('value').cast('string'),
                                bank_txn_schema) \
                            .alias('values')) \
                        .select('values.*')

bank_txn_stream_df.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- txn_timestamp: timestamp (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- status: string (nullable = true)
 |-- txn_day: integer (nullable = true)
 |-- txn_hour: integer (nullable = true)
 |-- channel: string (nullable = true)
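
The printed schema shows every column as nullable even though the DDL string marks some as `NOT NULL`, so malformed input may surface as nulls rather than errors. As a rough illustration of the "parse and validate" step, the plain-Python sketch below applies the same field order and types to a single CSV line and rejects records that are malformed or missing a required field. It is not how `from_csv` behaves internally; `parse_line` and the sample line are hypothetical.

```python
import csv
from datetime import datetime

# Field order and types mirror bank_txn_schema above.
FIELDS = [
    ('transaction_id', int),
    ('customer_id', int),
    ('txn_timestamp', datetime.fromisoformat),
    ('amount', float),
    ('transaction_type', str),
    ('status', str),
    ('txn_day', int),
    ('txn_hour', int),
    ('channel', str),
]
# Columns declared NOT NULL in the schema string.
REQUIRED = {'transaction_id', 'txn_timestamp', 'txn_day', 'txn_hour'}

def parse_line(line):
    """Parse one CSV line; return a dict, or None if the record is invalid."""
    values = next(csv.reader([line]), [])
    if len(values) != len(FIELDS):
        return None
    record = {}
    for (name, convert), raw in zip(FIELDS, values):
        raw = raw.strip()
        if raw == '':
            if name in REQUIRED:
                return None            # required field is missing
            record[name] = None        # optional field left empty
            continue
        try:
            record[name] = convert(raw)
        except ValueError:
            return None                # value does not match the declared type
    return record

# Hypothetical sample line for illustration only.
good = parse_line('1,1082,2025-05-30 22:50:12,400.2,withdrawal,success,30,22,ATM')
print(good['amount'], good['channel'])   # 400.2 ATM
```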



In [40]:
suspicious_txn_stream = bank_txn_stream_df \
                            .where(col('transaction_type') == 'withdrawal') \
                            .where(col('amount') > 400)

```bash
create_namespace 'banking_shamithna75gedu'

create 'banking_shamithna75gedu:suspicious_transaction', {NAME => 'info', VERSIONS => 4}
```

In [46]:
def write_into_hbase(batch_df, batch_id):
    # Columns shown on the console for each micro-batch.
    required_attribute_names = [
        'transaction_count',
        'transaction_id',
        'customer_id',
        'txn_timestamp',
        'amount',
        'channel'
    ]
    # Columns written to HBase; 'reason' is stored alongside the transaction fields.
    stored_attribute_names = required_attribute_names + ['reason']

    # Count large withdrawals per customer per 1-minute window within this micro-batch.
    windowed_df = batch_df \
                    .groupBy(
                        window(col('txn_timestamp'), '1 minute'),
                        col('customer_id')) \
                    .agg(count('transaction_id').alias('transaction_count')) \
                    .where(col('transaction_count') > 2)

    # Join back to recover the individual transactions; window.end is exclusive.
    res_df = windowed_df \
                .join(batch_df, on='customer_id', how='inner') \
                .where(col('txn_timestamp') >= col('window.start')) \
                .where(col('txn_timestamp') < col('window.end')) \
                .withColumn('reason', lit('Withdrew more than 400 INR multiple times within a short timeframe')) \
                .select(*stored_attribute_names)

    res_df.select(*required_attribute_names).show()

    data = res_df.collect()

    connection = happybase.Connection('master')
    try:
        # HBase separates namespace and table name with a colon, matching the create statement.
        table = connection.table('banking_shamithna75gedu:suspicious_transaction')

        for row in data:
            row_key = f"{row['customer_id']}_{row['transaction_id']}".encode()
            record_data = {
                f'info:{column}'.encode(): str(row[column]).encode()
                for column in stored_attribute_names
            }
            table.put(row_key, record_data)
    finally:
        # Close the connection even if a put fails.
        connection.close()
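
HBase stores row keys, column names, and values as bytes, and each column must be qualified with its column family. The sketch below isolates that encoding step so it can be checked without a running HBase instance. `to_hbase_cells` is a hypothetical helper, and a plain dict stands in for a Spark `Row`.

```python
def to_hbase_cells(row, columns, family='info'):
    """Build the (row_key, cells) pair for one record, encoded as bytes."""
    row_key = f"{row['customer_id']}_{row['transaction_id']}".encode()
    cells = {
        f'{family}:{column}'.encode(): str(row[column]).encode()
        for column in columns
    }
    return row_key, cells

# Hypothetical record for illustration only.
row = {'customer_id': 1082, 'transaction_id': 6, 'amount': 419.2}
row_key, cells = to_hbase_cells(row, ['customer_id', 'transaction_id', 'amount'])
print(row_key)                 # b'1082_6'
print(cells[b'info:amount'])   # b'419.2'
```

The resulting pair has the shape passed to `table.put(row_key, cells)` in the function above.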

```bash
hdfs dfs -cat '/user/shamithna75gedu/banking/raw/banking_transactions.csv' | tail -n +2 | \
kafka-console-producer.sh \
    --bootstrap-server master:9092 \
    --topic bank_txn_shamithna75gedu
```

In [None]:
suspicious_txn_stream \
    .writeStream \
    .format('console') \
    .foreachBatch(write_into_hbase) \
    .start() \
    .awaitTermination()

25/06/26 13:04:53 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c021e042-7a21-43ea-b873-133d80a0f7ff. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

+-----------------+--------------+-----------+-------------+------+-------+
|transaction_count|transaction_id|customer_id|txn_timestamp|amount|channel|
+-----------------+--------------+-----------+-------------+------+-------+
+-----------------+--------------+-----------+-------------+------+-------+



                                                                                

+-----------------+--------------+-----------+-------------------+------+-------+
|transaction_count|transaction_id|customer_id|      txn_timestamp|amount|channel|
+-----------------+--------------+-----------+-------------------+------+-------+
|                3|             1|       1082|2025-05-30 22:50:12| 400.2|     22|
|                3|             6|       1082|2025-05-30 22:50:12| 419.2|     22|
|                3|            10|       1082|2025-05-30 22:50:12| 419.2|     22|
+-----------------+--------------+-----------+-------------------+------+-------+



                                                                                

```bash
echo "scan 'banking_shamithna75gedu:suspicious_transaction'" | hbase shell
```

---

### [RETRACTED]

```python
suspicious_txn_stream = suspicious_txn_stream \
                            .withWatermark('txn_timestamp', '1 minute') \
                            .groupBy(
                                window(col('txn_timestamp'), '1 minute', '1 second').alias('window'),
                                col('customer_id')) \
                            .agg(
                                collect_list(struct('*')).alias('transactions'),
                                count('transaction_id').alias('transaction_count')) \
                            .where(col("transaction_count") > 2)

suspicious_txn_stream.printSchema()
```

```python
suspicious_txn_stream = suspicious_txn_stream \
                        .withColumn('txn_cols', explode(col('transactions'))) \
                        .select(
                            col('transaction_count'),
                            col('txn_cols.*'))

suspicious_txn_stream.printSchema()
```

```python
suspicious_txn_stream = suspicious_txn_stream \
                            .select(
                                col('transaction_id'),
                                col('customer_id'),
                                col('txn_timestamp'),
                                col('amount'),
                                col('channel'),
                                col('transaction_count'))

suspicious_txn_stream.printSchema()
```

```bash
create_namespace 'banking_shamithna75gedu'

create 'banking_shamithna75gedu:suspicious_transaction', {NAME => 'info', VERSIONS => 4}
```

```python
def write_into_hbase(rows, batch):
    connection = happybase.Connection('master')
    # HBase separates namespace and table name with a colon, matching the create statement.
    table = connection.table('banking_shamithna75gedu:suspicious_transaction')
    
    attribute_names = ['transaction_id', 'customer_id', 'txn_timestamp', 'amount', 'channel']
    
    data = rows.collect()
    
    for row in data:
        row_key = f"{row['customer_id']}_{row['transaction_id']}".encode()
        record_data = { f'info:{column}'.encode() : str(row[column]).encode() for column in attribute_names }
        # Every cell needs a column family prefix and a bytes key.
        record_data[b'info:reason'] = f"Customer {row['customer_id']} made {row['transaction_count']} withdrawals of more than 400 rupees within 1 minute (Rs. {row['amount']}).".encode()
        table.put(row_key, record_data)

    connection.close()
```

```bash
tail -n +2 /home/shamithna75gedu/capstone-project/bank-transaction/dataset/banking_transaction.csv | \
while IFS= read -r line; do echo "$line"; sleep 5; done | \
kafka-console-producer.sh --bootstrap-server master:9092 --topic bank_txn_shamithna75gedu
```

```python
suspicious_txn_stream \
    .writeStream \
    .format('console') \
    .foreachBatch(write_into_hbase) \
    .start() \
    .awaitTermination()
```