In [1]:
# Before running this code, ensure you have successfully completed:
# 1. findspark.init("/opt/homebrew/Cellar/apache-spark/4.0.0/libexec")
# 2. Created the SparkSession (spark = SparkSession.builder...)
# -----------------------------------------------------------

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum, avg, count, row_number, round, dayofmonth, min, max, current_date, datediff, upper, to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.window import Window

# --- SETUP: INITIALIZE SPARK SESSION (Copy these from your working setup) ---
# NOTE: Replace 'YOUR_SPARK_HOME' with the confirmed path that worked for you.
try:
    import findspark
    # This path is the one that successfully connected your environment:
    findspark.init("/opt/homebrew/Cellar/apache-spark/4.0.0/libexec") 
except Exception as e:
    print(f"Error initializing findspark. Ensure the path is correct: {e}")



### PySpark Bank Data Mining - Implementation Requirements

#### ðŸ“‹ Project Overview

Write a Spark job to extract information from banking data, which will mine information from the files `accounts.csv` and `transactions.csv` and perform data manipulation on the same.

---
#### data
account.csv schema `accountNumber`, `balance`

transactions.csv schema  `fromAccountNumber`, `toAccountNumber`, `transferAmount`

#### Accounts-Transactions Relationship
One account could have multiple transactions. A valid transaction is the transaction from a valid account number in `accounts.csv`.


### ðŸŽ¯ Implementation Tasks

#### Task 1: `init_spark_session(self)` â†’ `SparkSession`

**Requirements:**
- Create a spark session with master `local` and name `Banking Data Mining`
- Return the SparkSession object



In [2]:
spark = SparkSession.builder \
                    .appName("Banking Data Minning") \
                    .master("local[*]") \
                    .getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/10 21:32:09 WARN Utils: Your hostname, Azads-Mac-mini.local, resolves to a loopback address: 127.0.0.1; using 192.168.4.31 instead (on interface en1)
25/12/10 21:32:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/10 21:32:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Task 2: `extract_valid_transactions(self, accounts: DataFrame, transactions: DataFrame)` â†’ `DataFrame`

**Requirements:**
- Transaction is valid if:
  1. `transferAmount` is less than or equals to `balance`
  2. The `toAccountNumber` exists in `accountsDf`
- Return the filtered `transactionDf`


In [6]:
transactionDF = spark.read.csv("source_data/transactions.csv", header=True, inferSchema=True)
accountsDF = spark.read.csv("source_data/accounts.csv", header=True, inferSchema=True)

+-------------+-------+
|accountNumber|balance|
+-------------+-------+
|         a844|   3124|
|         a331|   3812|
|         a887|   2773|
|          a24|   3818|
|         a569|   4359|
|         a548|   2342|
|         a596|   4621|
|         a784|   4298|
|         a859|   3587|
|         a452|   2976|
|         a469|   4829|
|         a126|   2978|
|         a887|   2484|
|         a688|   3285|
|         a652|   2834|
|         a935|   2579|
|         a854|   3616|
|          a92|   2449|
|         a268|   2299|
|         a185|   4685|
+-------------+-------+
only showing top 20 rows


In [10]:
joined = transactionDF.join(accountsDF, transactionDF["fromAccountNumber"]==accountsDF["accountNumber"], how="inner")
valid_transactions_df = joined.filter(col("transferAmount") <= col("balance"))


+-----------------+---------------+--------------+-------------+-------+
|fromAccountNumber|toAccountNumber|transferAmount|accountNumber|balance|
+-----------------+---------------+--------------+-------------+-------+
|             a279|           a812|          3655|         a279|   3655|
|             a425|           a795|          2439|         a425|   2439|
|              a48|           a967|          4909|          a48|   4909|
|             a745|           a251|          4333|         a745|   4333|
|             a278|           a993|          4399|         a278|   4399|
|             a122|            a42|          4218|         a122|   4218|
|             a949|           a289|          3377|         a949|   3377|
|             a416|           a135|          2662|         a416|   2662|
|             a948|           a334|          4997|         a948|   4997|
|             a355|           a486|          4519|         a355|   4519|
|             a226|           a518|          2920| 


#### Task 3: `distinct_transactions(self, transactions: DataFrame)` â†’ `int`

**Requirements:**
- Return the count of total distinct transactions based on `fromAccountNumber`


In [11]:
valid_transactions_df.select("fromAccountNumber").distinct().count()

37

In [None]:
result = (filtered_transactionsDF
        .groupBy("fromAccountNumber")
        .count()
        .orderBy(col("count").desc())
        .limit(10)
        .rdd
        .collectAsMap())
result

#### Task 4: `transactions_per_account(self, transactions: DataFrame)` â†’ `dict`

**Requirements:**
- Find the count of transactions per `fromAccountNumber`
- Return top 10, `fromAccountNumber` and corresponding count as a dictionary

In [13]:
result = valid_transactions_df.groupBy("fromAccountNumber").count().orderBy(col("count").desc()).limit(10).rdd.collectAsMap()
result

{'a226': 2,
 'a92': 2,
 'a278': 2,
 'a452': 2,
 'a949': 2,
 'a688': 2,
 'a627': 2,
 'a994': 2,
 'a948': 2,
 'a575': 2}

In [14]:
spark.stop()