In [0]:
from pyspark.sql.functions import desc
from pyspark.sql.functions import when

In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Bankingdata').getOrCreate()

In [0]:
Bank_cust_data = spark.read.csv('/FileStore/tables/bank_customer.csv', header = True, inferSchema = True) 

In [0]:
display(Bank_cust_data)

CUST_ID,CUST_FNAME,INITIALS,CUST_LNAME,CUST_SEX,CUST_DOB,CUST_TYPE
1001,John,,Smith,M,21-OCT-78,IND
1002,Franklin,J,Dang,M,21-OCT-78,IND
1003,Alice,M,Powell,F,09-APR-76,IND
1004,Ramesh,R,Narayan,M,08-DEC-71,IND
1005,Nancy,S,Taylor,F,17-AUG-79,IND
1006,Ahmad,M A,Jabbar,M,25-JUN-72,IND
1007,DealWell Co.,,,,,CORP
1008,Aman,K,Mathur,M,30-OCT-80,IND
1009,Smitha,,Ramesh,F,03-MAR-76,IND
1010,Popular Enterprises,,,,,CORP


In [0]:
Bank_fd_data = spark.read.csv("/FileStore/tables/bank_fd_account.csv", header = True, inferSchema = True)

In [0]:
Bank_fd_data.display()

ACCOUNT_NO,CUST_ID,START_DT,FD_TERM_MNTH,INITIAL_AMT,ACC_STATUS
5212340001,1002,22-JUN-00,30,15000,Closed
5212340002,1002,11-MAY-02,66,25000,Active
5212340003,1007,26-NOV-03,36,80555,Closed
5212340004,1003,29-NOV-04,60,30000,Active
5212340005,1009,23-JUN-04,120,45000,Active
5212340006,1001,26-SEP-05,24,45000,Active
5212340007,1010,18-OCT-05,84,15000000,Active
5212340008,1008,18-DEC-06,36,98765,Active
5212340009,1006,14-FEB-07,120,25000,Active
5212340010,1007,21-SEP-08,60,2500000,Active


In [0]:
Bank_rd_data = spark.read.csv("/FileStore/tables/bank_rd_account.csv", header = True, inferSchema = True)

In [0]:
display(Bank_rd_data)

ACCOUNT_NO,RD_CUST_ID,RD_START_DT,RD_TERM_YRS,INSTALLMENT_AMT,CURR_BALANCE,ACC_STATUS
6000010001,1006,13-JUN-06,10,1000,75000,Active
6000010002,1003,26-NOV-06,6,3000,208000,Active
6000010003,1008,03-OCT-07,5,2500,148000,Active
6000010004,1009,29-APR-07,4,6000,0,Closed
6000010005,1001,17-FEB-08,1,10000,0,Closed
6000010006,1007,16-JUL-08,9,20000,1000012,Active
6000010007,1001,26-MAY-08,4,5000,0,Closed
6000010008,1010,18-DEC-09,7,50000,1550000,Active
6000010009,1007,20-APR-09,4,14000,560000,Active
6000010010,1010,08-MAY-10,7,25000,700000,Active


In [0]:
Bank_sb_data = spark.read.csv("/FileStore/tables/bank_sb_account.csv", header = True, inferSchema = True)

In [0]:
display(Bank_sb_data)

ACCOUNT_NO,PRIMARY_CUST_ID,SECONDARY_CUST_ID,CURR_BAL_AMT,ACC_STATUS,START_DATE,END_DATE
1000012003,1005,1001.0,0.0,Closed,20-NOV-06,23-AUG-11
1000012004,1008,,84924.0,Active,11-MAR-06,
1000012005,1009,1004.0,209844.03,Active,20-NOV-06,
1000012006,1004,,0.0,Closed,12-SEP-07,12-SEP-10
1000012007,1003,,12300.0,Inactive,09-DEC-08,
1000012008,1005,,10235.0,Active,23-MAY-09,
1000012009,1006,,73535.0,Active,10-OCT-10,
1000012010,1004,,54674.0,Active,09-OCT-11,
1000012001,1001,,25000.0,Active,06-SEP-04,
1000012002,1002,,31109.43,Active,16-DEC-05,


