In [0]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.functions import translate	
from pyspark.sql.functions import col, substring

spark=SparkSession.builder.getOrCreate()

In [0]:
# Read credit_card CSV file into dataframe
# file_path is files path
#cred is dataframe
file_path="/FileStore/tables/Bank_credit_card_data.csv"

credit_card_data=spark.read.csv(file_path,header=True,inferSchema=True)

In [0]:
display(credit_card_data)

RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0
6,15574012,Chu,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1
7,15592531,Bartlett,822,France,Male,50,7,0.0,2,1,1,10062.8,0
8,15656148,Obinna,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1
9,15792365,He,501,France,Male,44,4,142051.07,2,0,1,74940.5,0
10,15592389,H?,684,France,Male,27,2,134603.88,1,1,1,71725.73,0


In [0]:
credit_card_data.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



Setup for Delta Lake

In [0]:
dbutils.fs.mkdirs("/creditd/")
working_dir= 'dbfs:/creditd'
output_path = f"{working_dir}/output"    

outputPathBronze = f"{output_path}/bronze"
outputPathSilver = f"{output_path}/silver"
outputPathGold   = f"{output_path}/gold"

dbutils.fs.rm(outputPathBronze,True)
dbutils.fs.rm(outputPathSilver,True)
dbutils.fs.rm(outputPathGold,True)

print(output_path)
print(outputPathBronze)
print(outputPathSilver)
print(outputPathGold)

dbfs:/creditd/output
dbfs:/creditd/output/bronze
dbfs:/creditd/output/silver
dbfs:/creditd/output/gold


In [0]:
#dbutils.fs.rm(outputPathGold,True)


In [0]:
# function to ingest dataframe to delta(bronze/silver/gold)
def writeToDelta(sourceDataframe, deltaPath):
    (sourceDataframe.write.format('delta').mode('overwrite').save(deltaPath))

Ingesting raw data and save it into a Delta Lake table (Bronze)

In [0]:
# Ingesting raw data to bronze layer of delta lake.
writeToDelta(credit_card_data,outputPathBronze)

In [0]:
# Register SQL table in the database
spark.sql("DROP TABLE IF EXISTS bronze_credit_card_data")
spark.sql(f"CREATE TABLE bronze_credit_card_data USING delta LOCATION '{outputPathBronze}'") 

Out[9]: DataFrame[]

In [0]:
# Optimizing the table
display(spark.sql("OPTIMIZE bronze_credit_card_data"))

path,metrics
dbfs:/creditd/output/bronze,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, null, 0, 1, 1, true)"


In [0]:
# Read the bronze table
credit_card_data_bronze = spark.read.format("delta").load(outputPathBronze)
display(credit_card_data_bronze)

RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0
6,15574012,Chu,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1
7,15592531,Bartlett,822,France,Male,50,7,0.0,2,1,1,10062.8,0
8,15656148,Obinna,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1
9,15792365,He,501,France,Male,44,4,142051.07,2,0,1,74940.5,0
10,15592389,H?,684,France,Male,27,2,134603.88,1,1,1,71725.73,0


In [0]:
#credit_card_data_bronze=(credit_card_data_bronze.with)

# Modifying and cleaning the bronze table.

#credit_card_data_bronze=credit_card_data_bronze.withColumn("Surname", translate(col("Surname"), "?,'", "").alias("Surname"))
credit_card_data_bronze=credit_card_data_bronze.withColumn("Surname", translate(col("Surname"), "?","").alias("Surname"))


display(credit_card_data_bronze)

RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0
6,15574012,Chu,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1
7,15592531,Bartlett,822,France,Male,50,7,0.0,2,1,1,10062.8,0
8,15656148,Obinna,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1
9,15792365,He,501,France,Male,44,4,142051.07,2,0,1,74940.5,0
10,15592389,H,684,France,Male,27,2,134603.88,1,1,1,71725.73,0


In [0]:
from pyspark.sql.functions import udf
age_range = udf(lambda Age: '10s' if Age < 20 else 
                       '20s' if (Age >= 20 and Age < 30) else
                       '30s' if (Age >= 30 and Age < 40) else
                       '40s' if (Age >= 40 and Age < 50) else
                       '50s' if (Age >= 50 and Age < 60) else
                       '60s' if (Age >= 60 and Age < 70) else
                       '70s' if (Age >= 70 and Age < 80) else
                       '80s' if (Age >= 80 and Age < 90) else
                       '90s' if (Age >= 90 and Age < 100) else
                       '100+' if (Age >= 100) else '')
 
