<a href="https://colab.research.google.com/github/Rohit-Saswadkar/Data-Manipulation-using-Pyspark/blob/main/Banking_data_Transforming_and_Analysis_using_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**Let's get start**

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


**Install pyspark**


In [2]:
!pip install pyspark
!pip install openpyxl

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=1556bd4704bfd5fab48824fa44579024a5d709dd34f1a576baf72578dfcc5c42
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
# set the spark session
from pyspark.sql import SparkSession

sp = SparkSession.builder \
    .appName("Banking_Analysis_using_Pyspark") \
    .getOrCreate()

In [4]:
# Configure the SparkSession
sp.conf.set("spark.sql.shuffle.partitions", "50")

In [5]:
# Configure the logging settings
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


##**Loading Dataset**

In [6]:
# load df paths
path1 = '/content/drive/MyDrive/Py Spark/Banking Project/BankingDataset_1.csv'
path2 = '/content/drive/MyDrive/Py Spark/Banking Project/BankingDataset_2.csv'

In [7]:
df1 = sp.read.csv(path1, header=True, inferSchema=True, sep=",", ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)
df2 = sp.read.csv(path2, header=True, inferSchema=True, sep=",", ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)

In [8]:
# check schema of the df's
df1.printSchema()
df2.printSchema()

root
 |-- TransactionID: integer (nullable = true)
 |-- AccountNumber: integer (nullable = true)
 |-- TransactionType: string (nullable = true)
 |-- Amount: double (nullable = true)
 |-- TransactionDate: timestamp (nullable = true)
 |-- BranchCode: integer (nullable = true)
 |-- Currency: string (nullable = true)
 |-- TransactionTime: integer (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)

root
 |-- AccountNumber: integer (nullable = true)
 |-- AccountHolder: string (nullable = true)
 |-- AccountType: string (nullable = true)
 |-- Balance: double (nullable = true)
 |-- InterestRate: double (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- OpeningDate: string (nullable = true)
 |-- LoanAmount: double (nullable = true)
 |-- AccountHolderDetails: string (nullable = true)



In [9]:
# drop unnecessary columns
columns_to_drop = ['_c9', '_c10', '_c11', '_c12', '_c13', '_c14']
df2 = df2.drop(*columns_to_drop)

In [10]:
columns_to_drop = ['_C8','_C9']

df1 = df1.drop(* columns_to_drop)

##**Initial data exploration**

In [11]:
df1.describe().show() # check the data
df2.describe().show()

+-------+-----------------+------------------+---------------+------------------+------------------+--------+-----------------+
|summary|    TransactionID|     AccountNumber|TransactionType|            Amount|        BranchCode|Currency|  TransactionTime|
+-------+-----------------+------------------+---------------+------------------+------------------+--------+-----------------+
|  count|             1000|              1000|            940|               940|              1000|    1000|             1000|
|   mean|            500.5|        560160.735|           NULL| 5063.896222674368|           253.436|  -999.0|           11.863|
| stddev|288.8194360957494|260670.28320861366|           NULL|2847.0992164759687|144.11454092111913|     0.0|6.515726029053529|
|    min|                1|            100268|        Deposit|        138.777986|                 1|    *^$$|                1|
|    max|             1000|            996071|     Withdrawal|        9987.02423|               499|    

In [12]:
df1.dtypes # data types of columns

[('TransactionID', 'int'),
 ('AccountNumber', 'int'),
 ('TransactionType', 'string'),
 ('Amount', 'double'),
 ('TransactionDate', 'timestamp'),
 ('BranchCode', 'int'),
 ('Currency', 'string'),
 ('TransactionTime', 'int')]

In [13]:
df2.dtypes

[('AccountNumber', 'int'),
 ('AccountHolder', 'string'),
 ('AccountType', 'string'),
 ('Balance', 'double'),
 ('InterestRate', 'double'),
 ('CreditScore', 'int'),
 ('OpeningDate', 'string'),
 ('LoanAmount', 'double'),
 ('AccountHolderDetails', 'string')]

In [14]:
df1 = df1.dropDuplicates() # drop duplicates to enhance performance
df2 = df2.dropDuplicates()

In [15]:
# check unique counts of each column of DF1
unique_counts = {}
for col_name in df1.columns:
    unique_count = df1.select(col_name).distinct().count()
    unique_counts[col_name] = unique_count

# Display the unique counts
for col_name, unique_count in unique_counts.items():
    print(f"Column {col_name} has {unique_count} unique values")


Column TransactionID has 1000 unique values
Column AccountNumber has 558 unique values
Column TransactionType has 5 unique values
Column Amount has 941 unique values
Column TransactionDate has 1000 unique values
Column BranchCode has 431 unique values
Column Currency has 7 unique values
Column TransactionTime has 23 unique values


In [16]:
# check unique counts of each column of DF2
unique_counts = {}
for col_name in df2.columns:
    unique_count = df2.select(col_name).distinct().count()
    unique_counts[col_name] = unique_count

# Display the unique counts
for col_name, unique_count in unique_counts.items():
    print(f"Column {col_name} has {unique_count} unique values")


Column AccountNumber has 689 unique values
Column AccountHolder has 336 unique values
Column AccountType has 4 unique values
Column Balance has 689 unique values
Column InterestRate has 689 unique values
Column CreditScore has 389 unique values
Column OpeningDate has 689 unique values
Column LoanAmount has 689 unique values
Column AccountHolderDetails has 377 unique values


##**Data Cleaning and Preprocessing**

###**Check ulll values in columns**

In [45]:
from pyspark.sql.functions import col, when, count, isnull

# Show columns with missing values for df1
df1.select([count(when(isnull(c), c)).alias(c) for c in df1.columns]).show()

# Show columns with missing values for df2
df2.select([count(when(isnull(c), c)).alias(c) for c in df2.columns]).show()

+-------------+-------------+---------------+------+---------------+----------+--------+---------------+
|TransactionID|AccountNumber|TransactionType|Amount|TransactionDate|BranchCode|Currency|TransactionTime|
+-------------+-------------+---------------+------+---------------+----------+--------+---------------+
|            0|            0|             60|    60|              0|         0|       0|              0|
+-------------+-------------+---------------+------+---------------+----------+--------+---------------+

+-------------+-------------+-----------+-------+------------+-----------+-----------+----------+--------------------+
|AccountNumber|AccountHolder|AccountType|Balance|InterestRate|CreditScore|OpeningDate|LoanAmount|AccountHolderDetails|
+-------------+-------------+-----------+-------+------------+-----------+-----------+----------+--------------------+
|            0|            0|          0|      0|           0|          0|          0|         0|                   0

###**A] Null value imputation**

**1) Identifying Numeric and Categorical Columns in DF! and replace null with them**

