<a href="https://colab.research.google.com/github/balajikam/CapstoneProject/blob/main/Capstone.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import sqlite3
import pandas as pd

db = sqlite3.connect('customers.db')

query_create_table = 'CREATE TABLE customers(customer_id INTEGER, first_name VARCHAR, last_name VARCHAR, date_of_birth DATE)'
db.execute(query_create_table)

query_insert_data = '''
INSERT INTO customers (customer_id, first_name, last_name, date_of_birth) VALUES
(1, 'John', 'Doe', '1980-05-15'),
(2, 'Jane', 'Smith', '1992-08-21'),
(3, 'Alice', 'Johnson', '1975-02-10'),
(4, 'Sarah', 'Jones', '1988-12-03'),
(5, 'David', 'Brown', '1995-04-18'),
(6, 'Emma', 'Miller', '1982-07-25');
'''

db.execute(query_insert_data)

query_select_all = 'SELECT * FROM customers'
df_customers = pd.read_sql_query(query_select_all, db)
print(df_customers)

db.commit()

In [None]:
#install Apache Spark 3.0.1 with Hadoop 2.7 from here.
!wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz

# Now, we just need to unzip that folder.
!tar -xvzf spark-3.0.0-bin-hadoop2.7.tgz
!pip install findspark


import os
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"
import findspark
findspark.init()

In [None]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

json_file_path = "/content/branches.json"

with open(json_file_path, 'r') as file:
    json_data = json.load(file)

schema = StructType([
    StructField("branch_id", IntegerType(), True),
    StructField("branch_name", StringType(), True),
    StructField("location", StringType(), True),
])

rows = [(branch["branch_id"], branch["branch_name"], branch["location"]) for branch in json_data["branches"]]
branches = spark.createDataFrame(rows, schema=schema)

branches.show(truncate=False)

In [None]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

json_file_path = "/content/accounts.json"

with open(json_file_path, 'r') as file:
    json_data = json.load(file)