credit_card_data_bronze = credit_card_data_bronze.withColumn('Age_Group', age_range(credit_card_data_bronze.Age))
display(credit_card_data_bronze)

RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited,Age_Group
1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1,40s
2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,40s
3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,40s
4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0,30s
5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,40s
6,15574012,Chu,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1,40s
7,15592531,Bartlett,822,France,Male,50,7,0.0,2,1,1,10062.8,0,50s
8,15656148,Obinna,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1,20s
9,15792365,He,501,France,Male,44,4,142051.07,2,0,1,74940.5,0,40s
10,15592389,H,684,France,Male,27,2,134603.88,1,1,1,71725.73,0,20s


Save our cleaned and conformed table as a Silver table in Delta Lake

In [0]:
# After cleaning(if any) our data, we save result as silver table
writeToDelta(credit_card_data_bronze, outputPathSilver)

In [0]:
# Register SQL table in the database
spark.sql("DROP TABLE IF EXISTS silver_credit_card_data")
spark.sql(f"CREATE TABLE silver_credit_card_data USING delta LOCATION '{outputPathSilver}'") 

Out[15]: DataFrame[]

In [0]:
# Optimizing the table
display(spark.sql("OPTIMIZE silver_credit_card_data"))

path,metrics
dbfs:/creditd/output/silver,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, null, 0, 1, 1, true)"


In [0]:
# Read the silver table
credit_card_data_silver= spark.read.format("delta").load(outputPathSilver)
display(credit_card_data_silver)

RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited,Age_Group
1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1,40s
2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,40s
3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,40s
4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0,30s
5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,40s
6,15574012,Chu,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1,40s
7,15592531,Bartlett,822,France,Male,50,7,0.0,2,1,1,10062.8,0,50s
8,15656148,Obinna,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1,20s
9,15792365,He,501,France,Male,44,4,142051.07,2,0,1,74940.5,0,40s
10,15592389,H,684,France,Male,27,2,134603.88,1,1,1,71725.73,0,20s


KPIs/queries

In [0]:

#1.	Customer details with Maximum and Minimum credit score
def detMaxMinCreditScore(cc_data):
    row1 = cc_data.agg({"creditScore": "max"}).collect()[0]
    maxCreditScore=(row1["max(creditScore)"])
    row2 = cc_data.agg({"creditScore": "min"}).collect()[0]
    minCreditScore=(row2["min(creditScore)"])
    df_KPI1=cc_data.filter(cc_data.CreditScore==maxCreditScore)
    df_KPI2=cc_data.filter(cc_data.CreditScore==minCreditScore)
    df_KPI=df_KPI1.union(df_KPI2)
    #display(df_KPI)
    return(df_KPI)

#detMaxCreditScore(credit_card_data_silver)







In [0]:
#2.	Select the customer details between age 20-60

def ageBetween(cc_data):
    ageBetween_df= cc_data.filter(cc_data.Age.between(20, 60))
    return(ageBetween_df)
#kpi_2=ageBetween(df)




In [0]:
#3.	Number of male & female customers
def countOfMaleAndFemale(cc_data):
    countOfMaleAndFemale_df=(cc_data.groupBy('Gender')).count().withColumnRenamed("count","number_of_FM_cust")
    #display(countOfMaleAndFemale_df)
    return(countOfMaleAndFemale_df)
   
#kpi_3=countOfMaleAndFemale(credit_card_data_silver)




In [0]:
#4.	Number of customers with same surname

def withSameSurname(cc_data):
    withSameSurname_df=(cc_data.groupBy('Surname')).count().withColumnRenamed("count","no_of_cust_same_surname")
    #display(withSameSurname_df)
    return(withSameSurname_df)
#kpi_4=withSameSurname(credit_card_data_silver)




In [0]:
#5.	Number of customers from each country

from pyspark.sql.functions import *
def customersCountryWise(cc_data):
    #df_KPI=(cc_data.filter(cc_data.RowNumber.count).groupBy("Geography"))
    df_KPI=(cc_data.groupBy('Geography')).count().withColumnRenamed("count","number_of_customers")
    #display(df_KPI)
    return df_KPI
    