In [18]:
# separating the categorical and numerical columns by their datatype
from pyspark.sql.types import IntegerType, DoubleType, StringType

numeric_cols = [field.name for field in df1.schema.fields if isinstance(field.dataType, (IntegerType, DoubleType))]
categorical_cols = [field.name for field in df1.schema.fields if isinstance(field.dataType, StringType)]


In [19]:
# calculating median for numerical columns
numeric_medians = {}
for col in numeric_cols:
    median = df1.approxQuantile(col, [0.5], 0.01)[0]
    numeric_medians[col] = median


In [20]:
# calculating mode for categorical columns
from pyspark.sql.functions import col, count, desc

categorical_modes = {}
for col_name in categorical_cols:
    mode_df = df1.groupBy(col_name).count().orderBy(desc("count")).first()
    if mode_df:
        categorical_modes[col_name] = mode_df[col_name]


In [21]:
# replaced null with median and mode repectively
df1_filled = df1.fillna(numeric_medians)
df1_filled = df1_filled.fillna(categorical_modes)


In [22]:
# Show columns with missing values for df1
df1_filled.select([count(when(isnull(c), c)).alias(c) for c in df1.columns]).show()

+-------------+-------------+---------------+------+---------------+----------+--------+---------------+
|TransactionID|AccountNumber|TransactionType|Amount|TransactionDate|BranchCode|Currency|TransactionTime|
+-------------+-------------+---------------+------+---------------+----------+--------+---------------+
|            0|            0|              0|     0|              0|         0|       0|              0|
+-------------+-------------+---------------+------+---------------+----------+--------+---------------+



**B] Creating new Df by merging df1 and df2 using AccountNumber**

In [23]:
df = df1_filled.join( df2, on='AccountNumber', how='inner')

In [24]:
df.show()