In [0]:
temp_bank_fd = "bank_fd_account_csv"

Bank_fd_data.createOrReplaceTempView(temp_bank_fd)

In [0]:
temp_bank_cust = "bank_cust_account_csv"

Bank_cust_data.createOrReplaceTempView(temp_bank_cust)

In [0]:
temp_bank_rd = "bank_rd_account_csv"

Bank_rd_data.createOrReplaceTempView(temp_bank_rd)

In [0]:
temp_bank_sb = "bank_sb_account_csv"

Bank_sb_data.createOrReplaceTempView(temp_bank_sb)

Get a list of customers who have active FD account with more than 25000 as the initial amount. The list should have the customer id, first name, FD account  number and initial amount; the list should be sorted on the initial amount with highest amount first.

In [0]:
%sql

select  cust_fname, account_no, bank_cust.cust_id, initial_amt from `bank_cust_account_csv` bank_cust inner join `bank_fd_account_csv` 
where initial_amt >= 25000 and acc_status == 'Active'
order by initial_amt 

cust_fname,account_no,cust_id,initial_amt
John,5212340002,1001,25000
Franklin,5212340002,1002,25000
Alice,5212340002,1003,25000
Ramesh,5212340002,1004,25000
Nancy,5212340002,1005,25000
Ahmad,5212340002,1006,25000
DealWell Co.,5212340002,1007,25000
Aman,5212340002,1008,25000
Smitha,5212340002,1009,25000
Popular Enterprises,5212340002,1010,25000


In [0]:
Bank_cust_data.join(Bank_fd_data, Bank_cust_data.CUST_ID == Bank_fd_data.CUST_ID, 'inner').select(Bank_cust_data["cust_id"],'cust_fname','account_no', 'initial_amt').where((Bank_fd_data.INITIAL_AMT >= 25000) & (Bank_fd_data.ACC_STATUS == 'Active')).orderBy("INITIAL_AMT").show(truncate = False)

+-------+-------------------+----------+-----------+
|cust_id|cust_fname         |account_no|initial_amt|
+-------+-------------------+----------+-----------+
|1002   |Franklin           |5212340002|25000      |
|1006   |Ahmad              |5212340009|25000      |
|1003   |Alice              |5212340004|30000      |
|1009   |Smitha             |5212340005|45000      |
|1001   |John               |5212340006|45000      |
|1008   |Aman               |5212340008|98765      |
|1007   |DealWell Co.       |5212340010|2500000    |
|1010   |Popular Enterprises|5212340007|15000000   |
+-------+-------------------+----------+-----------+



Get a list of customers who have active accounts in all three types FD, RD and DB, along
with their net worth. Net worth is calculated as the total of FD Initial Amount, RD Current
Balance and SB Current Balance. The list should have the customer id, first name, FD
account number, FD initial amount, RD Current Balance, SB Current Balance and Net worth
for all active accounts of the customers; the list should be sorted on net worth with highest
amount appearing first.

In [0]:
%sql

select cust.cust_id, cust.cust_fname, fd.account_no, fd.initial_amt, rd.curr_balance, sb.curr_bal_amt, (fd.initial_amt+rd.curr_balance+sb.curr_bal_amt) as networth
from bank_cust_account_csv cust
inner join bank_fd_account_csv fd
on cust.cust_id = fd.cust_id
inner join bank_rd_account_csv rd
on fd.cust_id = rd.RD_CUST_ID
inner join bank_sb_account_csv sb
on rd.RD_CUST_ID = sb.primary_cust_id
order by networth desc

cust_id,cust_fname,account_no,initial_amt,curr_balance,curr_bal_amt,networth
1008,Aman,5212340008,98765,148000,84924.0,331689.0
1009,Smitha,5212340005,45000,0,209844.03,254844.03
1003,Alice,5212340004,30000,208000,12300.0,250300.0
1006,Ahmad,5212340009,25000,75000,73535.0,173535.0
1001,John,5212340006,45000,0,25000.0,70000.0
1001,John,5212340006,45000,0,25000.0,70000.0