#customersCountryWise(credit_card_data_silver)

In [0]:
#6.	Average of Age, Balance, Estimated Salary
from pyspark.sql.functions import avg
def avgAge_Balance_Estimated_Salary(cc_data):
    #df_KPI=cc_data.agg({'Age':'avg','Balance':'avg','EstimatedSalary':'avg'})
    #df_KPI=cc_data.agg({'Age':'avg','Balance':'avg','EstimatedSalary':'avg'})
    #df_KPI=cc_data.select(avg('age').alias('avg'))
    df_KPI=cc_data.select(avg('age').alias('avg'),avg('Balance').alias('Bal'),avg('EstimatedSalary').alias('EstSal'))
    #display(df_KPI)
    return df_KPI



    
#avgAge_Balance_Estimated_Salary(credit_card_data_silver)


In [0]:
#7 Count of Gender who have balance more than 2000
def countOfGenderWithBalanceMoreThan2k(cc_data):
    df_KPI=((cc_data.filter(cc_data.Balance>2000)).groupBy('Gender')).count().withColumnRenamed("count","count_having_bal_gt_2k")
    display(df_KPI)
    #df_KPI.printSchema()
    return df_KPI
    
display(countOfGenderWithBalanceMoreThan2k(credit_card_data_silver))


Gender,count_having_bal_gt_2k
Female,2889
Male,3494


Gender,count_having_bal_gt_2k
Female,2889
Male,3494


In [0]:
#8.	Details of customers who have balance more than 2000
def customerDetailsHavingBalanceMoreThan2k(cc_data):
    df_KPI=cc_data.filter(cc_data.Balance>2000)
    #display(df_KPI)
    return df_KPI
    
#customerDetailsHavingBalanceMoreThan2k(credit_card_data_silver)

In [0]:
#9.	Customers who have tenure between 8-10 and are active members with balance more than 0
def activemem(cc_data):
    df_tenture = cc_data.filter((cc_data.Tenure >= 8) & (cc_data.IsActiveMember == 1) & (cc_data.Balance > 0))
    return df_tenture
    
#val = activemem(df)
#display(val)

In [0]:
#10.	Customers having more than average balance in their account.

def avgbal(cc_data):
    BalAverage = str(cc_data.select(avg("Balance")).collect()[0][0])
    df_avgbalance = cc_data.filter(cc_data.Balance>BalAverage)
    return df_avgbalance
#val1 = avgbal(df)
#display(val1)


In [0]:
#11.	 Customers having more than average credit score.
def avgcrscr(cc_data):
    Average = str(cc_data.select(avg("CreditScore")).collect()[0][0])
    df_avgscore = cc_data.filter(cc_data.CreditScore>Average)
    return df_avgscore
#val2 = avgcrscr(df)
#display(val2)


In [0]:
#12.	  Customers who have the highest and lowest Tenure with the bank.

def minMaxTenure(cc_data):
    # df=bnk1.select(min("Tenure")).show()
    # df=bnk1.select(max("Tenure")).show()
    cst=cc_data.select(min("Tenure").alias('minTen'), max("Tenure").alias('maxTen'))
    #display(cst)
    return cst
#minMaxTenure(credit_card_data_silver)

In [0]:
#13.	 Customers with highest and lowest number of products enrolled.

def minMaxNumOfProducts(cc_data):
    # df=bnk1.select(max("NumOfProducts")).show()
    # df=bnk1.select(min("NumOfProducts")).show()
    cst=cc_data.select(min('NumOfProducts').alias("cust_lowest_prodct"),max('NumOfProducts').alias("cust_highest_prodct"))
    display(cst)
    return cst
minMaxNumOfProducts(credit_card_data_silver)


cust_lowest_prodct,cust_highest_prodct
1,4


Out[30]: DataFrame[cust_lowest_prodct: int, cust_highest_prodct: int]

In [0]:
#14.	 Number of customers that exited, has a credit card or is a active member
def customerExited(cc_data):
    df=cc_data.filter((cc_data.Exited == 1)&((cc_data.HasCrCard == 1) | (cc_data.IsActiveMember == 1)))
    return df
#customerExited(bnk1)