+-------------+-------------+---------------+-----------+-------------------+----------+--------+---------------+-----------------+-----------+-----------+------------+-----------+---------------+-----------+--------------------+
|AccountNumber|TransactionID|TransactionType|     Amount|    TransactionDate|BranchCode|Currency|TransactionTime|    AccountHolder|AccountType|    Balance|InterestRate|CreditScore|    OpeningDate| LoanAmount|AccountHolderDetails|
+-------------+-------------+---------------+-----------+-------------------+----------+--------+---------------+-----------------+-----------+-----------+------------+-----------+---------------+-----------+--------------------+
|       334892|           42|     Withdrawal| 5037.65665|2023-02-11 00:00:00|       342|     USD|              9|Patricia Gonzalez|       Loan|25577.87469|  2.00892616|        847| 2/11/2020 0:00|39920.51583|Sector: Finance, ...|
|       334892|          595|       Transfer|6418.314831|2024-08-17 00:00:00|   

**C] Outliers checking**

In [25]:
from pyspark.sql.functions import mean, stddev, col

# Calculate mean and standard deviation for the 'Amount' column
stats = df.select(mean('Amount').alias('mean'), stddev('Amount').alias('stddev')).first()
mean_value = stats['mean']
stddev_value = stats['stddev']

# Define the filter condition for outliers (values outside the range of mean ± 3 * stddev)
outlier_condition = (col('Amount') < mean_value - 3 * stddev_value) | (col('Amount') > mean_value + 3 * stddev_value)

# Count the number of outliers
outliers_count = df.filter(outlier_condition).count()

print(f"Number of outliers: {outliers_count}")


Number of outliers: 0


**D] Extract Year , Month and Day from Transaction date**

In [26]:
from pyspark.sql.functions import year, month, dayofmonth, hour
df = df.withColumn('TransactionYear', year(col('TransactionDate')))
df = df.withColumn('TransactionMonth', month(col('TransactionDate')))
df = df.withColumn('TransactionDay', dayofmonth(col('TransactionDate')))


##**Exploratory Data analysis**

In [27]:
# get stats
df.describe().show()


+-------+------------------+------------------+---------------+------------------+------------------+--------+-----------------+----------------+-----------+------------------+------------------+------------------+-------------+------------------+--------------------+------------------+------------------+------------------+
|summary|     AccountNumber|     TransactionID|TransactionType|            Amount|        BranchCode|Currency|  TransactionTime|   AccountHolder|AccountType|           Balance|      InterestRate|       CreditScore|  OpeningDate|        LoanAmount|AccountHolderDetails|   TransactionYear|  TransactionMonth|    TransactionDay|
+-------+------------------+------------------+---------------+------------------+------------------+--------+-----------------+----------------+-----------+------------------+------------------+------------------+-------------+------------------+--------------------+------------------+------------------+------------------+
|  count|             

**1] Check most frequent transaction type**

In [34]:
df.groupBy('TransactionType').count().orderBy(col('count').desc()).show()

+---------------+-----+
|TransactionType|count|
+---------------+-----+
|        Payment|  277|
|       Transfer|  207|
|        Deposit|  197|
|     Withdrawal|  190|
+---------------+-----+



**2] Analysing Total Amount and Avg balance of each account**

In [44]:
from pyspark.sql.functions import sum as Fsum, count as Fcount, mean as Fmean, median as Fmedian, round as Fround, desc as Fdesc, asc as Fasc

df_aggregated = df.groupBy('AccountNumber').agg(
    Fround(Fsum('Amount'),2).alias('TotalAmount'),
    Fround(Fmean('Balance'),2).alias('AverageBalance')
).orderBy(col('TotalAmount').desc())

df_aggregated.show()

+-------------+-----------+--------------+
|AccountNumber|TotalAmount|AverageBalance|
+-------------+-----------+--------------+
|       936946|   19675.18|      46025.21|
|       463702|   19001.19|      16185.74|
|       845733|   18923.71|      15652.27|
|       988691|   18755.95|       4588.23|
|       764728|    18640.2|      46664.95|
|       140388|   18455.01|       26414.3|
|       192126|   18417.82|      44891.35|
|       875740|   18390.96|      29666.73|
|       624735|   18326.77|      18466.39|
|       119466|   18055.77|       8117.24|
|       341941|   18038.01|      18678.81|
|       273833|   18008.43|      32003.43|
|       873344|    17953.0|      27501.74|
|       479868|   17662.21|       9528.76|
|       118749|   17641.94|       4143.09|
|       959316|    17593.1|       13124.1|
|       851388|   17225.94|      15181.64|
|       640162|   16831.73|       18674.9|
|       653358|   16815.23|      45276.78|
|       394174|    16695.1|       9864.19|
+----------

