In [0]:
from pyspark.sql import SparkSession

# Initializing SparkSession 
spark = SparkSession.builder \
    .appName("Loan Data Processing") \
    .getOrCreate()

# File path
file_path = "/FileStore/tables/txn.csv"

# Load the CSV file into a PySpark DataFrame
loans_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(file_path)

# Display the DataFrame to verify
loans_df.show(5)

+-------------+--------------------+----------+----------------+-------------+-----------+
|   Account No| TRANSACTION DETAILS|VALUE DATE| WITHDRAWAL AMT | DEPOSIT AMT |BALANCE AMT|
+-------------+--------------------+----------+----------------+-------------+-----------+
|409000611074'|TRF FROM  Indiafo...| 29-Jun-17|            NULL|    1000000.0|  1000000.0|
|409000611074'|TRF FROM  Indiafo...|  5-Jul-17|            NULL|    1000000.0|  2000000.0|
|409000611074'|FDRL/INTERNAL FUN...| 18-Jul-17|            NULL|     500000.0|  2500000.0|
|409000611074'|TRF FRM  Indiafor...|  1-Aug-17|            NULL|    3000000.0|  5500000.0|
|409000611074'|FDRL/INTERNAL FUN...| 16-Aug-17|            NULL|     500000.0|  6000000.0|
+-------------+--------------------+----------+----------------+-------------+-----------+
only showing top 5 rows



In [0]:
# Finding the maximum withdrawal amount
max_withdrawal = loans_df.agg({" WITHDRAWAL AMT ": "max"}).collect()[0][0]

print(f"Maximum withdrawal amount: {max_withdrawal}")


Maximum withdrawal amount: 459447546.4


In [0]:
max_withdrawal_sql = spark.sql("""
    SELECT CAST(MAX(` WITHDRAWAL AMT `) AS DECIMAL(20, 2)) AS Max_Withdrawal
    FROM transactions
""")
max_withdrawal_sql.show()


+--------------+
|Max_Withdrawal|
+--------------+
|  459447546.40|
+--------------+



In [0]:
min_withdrawal_per_account = loans_df.groupBy("Account No").agg({" WITHDRAWAL AMT ": "min"})

min_withdrawal_per_account = min_withdrawal_per_account.withColumnRenamed("min( WITHDRAWAL AMT )", "Min_Withdrawal")

min_withdrawal_per_account.show()


+-------------+--------------+
|   Account No|Min_Withdrawal|
+-------------+--------------+
|409000438611'|           0.2|
|     1196711'|          0.25|
|     1196428'|          0.25|
|409000493210'|          0.01|
|409000611074'|         120.0|
|409000425051'|          1.25|
|409000405747'|          21.0|
|409000493201'|           2.1|
|409000438620'|          0.34|
|409000362497'|          0.97|
+-------------+--------------+



In [0]:
# Createing SQL view
loans_df.createOrReplaceTempView("transactions")

#minimum withdrawal amount for each account
min_withdrawal_per_account_sql = spark.sql("""
    SELECT `Account No`, MIN(` WITHDRAWAL AMT `) AS Min_Withdrawal
    FROM transactions
    GROUP BY `Account No`
""")
min_withdrawal_per_account_sql.show()


+-------------+--------------+
|   Account No|Min_Withdrawal|
+-------------+--------------+
|409000438611'|           0.2|
|     1196711'|          0.25|
|     1196428'|          0.25|
|409000493210'|          0.01|
|409000611074'|         120.0|
|409000425051'|          1.25|
|409000405747'|          21.0|
|409000493201'|           2.1|
|409000438620'|          0.34|
|409000362497'|          0.97|
+-------------+--------------+



In [0]:
from pyspark.sql.functions import format_number

# Calculate the maximum deposit per account
max_deposit_per_account = loans_df.groupBy("Account No").agg({" DEPOSIT AMT ": "max"})

# Rename the column for readability
max_deposit_per_account = max_deposit_per_account.withColumnRenamed("max( DEPOSIT AMT )", "Max_Deposit")