In [0]:
#15.	 Total Number of Customers with Zero Balance
def totCustWithZeroBal(cc_data):
    #df=(cc_data.filter(cc_data.Balance == 0)).count().alias("zero_bal")
    #df=cc_data.select(count('RowNumber') having cc_data.Balance == 0 groupBy('Balance'))
    df=(cc_data.select('Balance').where(cc_data.Balance==0).groupBy('Balance')).count().withColumnRenamed("count","number_of_custs_with_bal_0")
    #.withColumnRenamed("count","no_of_cust_with_bal_zero")
    #display(df)
    return df
#totCustWithZeroBal(credit_card_data_silver)
#select grpby bal a=and having 


In [0]:
#16.	 Details of Customers with Zero Balance who have not exited the plan
def custMoreThan2k(cc_data):
    df=cc_data.filter((cc_data.Balance==0) & (cc_data.Exited==0))
    return df
#custMoreThan2k(bnk1)


In [0]:
#17.	 Customers that have more than average Credit Score, Tenure, Balance & doesn't have a Credit Card
def avgOfCreditTenBal(cc_data):
    avg_credit_score = cc_data.select(avg(col("CreditScore"))).collect()[0][0]
    # display(avg_credit_score)
    avg_tenure = int(cc_data.select(avg(col("Tenure"))).collect()[0][0])
    # display(avg_tenure)
    avg_balance = cc_data.select(avg(col("Balance"))).collect()[0][0]
    # display(avg_balance)
    resultDf = cc_data.filter( (cc_data.CreditScore>avg_credit_score) & (cc_data.Tenure>avg_tenure) & (cc_data.Balance>avg_balance) & (cc_data.HasCrCard=="false"))
    return resultDf
#avgOfCreditTenBal(bnk1)

#display(function17(silver))


In [0]:
#18. Average Credit_Scores of Customers based on age groups
def avg_credit_score(age_df):
    avg_cs = age_df.groupBy("Age_Group").agg(avg("CreditScore").alias("Average_Credit_Score")).orderBy("Age_Group")
    cs= avg_cs.select("Age_Group", round(col('Average_Credit_Score'),2).alias("Average_Credit_Score"))
    return cs

In [0]:
#19.Distribution of customers exited among age groups
def exited_customers(age_df):
    exited_cust = age_df.filter("Exited == 1").orderBy("Age_Group")
    return exited_cust

In [0]:
#20. Number of Customers owning Credit Card under each age group
def own_credit_card(age_df):
    has_credit_card = age_df.where("HasCrCard == 1").groupBy("Age_Group").count().orderBy("Age_Group")
    return has_credit_card

In [0]:
#21. Number of Customers that are active based upon their age groups
def cust_active_grp(age_df):
    active_grp = age_df.where ("IsActiveMember == 1").groupBy("Age_Group").count().orderBy("Age_Group")
    return active_grp

In [0]:
#22. Average Balances of Customers based on age groups
def avg_cust_balance(age_df):
    avg_balance = age_df.groupBy("Age_Group").agg(avg("Balance").alias("Average_Balance")).orderBy("Age_Group")
    av=avg_balance.select("Age_Group", round(col('Average_Balance'),2).alias("Average_Balance"))
    return av

In [0]:
#23. Average Tenure periods of Customers based on age groups
def cust_avg_tenure(age_df):
    avg_tenure = age_df.groupBy("Age_Group").agg(avg("Tenure").alias("Average_Tenure")).orderBy("Age_Group")
    cvt = avg_tenure.select("Age_Group", round(col('Average_Tenure'),2).alias("Average_Tenure"))
    return cvt

In [0]:
#24. Average Estimated Salaries of Customers based on age groups 
def cust_est_salary(age_df):
    est_salary = age_df.groupBy("Age_Group").agg(avg("EstimatedSalary").alias("Avg_Est_Salary")).orderBy("Age_Group")
    es=est_salary.select("Age_Group", round(col('Avg_Est_Salary'),2).alias("Avg_Est_Salary"))
    return es


In [0]:
#25. Details of customers who have enrolled in more than two bank products
def cust_bank_prod(age_df):
    bank_prod = age_df.filter(age_df.NumOfProducts>2)
    return bank_prod