**Insights:**

- **High Total Amount Correlates with Higher Average Balance**: Accounts with higher total amounts generally have higher average balances, indicating a trend where larger transactions are associated with higher average account balances.
- **Notable Exceptions**: Some accounts, like `817886` and `119374`, have high total amounts but significantly lower average balances, suggesting occasional large transactions amidst generally lower average balances.

**3] Analysing Trends of Total Amount and no of transactions by year and montha**

In [39]:
df_trends = df.groupBy('TransactionYear', 'TransactionMonth').agg(
    Fround(Fsum('Amount'),2).alias('TotalAmount'),
    Fcount('TransactionID').alias('TransactionCount')
).orderBy(col('TransactionYear').asc(), col('TransactionMonth').asc())
df_trends.show()

+---------------+----------------+-----------+----------------+
|TransactionYear|TransactionMonth|TotalAmount|TransactionCount|
+---------------+----------------+-----------+----------------+
|           2023|               1|  127000.38|              25|
|           2023|               2|  134818.35|              25|
|           2023|               3|  129389.81|              26|
|           2023|               4|   153976.9|              27|
|           2023|               5|  130079.14|              27|
|           2023|               6|  135687.85|              26|
|           2023|               7|  139689.75|              28|
|           2023|               8|  135001.78|              28|
|           2023|               9|  116264.52|              23|
|           2023|              10|  159495.98|              29|
|           2023|              11|  125990.91|              25|
|           2023|              12|  134070.37|              27|
|           2024|               1|  1462

Insights:

- **High Transaction Amounts in Early 2025**: The total transaction amounts peak notably in early 2025, particularly in April (`163028.91`) and January (`136148.89`), indicating a significant increase in transaction volume during this period.
  
- **Consistent Transaction Activity**: Despite fluctuations in total amounts, transaction counts are relatively stable across most months, with variations typically within a few transactions, suggesting consistent transaction activity across the months.

Summary

The data shows that while transaction amounts vary significantly, especially in early 2025, the number of transactions remains fairly steady, pointing to periods of high transaction activity.

**4] Analysing account holder name by highest avg loan amount as well as their avg credit scores and avg interest rates**

In [43]:
df_account_holders = df.groupby('AccountHolder').agg( Fround(Fmean('LoanAmount'),2).alias('Avg Loan Amount'), Fround(Fmean('CreditScore'),2).alias('Avg Credit score'), Fround(Fmean(
'InterestRate'),2).alias('Avg Interest Rate'), Fround(Fmean('TransactionTime'),2).alias('Avg transaction Time')).orderBy(Fdesc('Avg Loan Amount')).limit(1)
df_account_holders.show()

+-------------+---------------+----------------+-----------------+--------------------+
|AccountHolder|Avg Loan Amount|Avg Credit score|Avg Interest Rate|Avg transaction Time|
+-------------+---------------+----------------+-----------------+--------------------+
|  Sarah Jones|       49898.46|           788.0|             1.07|                17.5|
+-------------+---------------+----------------+-----------------+--------------------+



Insights:

- **High Loan Amounts**: Account holders like Sarah Jones and Linda Garcia have the highest average loan amounts, indicating substantial borrowing.
- **Credit Score Trends**: Higher loan amounts often correlate with higher average credit scores, though some exceptions, like Barbara Miller, show lower credit scores.
- **Interest Rates and Transaction Times**: Interest rates vary widely, with some account holders paying significantly more, while transaction times remain fairly consistent across individuals.

##**Save the Transformed Dataframe**

In [32]:
# CSV format
df.write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("/content/drive/MyDrive/Py Spark/Banking Project/Final Dataframe/Banking_DF.csv")


In [33]:
# Parquet format
df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("/content/drive/MyDrive/Py Spark/Banking Project/Final Dataframe/Banking_DF.parquet")
