In [None]:
# Make the local Spark installation importable before pyspark is used,
# then show which installation path findspark resolved.
import findspark

findspark.init()
spark_home = findspark.find()
print(spark_home)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from difflib import SequenceMatcher

In [None]:
# Build the Spark session for this notebook; getOrCreate() hands back an
# already-active session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("KYCDataMatching")
    .getOrCreate()
)

In [None]:
# Read the bank file and the two third-party files. Every file is parsed with
# its first row as the header and with column types inferred from the data.
bankdf = spark.read.options(header=True, inferSchema=True).csv("Synthetic_kyc_data.csv")
dnbdf = spark.read.options(header=True, inferSchema=True).csv(
    "dnb_third_party_synthetic_banking_kyc_data.csv"
)
faxdf = spark.read.options(header=True, inferSchema=True).csv(
    "equifax_third_party_synthetic_banking_kyc_data.csv"
)

In [None]:
# Replace spaces in the bank column headers with underscores so the columns
# can be referenced by name without quoting.
new_column_names = [header.replace(" ", "_") for header in bankdf.columns]
bankdf = bankdf.toDF(*new_column_names)

In [None]:
# Replace spaces in the dnb column headers with underscores so the columns
# can be referenced by name without quoting.
new_column_names = [header.replace(" ", "_") for header in dnbdf.columns]
dnbdf = dnbdf.toDF(*new_column_names)

In [None]:
# Replace spaces in the fax column headers with underscores so the columns
# can be referenced by name without quoting.
new_column_names = [header.replace(" ", "_") for header in faxdf.columns]
faxdf = faxdf.toDF(*new_column_names)

In [None]:
# Collapse the spelled-out gender values in dnbdf to single-letter codes.
# Any other value (including null) is passed through unchanged.
dnbdf = dnbdf.withColumn(
    "Gender",
    when(col("Gender") == "Male", "M")
    .when(col("Gender") == "Female", "F")
    .otherwise(col("Gender")),
)

In [None]:
# Convert the spelled-out blood groups in dnbdf to their short symbol form.
# Fix: "B Negative" used to be rewritten to "A+" (copy/paste slip); it now
# maps to "B-". Values not listed here (including null) pass through unchanged.
# NOTE(review): only the A/B spellings are handled, as before -- confirm
# whether "AB ..." / "O ..." spellings occur in this feed.
dnbdf = dnbdf.withColumn(
    "Blood_Group",
    when(col("Blood_Group") == "A Positive", "A+")
    .when(col("Blood_Group") == "B Positive", "B+")
    .when(col("Blood_Group") == "A Negative", "A-")
    .when(col("Blood_Group") == "B Negative", "B-")
    .otherwise(col("Blood_Group")),
)

In [None]:
def standardize_date(date_str, pivot=50):
    """Expand a two-digit leading year in a '-'-separated date string.

    The text before the first '-' is treated as the year. When it is exactly
    two decimal digits, a century is prepended: '19' if the number is
    >= ``pivot``, otherwise '20'. Everything else is returned untouched.

    Args:
        date_str: Date text, or None.
        pivot: Two-digit years at or above this value are placed in the
            1900s, the rest in the 2000s. Defaults to 50, the cut-off that
            was previously hard-coded.

    Returns:
        The date string with a four-digit year where a two-digit one was
        found, the unchanged input otherwise, or None for None input.
    """
    if date_str is None:
        return None

    year_part = date_str.split('-')[0]
    # Require decimal digits before calling int(): a malformed value such as
    # "ab-01-01" used to raise ValueError and would abort the whole Spark job
    # when this runs as a UDF. Such values are now passed through unchanged.
    if len(year_part) == 2 and year_part.isdecimal():
        century = '19' if int(year_part) >= pivot else '20'
        return century + date_str
    return date_str

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Expose the Python date normaliser to Spark as a string-returning UDF.
standardize_date_udf = udf(standardize_date, returnType=StringType())

# First widen two-digit years, then parse the text into a proper date column.
dnbdf = (
    dnbdf
    .withColumn("Date_of_Birth", standardize_date_udf("Date_of_Birth"))
    .withColumn("Date_of_Birth", to_date(col("Date_of_Birth"), "yyyy-MM-dd"))
)

In [None]:
# Collapse the spelled-out gender values in faxdf to single-letter codes.
# Any other value (including null) is passed through unchanged.
faxdf = faxdf.withColumn(
    "Gender",
    when(col("Gender") == "Male", "M")
    .when(col("Gender") == "Female", "F")
    .otherwise(col("Gender")),
)