# Format the Max_Deposit column to 2 decimal places
max_deposit_per_account = max_deposit_per_account.withColumn("Max_Deposit", format_number("Max_Deposit", 2))
    
max_deposit_per_account.show()


+-------------+--------------+
|   Account No|   Max_Deposit|
+-------------+--------------+
|409000438611'|170,250,000.00|
|     1196711'|500,000,000.00|
|     1196428'|211,959,442.20|
|409000493210'| 15,000,000.00|
|409000611074'|  3,000,000.00|
|409000425051'| 15,000,000.00|
|409000405747'|202,100,000.00|
|409000493201'|  1,000,000.00|
|409000438620'|544,800,000.00|
|409000362497'|200,000,000.00|
+-------------+--------------+



In [0]:
#find the maximum deposit amount for each account with formatted result
max_deposit_per_account_sql = spark.sql("""
    SELECT `Account No`, 
           FORMAT_NUMBER(MAX(` DEPOSIT AMT `), 2) AS Max_Deposit
    FROM transactions
    GROUP BY `Account No`
""")
max_deposit_per_account_sql.show()


+-------------+--------------+
|   Account No|   Max_Deposit|
+-------------+--------------+
|409000438611'|170,250,000.00|
|     1196711'|500,000,000.00|
|     1196428'|211,959,442.20|
|409000493210'| 15,000,000.00|
|409000611074'|  3,000,000.00|
|409000425051'| 15,000,000.00|
|409000405747'|202,100,000.00|
|409000493201'|  1,000,000.00|
|409000438620'|544,800,000.00|
|409000362497'|200,000,000.00|
+-------------+--------------+



In [0]:
sum_balance_per_account = loans_df.groupBy("Account No").agg({"BALANCE AMT": "sum"})

sum_balance_per_account = sum_balance_per_account.withColumnRenamed("sum(BALANCE AMT)", "Total_Balance")

sum_balance_per_account.show()


+-------------+--------------------+
|   Account No|       Total_Balance|
+-------------+--------------------+
|409000438611'|-2.49486577068339...|
|     1196711'|-1.60476498101275E13|
|     1196428'| -8.1418498130721E13|
|409000493210'|-3.27584952132095...|
|409000611074'|       1.615533622E9|
|409000425051'|-3.77211841164998...|
|409000405747'|-2.43108047067000...|
|409000493201'|1.0420831829499985E9|
|409000438620'|-7.12291867951358...|
|409000362497'| -5.2860004792808E13|
+-------------+--------------------+



In [0]:
loans_df.createOrReplaceTempView("transactions")

sum_balance_per_account_sql = spark.sql("""
    SELECT `Account No`, SUM(`BALANCE AMT`) AS Total_Balance
    FROM transactions
    GROUP BY `Account No`
""")
sum_balance_per_account_sql.show()


+-------------+--------------------+
|   Account No|       Total_Balance|
+-------------+--------------------+
|409000438611'|-2.49486577068339...|
|     1196711'|-1.60476498101275E13|
|     1196428'| -8.1418498130721E13|
|409000493210'|-3.27584952132095...|
|409000611074'|       1.615533622E9|
|409000425051'|-3.77211841164998...|
|409000405747'|-2.43108047067000...|
|409000493201'|1.0420831829499985E9|
|409000438620'|-7.12291867951358...|
|409000362497'| -5.2860004792808E13|
+-------------+--------------------+



In [0]:
# Count the number of transactions on each date
transactions_per_date = loans_df.groupBy("VALUE DATE").count()

transactions_per_date = transactions_per_date.withColumnRenamed("count", "Num_Transactions")

transactions_per_date.show()