In [0]:
#26. Number of Male and Female customer who are Active Member
def cust_active_count(age_df):
    male_cust = age_df.filter((age_df.IsActiveMember  == 1) & (age_df.Gender  == "Male")).groupBy("Gender").count()
    female_cust = age_df.filter((age_df.IsActiveMember  == 1) & (age_df.Gender  == "Female")).groupBy("Gender").count()
    res = male_cust.union(female_cust)
    return res


In [0]:
#27. Number of Customers from Countries that have more than 5 years of Tenure
def cust_tenure(age_df):
    ctn = age_df.where ("Tenure > 5").groupBy("Geography").count()
    return ctn

In [0]:
#28. List the customers who are below 40 and are active member
def cust_active_mem(age_df):
    cam = age_df.filter((age_df.Age < 40) & (age_df.IsActiveMember == 1)).orderBy('RowNumber')
    return cam

In [0]:
#29
def cust_zero_balance(age_df):
    ccb = age_df.where ("Balance == 0").groupBy("Geography").count()
    return ccb

In [0]:
#30
def max_min_balance(age_df):
    maxi=age_df.agg({"Balance" : 'max'}).collect()[0][0]
    mx = age_df.filter(age_df.Balance == maxi)
    mini=age_df.agg({"Balance" : 'min'}).collect()[0][0]
    m = age_df.filter(age_df.Balance == mini)
    res= mx.unionByName(m)
    return res
   

In [0]:
#31 Estimated High Income Customers vs Low Income Customers
from pyspark.sql.types import FloatType
def high_low_es(age_df):
    
    max_sal = age_df.agg({"EstimatedSalary": 'max'}).collect()[0][0]
    min_sal= age_df.agg({"EstimatedSalary": 'min'}).collect()[0][0]
    res= max_sal/min_sal
    df=spark.createDataFrame([res],FloatType())
    return df


In [0]:
#32 Customers with Max Tenure, Max CreditScore, and does not have a Card
def max_tenure_cs_nocard(age_df):
    max_Ten = age_df.agg({"Tenure": 'max'}).collect()[0][0]
    max_Cred = age_df.agg({"CreditScore": 'max'}).collect()[0][0]
    res = age_df.filter((age_df.HasCrCard == 0) & (age_df.Tenure == max_Ten) & (age_df.CreditScore == max_Cred))
    return res
    

In [0]:
#33
def max_tenure_cs_hascard(age_df):
    max_Ten = age_df.agg({"Tenure": 'max'}).collect()[0][0]
    max_Cred = age_df.agg({"CreditScore": 'max'}).collect()[0][0]
    res = age_df.filter((age_df.HasCrCard == 1) & (age_df.Tenure == max_Ten) & (age_df.CreditScore == max_Cred))
    return res

In [0]:
#34 The youngest and oldest Customer having a Credit Card
def youngest_oldest_cust(age_df):
    min_Age = age_df.agg({"Age": 'min'}).collect()[0][0]
    max_Age = age_df.agg({"Age": 'max'}).collect()[0][0]
    Custcred_minage = age_df.where((age_df.HasCrCard == 1) & (age_df.Age == min_Age))
    Custcred_maxage = age_df.where((age_df.HasCrCard == 1) & (age_df.Age == max_Age))
    res = Custcred_minage.union(Custcred_maxage)
    return res

In [0]:
credit_card_data_silver.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)
 |-- Age_Group: string (nullable = true)



Generating KPI results