In [None]:
# Convert the spelled-out blood groups in faxdf to their short symbol form.
# Fix: "B Negative" used to be rewritten to "A+" (copy/paste slip); it now
# maps to "B-". Values not listed here (including null) pass through unchanged.
# NOTE(review): only the A/B spellings are handled, as before -- confirm
# whether "AB ..." / "O ..." spellings occur in this feed.
faxdf = faxdf.withColumn(
    "Blood_Group",
    when(col("Blood_Group") == "A Positive", "A+")
    .when(col("Blood_Group") == "B Positive", "B+")
    .when(col("Blood_Group") == "A Negative", "A-")
    .when(col("Blood_Group") == "B Negative", "B-")
    .otherwise(col("Blood_Group")),
)

In [None]:
# Widen two-digit years in faxdf's birth dates, then parse the text into a
# proper date column.
faxdf = (
    faxdf
    .withColumn("Date_of_Birth", standardize_date_udf("Date_of_Birth"))
    .withColumn("Date_of_Birth", to_date(col("Date_of_Birth"), "yyyy-MM-dd"))
)

In [None]:
# Cast every column of all three frames to string, keeping the column names,
# so the similarity UDF always compares text with text.
bankdf = bankdf.select(*(col(name).cast("string").alias(name) for name in bankdf.columns))
dnbdf = dnbdf.select(*(col(name).cast("string").alias(name) for name in dnbdf.columns))
faxdf = faxdf.select(*(col(name).cast("string").alias(name) for name in faxdf.columns))

In [None]:
# Give each frame a short alias so same-named columns can be told apart
# (e.g. "bank.City" vs "dnb.City") once the frames are joined.
bankdf_alias, dnbdf_alias, faxdf_alias = (
    bankdf.alias("bank"),
    dnbdf.alias("dnb"),
    faxdf.alias("fax"),
)

In [None]:
# Pair bank records with each third-party source by passport number.
# A full outer join keeps records that exist on only one side; the columns
# of the missing side are null for those rows.
joined_df1 = bankdf_alias.join(dnbdf_alias, "Passport_Number", "outer")
joined_df2 = bankdf_alias.join(faxdf_alias, "Passport_Number", "outer")

In [None]:
def match_ratio(str1, str2):
    """Return the difflib similarity ratio of two strings, in [0.0, 1.0].

    Args:
        str1: First value to compare, or None.
        str2: Second value to compare, or None.

    Returns:
        ``SequenceMatcher(None, str1, str2).ratio()`` when both values are
        present, and 0.0 when either one is None.
    """
    # The outer joins leave nulls wherever a record or field is missing on one
    # side. SequenceMatcher.ratio() raises TypeError on None, which would fail
    # the Spark job, so a missing value is scored as "no match" instead.
    if str1 is None or str2 is None:
        return 0.0
    return SequenceMatcher(None, str1, str2).ratio()
# Spark UDF wrapper around match_ratio; the result column is a float.
match_ratio_udf = udf(f=match_ratio, returnType=FloatType())

In [None]:
# (output label, source column) pairs compared between bank and dnb.
# "Adress" stays misspelled on purpose: the aggregation cell reads the
# resulting "Adress MR" column by that exact name.
dnb_match_fields = [
    ("Passport_Number", "Passport_Number"),
    ("First_Name", "First_Name"),
    ("Last_Name", "Last_Name"),
    ("Date_of_Birth", "Date_of_Birth"),
    ("Blood_Group", "Blood_Group"),
    ("Gender", "Gender"),
    ("Marital_Status", "Marital_Status"),
    ("Education_Level", "Education_Level"),
    ("Adress", "Address"),
    ("City", "City"),
    ("Postal_Code", "Postal_Code"),
    ("Country", "Country"),
    ("Country_Code", "Country_Code"),
    ("Phone_Number", "Phone_Number"),
    ("Company_Name", "Company_Name"),
    ("Email", "Email"),
]

# Add one "<label> MR" similarity column per attribute, in the order above.
result_df1 = joined_df1
for label, field in dnb_match_fields:
    result_df1 = result_df1.withColumn(
        f"{label} MR",
        match_ratio_udf(col(f"bank.{field}"), col(f"dnb.{field}")),
    )

In [None]:
# (output label, source column) pairs compared between bank and fax.
# "Adress" stays misspelled on purpose: the aggregation cell reads the
# resulting "Adress MR" column by that exact name.
fax_match_fields = [
    ("Passport_Number", "Passport_Number"),
    ("First_Name", "First_Name"),
    ("Last_Name", "Last_Name"),
    ("Date_of_Birth", "Date_of_Birth"),
    ("Blood_Group", "Blood_Group"),
    ("Gender", "Gender"),
    ("Marital_Status", "Marital_Status"),
    ("Education_Level", "Education_Level"),
    ("Adress", "Address"),
    ("City", "City"),
    ("Postal_Code", "Postal_Code"),
    ("Country", "Country"),
    ("Country_Code", "Country_Code"),
    ("Phone_Number", "Phone_Number"),
    ("Company_Name", "Company_Name"),
    ("Email", "Email"),
]