In [0]:
Bankcust = Bank_cust_data.alias("Bankcust")

Bankfd = Bank_fd_data.alias('Bankfd')


Bankrd = Bank_rd_data.alias('Bankrd')


Banksb = Bank_sb_data.alias('Banksb')

In [0]:
Bankcust.join(Bankfd, Bankcust.CUST_ID == Bankfd.CUST_ID, 'inner').join(Bankrd, Bankfd.CUST_ID == Bankrd.RD_CUST_ID,'inner').join(Banksb, Bankrd.RD_CUST_ID == Banksb.PRIMARY_CUST_ID,'inner').select(Bankcust["CUST_ID"],Bankcust["CUST_FNAME"], Bankfd["ACCOUNT_NO"], Bankrd["CURR_BALANCE"], Banksb["CURR_BAL_AMT"], (Bankfd["INITIAL_AMT"] + Bankrd["CURR_BALANCE"] + Banksb["CURR_BAL_AMT"]).alias("NETWORTH")).where(Bankfd.ACC_STATUS == 'Active').orderBy(desc("NETWORTH")).show(truncate = False)

+-------+----------+----------+------------+------------+---------+
|CUST_ID|CUST_FNAME|ACCOUNT_NO|CURR_BALANCE|CURR_BAL_AMT|NETWORTH |
+-------+----------+----------+------------+------------+---------+
|1008   |Aman      |5212340008|148000      |84924.0     |331689.0 |
|1009   |Smitha    |5212340005|0           |209844.03   |254844.03|
|1003   |Alice     |5212340004|208000      |12300.0     |250300.0 |
|1006   |Ahmad     |5212340009|75000       |73535.0     |173535.0 |
|1001   |John      |5212340006|0           |25000.0     |70000.0  |
|1001   |John      |5212340006|0           |25000.0     |70000.0  |
+-------+----------+----------+------------+------------+---------+



a new DataFrame that has a column “CUST_TYP_INDEX” which is a unique number
assigned to each customer type. For example: 1.0 for Corporate type of customer (CORP),
2 for individual customer (IND).

In [0]:
Bankcust.display()

CUST_ID,CUST_FNAME,INITIALS,CUST_LNAME,CUST_SEX,CUST_DOB,CUST_TYPE
1001,John,,Smith,M,21-OCT-78,IND
1002,Franklin,J,Dang,M,21-OCT-78,IND
1003,Alice,M,Powell,F,09-APR-76,IND
1004,Ramesh,R,Narayan,M,08-DEC-71,IND
1005,Nancy,S,Taylor,F,17-AUG-79,IND
1006,Ahmad,M A,Jabbar,M,25-JUN-72,IND
1007,DealWell Co.,,,,,CORP
1008,Aman,K,Mathur,M,30-OCT-80,IND
1009,Smitha,,Ramesh,F,03-MAR-76,IND
1010,Popular Enterprises,,,,,CORP


In [0]:
cust_type_col = Bankcust.select('CUST_TYPE')    

In [0]:
cust_type_index = cust_type_col.withColumn("cust_type",  when(Bankcust.CUST_TYPE == "IND", 1).when(Bankcust.CUST_TYPE == "CORP", 2)).withColumnRenamed("CUST_TYPE","CUST_TYPE_INDEX").show()
    
    
        

+---------------+
|CUST_TYPE_INDEX|
+---------------+
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              2|
|              1|
|              1|
|              2|
+---------------+



Get a list of customer ids along with the account number and amount of their active
accounts of each type of account they have.
For example, if a customer has all 3 types of accounts active, then the list should contain
customer id, active FD account number, initial amount, status, active RD account number,
balance amount, status, active SB account number, balance amount, status.
If a customer has only 2 types of account
active then the list should have details of only
these 2 accounts. The list should be sorted by the customer id and for each customer id it
should be sorted on account number.

In [0]:
%sql