schema = StructType([
    StructField("account_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("employee_id", IntegerType(), True),
    StructField("account_type", StringType(), True),
    StructField("balance", IntegerType(), True),
])

rows = [
    (
        account["account_id"],
        account["customer_id"],
        account["employee_id"],
        account["account_type"],
        account["balance"]
    ) for account in json_data["accounts"]
]
accounts = spark.createDataFrame(rows, schema=schema)

accounts.show(truncate=False)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

transaction = spark.read.csv(path = "/content/transactions.csv", header=True)
transaction.show(truncate=False)

In [None]:
import xml.etree.ElementTree as ET
import pandas as pd

tree = ET.parse('/content/employees.xml')
root = tree.getroot()

data = []
for child in root:
    record = {}
    for subchild in child:
        record[subchild.tag] = subchild.text
    data.append(record)

employees = pd.DataFrame(data)
print(employees)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()


loans = spark.read.csv(path = "/content/loans.csv", header=True)
loans.show(truncate=False)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id
spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

payment_history = spark.read.csv(path = "/content/payment_history.csv", header=True)
payment_history = payment_history.withColumn("payment_id", monotonically_increasing_id())
payment_history.show(truncate=False)

In [61]:
#Basic Reports
# 1. Write a spark dataframe to show the balance amount for an account_id = 1

result_df = accounts.filter(accounts["account_id"] == 1).select("balance")
result_df.show(truncate=False)

+-------+
|balance|
+-------+
|5000   |
+-------+



In [62]:
#2: List Transactions for an account_id = 1:

account_transactions = transaction.filter(transaction["account_id"] == 1)
account_transactions.show(truncate=False)

+--------------+----------+----------------+------+-------------------+
|transaction_id|account_id|transaction_type|amount|transaction_date   |
+--------------+----------+----------------+------+-------------------+
|1             |1         |Deposit         |1000  |2023-01-15 08:30:00|
|2             |1         |Withdrawal      |500   |2023-02-02 12:45:00|
|10            |1         |Deposit         |1200  |2023-10-01 10:00:00|
+--------------+----------+----------------+------+-------------------+



In [63]:
#3: List Accounts with a zero balance:

zero_balance_accounts = accounts.filter(accounts["balance"] == 0)
zero_balance_accounts.show(truncate=False)

+----------+-----------+-----------+------------+-------+
|account_id|customer_id|employee_id|account_type|balance|
+----------+-----------+-----------+------------+-------+
+----------+-----------+-----------+------------+-------+



In [64]:
#4: Find the Oldest Customer

df_customers.sort_values(by=['date_of_birth'], ascending=True).head(1)

Unnamed: 0,customer_id,first_name,last_name,date_of_birth
2,3,Alice,Johnson,1975-02-10


In [65]:
# Question 5: Calculate the Total Interest Earned Across All Accounts:
from pyspark.sql.functions import *

interest_earned = transaction.agg(sum('amount').alias('total_interest'))
interest_earned.show()

+--------------+
|total_interest|
+--------------+
|       14500.0|
+--------------+



In [66]:
#Accounts Reports
#1:- List All Accounts with Customer Information:

from pyspark.sql import *
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("SparkDemoApp").getOrCreate()

df = spark.createDataFrame(df_customers)

df_new = accounts.join(df, on = 'customer_id', how = 'inner')
result = df_new.select('customer_id', 'first_name', 'last_name', 'account_id', 'account_type', 'balance')
result.show()

  for column, series in pdf.iteritems():


+-----------+----------+---------+----------+------------+-------+
|customer_id|first_name|last_name|account_id|account_type|balance|
+-----------+----------+---------+----------+------------+-------+
|          5|     David|    Brown|         8|     Savings|   3000|
|          1|      John|      Doe|         1|     Savings|   5000|
|          1|      John|      Doe|         2|    Checking|   1000|
|          3|     Alice|  Johnson|         4|    Checking|   3000|
|          3|     Alice|  Johnson|         6|     Savings|   6000|
|          2|      Jane|    Smith|         3|     Savings|   8000|
|          2|      Jane|    Smith|         5|    Checking|   2500|
|          4|     Sarah|    Jones|         7|    Checking|  12000|
+-----------+----------+---------+----------+------------+-------+



In [21]:
#2. Calculate Total Balance for Each Customer:
import pyspark.sql.functions as F
test = accounts.groupBy("customer_id").agg(F.sum('balance').alias('total_bal'))
test1 = test.join(df, "customer_id", 'inner')
test_new= test1.orderBy(F.desc("total_bal")).show(5)

+-----------+---------+----------+---------+-------------+
|customer_id|total_bal|first_name|last_name|date_of_birth|
+-----------+---------+----------+---------+-------------+
|          4|    12000|     Sarah|    Jones|   1988-12-03|
|          2|    10500|      Jane|    Smith|   1992-08-21|
|          3|     9000|     Alice|  Johnson|   1975-02-10|
|          1|     6000|      John|      Doe|   1980-05-15|
|          5|     3000|     David|    Brown|   1995-04-18|
+-----------+---------+----------+---------+-------------+



In [22]:
#3:- Find Customers with Multiple Accounts
from pyspark.sql.types import *

multi_df = accounts.groupBy('customer_id').agg(F.count('account_id').alias('total_accounts'))
tb_df = multi_df.filter(F.col('total_accounts')>1)
tb_df.show()

+-----------+--------------+
|customer_id|total_accounts|
+-----------+--------------+
|          1|             2|
|          3|             2|
|          2|             2|
+-----------+--------------+



In [39]:
#Customer Transactions Reports
#1-List Transactions with Account and Customer Information:
df_customernew = spark.createDataFrame(df_customers)
df_customernew.show()
joined_df = transaction.join(accounts, "account_id").join(df_customernew, "customer_id")
new_df = joined_df.select('transaction_id','transaction_type','amount','transaction_date','customer_id','first_name','last_name','account_id','account_type')
new_df.show(truncate=False)


+-----------+----------+---------+-------------+
|customer_id|first_name|last_name|date_of_birth|
+-----------+----------+---------+-------------+
|          1|      John|      Doe|   1980-05-15|
|          2|      Jane|    Smith|   1992-08-21|
|          3|     Alice|  Johnson|   1975-02-10|
|          4|     Sarah|    Jones|   1988-12-03|
|          5|     David|    Brown|   1995-04-18|
|          6|      Emma|   Miller|   1982-07-25|
+-----------+----------+---------+-------------+

+--------------+----------------+------+-------------------+-----------+----------+---------+----------+------------+
|transaction_id|transaction_type|amount|transaction_date   |customer_id|first_name|last_name|account_id|account_type|
+--------------+----------------+------+-------------------+-----------+----------+---------+----------+------------+
|10            |Deposit         |1200  |2023-10-01 10:00:00|1          |John      |Doe      |1         |Savings     |
|2             |Withdrawal      |500 

In [40]:
# Question 2: Calculate Average Transaction Amount

average_transaction_amount = transaction.groupBy().agg(avg("amount").alias("average_transaction_amount"))

average_transaction_amount.show(truncate=False)

+--------------------------+
|average_transaction_amount|
+--------------------------+
|1450.0                    |
+--------------------------+



In [55]:
#3. Identify High-Value Customers with Total Balance:
import pyspark.sql.functions as F
test = accounts.groupBy("customer_id").agg(F.sum('balance').alias('total_bal'))
test1 = test.join(df_customernew, "customer_id", 'inner')
test_new= test1.orderBy(F.desc("total_bal")).show(5)

+-----------+---------+----------+---------+-------------+
|customer_id|total_bal|first_name|last_name|date_of_birth|
+-----------+---------+----------+---------+-------------+
|          4|    12000|     Sarah|    Jones|   1988-12-03|
|          2|    10500|      Jane|    Smith|   1992-08-21|
|          3|     9000|     Alice|  Johnson|   1975-02-10|
|          1|     6000|      John|      Doe|   1980-05-15|
|          5|     3000|     David|    Brown|   1995-04-18|
+-----------+---------+----------+---------+-------------+



In [60]:
#4:- List Employees and Their Assigned Customers:
from pyspark.sql import *
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("SparkDemoApp").getOrCreate()
df_emp2 = spark.createDataFrame(employees)
df_test = accounts.join(df_customernew, on ='customer_id', how ='inner')
lec_df = df_test.join(df_emp2, on = 'employee_id', how = 'inner')
lec_df.show()

  for column, series in pdf.iteritems():


+-----------+-----------+----------+------------+-------+----------+---------+-------------+---------+----------+---------+--------+
|employee_id|customer_id|account_id|account_type|balance|first_name|last_name|date_of_birth|branch_id|first_name|last_name|position|
+-----------+-----------+----------+------------+-------+----------+---------+-------------+---------+----------+---------+--------+
|          1|          3|         4|    Checking|   3000|     Alice|  Johnson|   1975-02-10|        2|      Mike|  Johnson| Manager|
|          3|          1|         2|    Checking|   1000|      John|      Doe|   1980-05-15|        2|    Robert|    Davis|  Teller|
|          3|          3|         6|     Savings|   6000|     Alice|  Johnson|   1975-02-10|        2|    Robert|    Davis|  Teller|
|          4|          2|         3|     Savings|   8000|      Jane|    Smith|   1992-08-21|        3|    Olivia|   Wilson|  Teller|
|          4|          4|         7|    Checking|  12000|     Sarah| 

In [42]:
#5. Calculate the Total Number of Transactions for Each Account Type:
joined_df = transaction.join(accounts, "account_id", "inner")

total_transactions_by_type = joined_df.groupBy("account_type").agg(count("transaction_id").alias("total_transactions"))

total_transactions_by_type.show()

+------------+------------------+
|account_type|total_transactions|
+------------+------------------+
|     Savings|                 5|
|    Checking|                 5|
+------------+------------------+



In [47]:
#6. Find Customers with No Accounts:
#7. Find Customers with No Accounts:
customers_alias = df_customernew.alias("c")
accounts_alias = accounts.alias("a")
customers_with_no_accounts = customers_alias.join(accounts_alias, col("c.customer_id") == col("a.customer_id"), "left_anti")
customers_with_no_accounts.show()

+-----------+----------+---------+-------------+
|customer_id|first_name|last_name|date_of_birth|
+-----------+----------+---------+-------------+
|          6|      Emma|   Miller|   1982-07-25|
+-----------+----------+---------+-------------+



In [48]:
#8. List the Latest Transaction for Each Account:
window_spec = Window.partitionBy("account_id").orderBy(F.desc("transaction_date"))
df_transactions_with_row_number = transaction.withColumn("row_number", F.row_number().over(window_spec))
latest_transactions = df_transactions_with_row_number.filter("row_number = 1")
latest_transactions = latest_transactions.drop("row_number")
latest_transactions.show()

+--------------+----------+----------------+------+-------------------+
|transaction_id|account_id|transaction_type|amount|   transaction_date|
+--------------+----------+----------------+------+-------------------+
|             7|         3|      Withdrawal|   800|2023-07-08 14:15:00|
|             6|         5|         Deposit|  2000|2023-06-12 11:30:00|
|            10|         1|         Deposit|  1200|2023-10-01 10:00:00|
|             9|         4|      Withdrawal|  1500|2023-09-14 09:30:00|
|             8|         2|         Deposit|  3000|2023-08-22 16:45:00|
+--------------+----------+----------------+------+-------------------+



In [51]:
#9.Calculate the Total Withdrawals for Each Customer:
trans_df = df_new.join(transaction, 'account_id')
trans_df2 = trans_df[trans_df['transaction_type'] == 'Withdrawal']
trans_df3 = trans_df2.select('customer_id', 'first_name', 'last_name', 'amount')
trans_df4 = trans_df3.groupBy('customer_id', 'first_name', 'last_name').agg(F.sum('amount').alias('total_withdrwals'))
trans_df4.show()



+-----------+----------+---------+----------------+
|customer_id|first_name|last_name|total_withdrwals|
+-----------+----------+---------+----------------+
|          3|     Alice|  Johnson|          1500.0|
|          2|      Jane|    Smith|          1800.0|
|          1|      John|      Doe|           500.0|
+-----------+----------+---------+----------------+



In [53]:
#10-Find Duplicate Transactions:
df = transaction.groupby('transaction_id').count().alias("duplicate_count")
df_new = df.filter(col("count" )> 1).alias("duplicate")
df_new.show()

+--------------+-----+
|transaction_id|count|
+--------------+-----+
+--------------+-----+