+----------+----------------+
|VALUE DATE|Num_Transactions|
+----------+----------------+
| 23-Dec-16|             143|
|  7-Feb-19|              98|
| 21-Jul-15|              80|
|  9-Sep-15|              91|
| 17-Jan-15|              16|
| 18-Nov-17|              53|
| 21-Feb-18|              77|
| 20-Mar-18|              71|
| 19-Apr-18|              71|
| 21-Jun-16|              97|
| 17-Oct-17|             101|
|  3-Jan-18|              70|
|  8-Jun-18|             223|
| 15-Dec-18|              62|
|  8-Aug-16|              97|
| 17-Dec-16|              74|
|  3-Sep-15|              83|
| 21-Jan-16|              76|
|  4-May-18|              92|
|  7-Sep-17|              94|
+----------+----------------+
only showing top 20 rows



In [0]:
loans_df.createOrReplaceTempView("transactions")

#query to count the number of transactions on each date
transactions_per_date_sql = spark.sql("""
    SELECT `VALUE DATE`, COUNT(*) AS Num_Transactions
    FROM transactions
    GROUP BY `VALUE DATE`
""")
transactions_per_date_sql.show()


+----------+----------------+
|VALUE DATE|Num_Transactions|
+----------+----------------+
| 23-Dec-16|             143|
|  7-Feb-19|              98|
| 21-Jul-15|              80|
|  9-Sep-15|              91|
| 17-Jan-15|              16|
| 18-Nov-17|              53|
| 21-Feb-18|              77|
| 20-Mar-18|              71|
| 19-Apr-18|              71|
| 21-Jun-16|              97|
| 17-Oct-17|             101|
|  3-Jan-18|              70|
|  8-Jun-18|             223|
| 15-Dec-18|              62|
|  8-Aug-16|              97|
| 17-Dec-16|              74|
|  3-Sep-15|              83|
| 21-Jan-16|              76|
|  4-May-18|              92|
|  7-Sep-17|              94|
+----------+----------------+
only showing top 20 rows



In [0]:
# Filter customers with withdrawal amount greater than 1 lakh (100,000)
customers_with_high_withdrawal = loans_df.filter(loans_df[" WITHDRAWAL AMT "] > 100000)

# Select the relevant columns (Account No and WITHDRAWAL AMT)
customers_with_high_withdrawal = customers_with_high_withdrawal.select("Account No", " WITHDRAWAL AMT ")

customers_with_high_withdrawal.show()


+-------------+----------------+
|   Account No| WITHDRAWAL AMT |
+-------------+----------------+
|409000611074'|          133900|
|409000611074'|          195800|
|409000611074'|          143800|
|409000611074'|          331650|
|409000611074'|          129000|
|409000611074'|          230013|
|409000611074'|          367900|
|409000611074'|          108000|
|409000611074'|          141000|
|409000611074'|          206000|
|409000611074'|          242300|
|409000611074'|          113250|
|409000611074'|          206900|
|409000611074'|          276000|
|409000611074'|          171000|
|409000611074'|          189800|
|409000611074'|          271323|
|409000611074'|          200600|
|409000611074'|          176900|
|409000611074'|          150050|
+-------------+----------------+
only showing top 20 rows



In [0]:
loans_df.createOrReplaceTempView("transactions")

#query to get customers with withdrawal amount greater than 1 lakh
customers_with_high_withdrawal_sql = spark.sql("""
    SELECT `Account No`, ` WITHDRAWAL AMT `
    FROM transactions
    WHERE ` WITHDRAWAL AMT ` > 100000
""")
customers_with_high_withdrawal_sql.show()


+-------------+----------------+
|   Account No| WITHDRAWAL AMT |
+-------------+----------------+
|409000611074'|          133900|
|409000611074'|          195800|
|409000611074'|          143800|
|409000611074'|          331650|
|409000611074'|          129000|
|409000611074'|          230013|
|409000611074'|          367900|
|409000611074'|          108000|
|409000611074'|          141000|
|409000611074'|          206000|
|409000611074'|          242300|
|409000611074'|          113250|
|409000611074'|          206900|
|409000611074'|          276000|
|409000611074'|          171000|
|409000611074'|          189800|
|409000611074'|          271323|
|409000611074'|          200600|
|409000611074'|          176900|
|409000611074'|          150050|
+-------------+----------------+
only showing top 20 rows