SELECT bank_fd_account_csv.CUST_ID, bank_fd_account_csv.ACCOUNT_NO, bank_fd_account_csv.INITIAL_AMT AS AMT, bank_fd_account_csv.ACC_STATUS FROM bank_fd_account_csv WHERE bank_fd_account_csv.ACC_STATUS = 'Active'
UNION
SELECT bank_rd_account_csv.RD_CUST_ID, bank_rd_account_csv.ACCOUNT_NO, bank_rd_account_csv.CURR_BALANCE, bank_rd_account_csv.ACC_STATUS FROM bank_rd_account_csv WHERE bank_rd_account_csv.ACC_STATUS = 'Active'
UNION
SELECT bank_sb_account_csv.PRIMARY_CUST_ID, bank_sb_account_csv.ACCOUNT_NO, bank_sb_account_csv.CURR_BAL_AMT, bank_sb_account_csv.ACC_STATUS FROM bank_sb_account_csv WHERE bank_sb_account_csv.ACC_STATUS = 'Active'
ORDER BY CUST_ID, ACCOUNT_NO

CUST_ID,ACCOUNT_NO,AMT,ACC_STATUS
1001,1000012001,25000.0,Active
1001,5212340006,45000.0,Active
1002,1000012002,31109.43,Active
1002,5212340002,25000.0,Active
1003,5212340004,30000.0,Active
1003,6000010002,208000.0,Active
1004,1000012010,54674.0,Active
1005,1000012008,10235.0,Active
1006,1000012009,73535.0,Active
1006,5212340009,25000.0,Active


d
the above to add the customer first name and also to add a column named acc_type
which should show
'FD' if it is an FD account
'RD' if it is an RD account
'SB' if it is an SB account

In [0]:
%sql


SELECT bank_cust_account_csv.CUST_FNAME, bank_fd_account_csv.CUST_ID, bank_fd_account_csv.ACCOUNT_NO, bank_fd_account_csv.INITIAL_AMT AS AMT, bank_fd_account_csv.ACC_STATUS, 'FD' AS ACC_TYPE
FROM bank_cust_account_csv 
JOIN bank_fd_account_csv 
ON bank_cust_account_csv.CUST_ID = bank_fd_account_csv.CUST_ID 
WHERE bank_fd_account_csv.ACC_STATUS = 'Active'

UNION

SELECT bank_cust_account_csv.CUST_FNAME, bank_rd_account_csv.RD_CUST_ID, bank_rd_account_csv.ACCOUNT_NO, bank_rd_account_csv.CURR_BALANCE, bank_rd_account_csv.ACC_STATUS, 'RD' AS ACC_TYPE
FROM bank_cust_account_csv 
JOIN bank_rd_account_csv 
ON bank_cust_account_csv.CUST_ID = bank_rd_account_csv.RD_CUST_ID 
WHERE bank_rd_account_csv.ACC_STATUS = 'Active'

UNION

SELECT bank_cust_account_csv.CUST_FNAME, bank_sb_account_csv.PRIMARY_CUST_ID, bank_sb_account_csv.ACCOUNT_NO, bank_sb_account_csv.CURR_BAL_AMT, bank_sb_account_csv.ACC_STATUS, 'SB' AS ACC_TYPE
FROM bank_cust_account_csv 
JOIN bank_sb_account_csv 
ON bank_cust_account_csv.CUST_ID = bank_sb_account_csv.PRIMARY_CUST_ID 
WHERE bank_sb_account_csv.ACC_STATUS = 'Active'

ORDER BY CUST_ID, ACCOUNT_NO

CUST_FNAME,CUST_ID,ACCOUNT_NO,AMT,ACC_STATUS,ACC_TYPE
John,1001,1000012001,25000.0,Active,SB
John,1001,5212340006,45000.0,Active,FD
Franklin,1002,1000012002,31109.43,Active,SB
Franklin,1002,5212340002,25000.0,Active,FD
Alice,1003,5212340004,30000.0,Active,FD
Alice,1003,6000010002,208000.0,Active,RD
Ramesh,1004,1000012010,54674.0,Active,SB
Nancy,1005,1000012008,10235.0,Active,SB
Ahmad,1006,1000012009,73535.0,Active,SB
Ahmad,1006,5212340009,25000.0,Active,FD