# Add one "<label> MR" similarity column per attribute, in the order above.
result_df2 = joined_df2
for label, field in fax_match_fields:
    result_df2 = result_df2.withColumn(
        f"{label} MR",
        match_ratio_udf(col(f"bank.{field}"), col(f"fax.{field}")),
    )

In [None]:
# (similarity column, output alias) pairs for the dnb comparison. The aliases
# are kept exactly as they were (the first two carry no " MR" suffix) because
# the report cell uses them as dictionary keys shared with the fax result.
dnb_percentage_columns = [
    ("Passport_Number MR", "Percentage Passport_Number"),
    ("First_Name MR", "Percentage First_Name"),
    ("Last_Name MR", "Percentage Last_Name MR"),
    ("Date_of_Birth MR", "Percentage Date_of_Birth MR"),
    ("Blood_Group MR", "Percentage Blood_Group MR"),
    ("Gender MR", "Percentage Gender MR"),
    ("Marital_Status MR", "Percentage Marital_Status MR"),
    ("Education_Level MR", "Percentage Education_Level MR"),
    ("Adress MR", "Percentage Adress MR"),
    ("City MR", "Percentage City MR"),
    ("Postal_Code MR", "Percentage Postal_Code MR"),
    ("Country MR", "Percentage Country MR"),
    ("Country_Code MR", "Percentage Country_Code MR"),
    ("Phone_Number MR", "Percentage Phone_Number MR"),
    ("Company_Name MR", "Percentage Company_Name MR"),
    ("Email MR", "Percentage Email MR"),
]

# Average each similarity ratio over all rows, scaled to a percentage.
overallmatch_df1 = result_df1.select(
    [
        avg(col(ratio_column) * 100).alias(percentage_alias)
        for ratio_column, percentage_alias in dnb_percentage_columns
    ]
)

In [None]:
# (similarity column, output alias) pairs for the fax comparison. The aliases
# are kept exactly as they were (the first two carry no " MR" suffix) because
# the report cell uses them as dictionary keys shared with the dnb result.
fax_percentage_columns = [
    ("Passport_Number MR", "Percentage Passport_Number"),
    ("First_Name MR", "Percentage First_Name"),
    ("Last_Name MR", "Percentage Last_Name MR"),
    ("Date_of_Birth MR", "Percentage Date_of_Birth MR"),
    ("Blood_Group MR", "Percentage Blood_Group MR"),
    ("Gender MR", "Percentage Gender MR"),
    ("Marital_Status MR", "Percentage Marital_Status MR"),
    ("Education_Level MR", "Percentage Education_Level MR"),
    ("Adress MR", "Percentage Adress MR"),
    ("City MR", "Percentage City MR"),
    ("Postal_Code MR", "Percentage Postal_Code MR"),
    ("Country MR", "Percentage Country MR"),
    ("Country_Code MR", "Percentage Country_Code MR"),
    ("Phone_Number MR", "Percentage Phone_Number MR"),
    ("Company_Name MR", "Percentage Company_Name MR"),
    ("Email MR", "Percentage Email MR"),
]

# Average each similarity ratio over all rows, scaled to a percentage.
overallmatch_df2 = result_df2.select(
    [
        avg(col(ratio_column) * 100).alias(percentage_alias)
        for ratio_column, percentage_alias in fax_percentage_columns
    ]
)

In [None]:
# Each aggregate frame has a single row; bring it to the driver as a dict
# keyed by the "Percentage ..." aliases.
overall_match_df1 = overallmatch_df1.collect()[0].asDict()
overall_match_df2 = overallmatch_df2.collect()[0].asDict()

# Report the per-attribute averages for the dnb comparison ...
print("Overall Matching Percentages for dnb:")
for key, value in overall_match_df1.items():
    print(f"{key}: {value}%")

# ... and for the fax comparison.
print("\nOverall Matching Percentages for fax:")
for key, value in overall_match_df2.items():
    print(f"{key}: {value}%")

# Pick a source per attribute: "Source 1" is the dnb result and "Source 2"
# the fax result; a tie goes to "Source 1".
recommendation = {
    attribute: (
        "Source 1"
        if overall_match_df1[attribute] >= overall_match_df2[attribute]
        else "Source 2"
    )
    for attribute in overall_match_df1
}

# Show the chosen source for every attribute.
print("\nRecommended Source for Each Attribute:")
for attribute, source in recommendation.items():
    print(f"{attribute}: {source}")

In [39]:
# Stop the Spark session now that all results have been collected and printed.
spark.stop()

In [None]:
# NOTE(review): in file order this cell comes after spark.stop(), so on a
# top-to-bottom run it only displays the handle of an already-stopped
# session -- looks like a leftover inspection cell; confirm and remove.
spark