In [0]:
kpi_1=detMaxMinCreditScore(credit_card_data_silver)
kpi_2=ageBetween(credit_card_data_silver)
kpi_3=countOfMaleAndFemale(credit_card_data_silver)
kpi_4=withSameSurname(credit_card_data_silver)
kpi_5 = customersCountryWise(credit_card_data_silver)
kpi_6 = avgAge_Balance_Estimated_Salary(credit_card_data_silver)
kpi_7 = countOfGenderWithBalanceMoreThan2k(credit_card_data_silver)
kpi_8 = customerDetailsHavingBalanceMoreThan2k(credit_card_data_silver)
kpi_9 = activemem(credit_card_data_silver)
kpi_10= avgbal(credit_card_data_silver)
kpi_11=avgcrscr(credit_card_data_silver)
kpi_12=minMaxTenure(credit_card_data_silver)
kpi_13=minMaxNumOfProducts(credit_card_data_silver)
kpi_14=customerExited(credit_card_data_silver)
kpi_15=totCustWithZeroBal(credit_card_data_silver)
kpi_16=custMoreThan2k(credit_card_data_silver)
kpi_17=avgOfCreditTenBal(credit_card_data_silver)
kpi_18=avg_credit_score(credit_card_data_silver)
kpi_19= exited_customers(credit_card_data_silver)
kpi_20=own_credit_card(credit_card_data_silver)
kpi_21=cust_active_grp(credit_card_data_silver)
kpi_22= avg_cust_balance(credit_card_data_silver)
kpi_23=cust_avg_tenure(credit_card_data_silver)
kpi_24=cust_est_salary(credit_card_data_silver)
kpi_25=cust_bank_prod(credit_card_data_silver)
kpi_26=cust_active_count(credit_card_data_silver)
kpi_27=cust_tenure(credit_card_data_silver)
kpi_28=cust_active_mem(credit_card_data_silver)
kpi_29=cust_zero_balance(credit_card_data_silver)
kpi_30=max_min_balance(credit_card_data_silver)
kpi_31=high_low_es(credit_card_data_silver)
kpi_32=max_tenure_cs_nocard(credit_card_data_silver)
kpi_33=max_tenure_cs_hascard(credit_card_data_silver)
kpi_34=youngest_oldest_cust(credit_card_data_silver)






Gender,count_having_bal_gt_2k
Female,2889
Male,3494


cust_lowest_prodct,cust_highest_prodct
1,4


Saving the specific business use cases i.e. KPIs as Gold table for efficient visualization

In [0]:
#ALTER TABLE table_name SET TBLPROPERTIES ('delta.minReaderVersion' = '2','delta.minWriterVersion' = '5','delta.columnMapping.mode' = 'name')

kpiList = [kpi_1,kpi_2,kpi_3,kpi_4,kpi_5,kpi_6,kpi_7,kpi_8,kpi_9,kpi_10,kpi_11,kpi_12,kpi_13,kpi_14,kpi_15,kpi_16,kpi_17,kpi_18,kpi_19,kpi_20,kpi_21,kpi_22,kpi_23,kpi_24,kpi_25,kpi_26,kpi_27,kpi_28,kpi_29,kpi_30,kpi_31,kpi_32,kpi_33,kpi_34]
index=1
for kpi in kpiList:
                #print(kpiList[index])
                kpiOutputPathGold=outputPathGold+"/kpi"+str(index)
                table_name="gold_kpi_"+str(index)
                print(table_name)
                writeToDelta(kpi,kpiOutputPathGold)
                spark.sql(f"DROP TABLE IF EXISTS {table_name}")
                spark.sql(f"CREATE TABLE {table_name} USING delta LOCATION '{kpiOutputPathGold}'")
                spark.sql(f"OPTIMIZE {table_name}")
                index+=1;
#ALTER TABLE table_name SET TBLPROPERTIES ('delta.minReaderVersion' = '2','delta.minWriterVersion' = '5','delta.columnMapping.mode' = 'name')
    

gold_kpi_1
gold_kpi_2
gold_kpi_3
gold_kpi_4
gold_kpi_5
gold_kpi_6
gold_kpi_7
gold_kpi_8
gold_kpi_9
gold_kpi_10
gold_kpi_11
gold_kpi_12
gold_kpi_13
gold_kpi_14
gold_kpi_15
gold_kpi_16
gold_kpi_17
gold_kpi_18
gold_kpi_19
gold_kpi_20
gold_kpi_21
gold_kpi_22
gold_kpi_23
gold_kpi_24
gold_kpi_25
gold_kpi_26
gold_kpi_27
gold_kpi_28
gold_kpi_29
gold_kpi_30
gold_kpi_31
gold_kpi_32
gold_kpi_33
gold_kpi_34


In [0]:
def delete_mounted_dir(dirname):
    files=dbutils.fs.ls(dirname)
    for f in files:
        if f.isDir():
            delete_mounted_dir(f.path)
        dbutils.fs.rm(f.path, True)

In [0]:
delete_mounted_dir("/mnt/creditdemo/output")

In [0]:
dbutils.fs.cp(outputPathGold, "/mnt/creditdemo/output", True)

Out[64]: True

Exiting the notebook

In [0]:
dbutils.notebook.exit("KPI Notebook Ran Successfully") 

