In [5]:
import pandas as pd

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, countDistinct
import pyspark.sql.functions as F

In [9]:
spark = (
    SparkSession.builder
    .appName("LoanDataQuality")
    .getOrCreate()
)

In [13]:
# NOTE(review): unlike the mapping read below, inferSchema is not enabled here, so
# Spark reads the columns as strings by default — confirm that is intended.
df_loans = spark.read.csv("LCloan_raw_data.csv", header=True)

In [15]:
print(f"num of columns: {len(df_loans.columns)}")


num of columns: 145


In [17]:
# Column name -> Spark type name, as reported by df_loans.dtypes.
spark_schema = {name: dtype for name, dtype in df_loans.dtypes}

In [19]:
# Spark copy of the mapping file (the cells below work with the pandas copy).
loans_mapping_spark = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("updated_loans_mapping.csv")
)

In [21]:
# NOTE(review): the preview of this frame shows an "Unnamed: 0" column, i.e. a row
# index that was saved into the file earlier; consider index_col=0 if it is unwanted.
loans_mapping = pd.read_csv("updated_loans_mapping.csv")

In [25]:
loans_mapping.head(n=5)

Unnamed: 0,column_name,Description,source_sheet,type_status,column_type,is_required,actual_column_in_loans
0,amount requested,The total amount requested by the borrower,RejectStats,not_in_file,,False,
1,application date,The date which the borrower applied,RejectStats,not_in_file,,False,
2,loan title,The loan title provided by the borrower,RejectStats,not_in_file,,False,
3,risk_score,"For applications prior to November 5, 2013 the...",RejectStats,not_in_file,,False,
4,debt-to-income ratio,A ratio calculated using the borrower’s total ...,RejectStats,not_in_file,,False,


In [71]:
# business category mapping function
def map_business_category(col_name):
    """
    Classify a piece of text into a business category by keyword matching.

    The text (in this notebook, a column's ``Description``) is lower-cased and
    checked against an ordered list of keyword rules; the first rule with a
    keyword occurring as a substring wins. Text that matches no rule, or a value
    that is not a string at all (e.g. a NaN cell coming from pandas), maps to
    'other'.

    :param col_name: text to classify.
    :return: one of 'loan_info', 'credit_info', 'financial_metrics',
        'payment_info', 'employment_info', 'dates', 'geographic_info',
        'special_cases' or 'other'.
    """
    # NaN/None cells would crash on .lower(); treat them as unclassifiable.
    if not isinstance(col_name, str):
        return 'other'
    text = col_name.lower()

    # Order matters: matching is plain substring search and a text can hit several
    # rules (e.g. one mentioning both "loan" and "amount"), so earlier rules win.
    rules = (
        ('loan_info', ('loan', 'application', 'purpose', 'policy')),
        ('credit_info', ('fico', 'delinq', 'inq', 'credit', 'risk')),
        ('financial_metrics', ('income', 'amount', 'balance', 'funded', 'bc_util', 'cur_bal')),
        ('payment_info', ('pymnt', 'installment', 'total', 'rec', 'out_prncp')),
        ('employment_info', ('emp', 'job', 'employer', 'title')),
        ('dates', ('date', 'month', 'issue', 'last', 'start', 'end')),
        ('geographic_info', ('state', 'zip', 'addr')),
        ('special_cases', ('hardship', 'settlement', 'debt', 'deferral')),
    )
    for category, keywords in rules:
        if any(keyword in text for keyword in keywords):
            return category
    return 'other'


In [75]:
# Categorise each mapped column from its free-text Description.
# NOTE(review): several keywords in map_business_category ('pymnt', 'bc_util', 'fico', ...)
# look like column-name tokens rather than description words — confirm Description is the intended input.
loans_mapping["business_category"] = loans_mapping["Description"].map(map_business_category)

In [77]:
print(loans_mapping.loc[:, ["column_name", "business_category"]])

                             column_name  business_category
0                       amount requested  financial_metrics
1                       application date              dates
2                             loan title          loan_info
3                             risk_score          loan_info
4                   debt-to-income ratio          loan_info
..                                   ...                ...
275                sec_app_num_rev_accts          loan_info
276     sec_app_chargeoff_within_12_mths          loan_info
277   sec_app_collections_12_mths_ex_med          loan_info
278  sec_app_mths_since_last_major_derog          loan_info
279                  disbursement_method          loan_info

[280 rows x 2 columns]


In [79]:
# Number of mapped columns no keyword rule could classify.
int((loans_mapping["business_category"] == "other").sum())

31

In [81]:
loans_mapping.to_csv("new_loans_mapping.csv", encoding="utf-8-sig", index=False)


### Trying a few test cases before mapping the check types

In [83]:
def check_nulls(df, column_name):
    """
    Report whether ``column_name`` in the Spark DataFrame ``df`` has null values.

    :param df: Spark DataFrame to inspect.
    :param column_name: name of the column to check.
    :return: ``(has_nulls, message)`` where ``message`` states how many nulls
        were found, or that there are none.
    """
    missing = df.filter(col(column_name).isNull()).count()
    if missing <= 0:
        return False, f"Column '{column_name}' does not contain any null values."
    return True, f"Column '{column_name}' contains {missing} null value(s)."

In [85]:
check_nulls(df_loans, column_name="title")

(True, "Column 'title' contains 23323 null value(s).")

In [94]:
def count_records(df, expected_records_num):
    """
    Compare the number of records in a DataFrame with an expected count.

    :param df: Spark DataFrame to check (only its ``count()`` method is used).
    :param expected_records_num: expected number of records.
    :return: ``(True, logs)`` when the counts match, otherwise ``(False, logs)``;
        ``logs`` is a human-readable summary of the check.
    """
    records_count = df.count()
    logs = f"Num of records in the file: {records_count}"

    if records_count == expected_records_num:
        # Leading space keeps the two sentences apart (they used to run together).
        logs += f" Success: the DataFrame has the expected number of records: {expected_records_num}"
        return True, logs

    logs += f" Mismatch: the DataFrame has {records_count} records, but {expected_records_num} were expected."
    return False, logs

In [96]:
# NOTE(review): the file actually holds 2,260,668 rows (see output) — confirm the expected figure.
count_records(df_loans, expected_records_num=800_000)

(False,
 'Num of records in the file: 2260668 Mismatch: the DatatFrame has 2260668 records, but 800000 were expected.')

In [98]:
def check_duplicates(df, key_columns):
    """
    Check whether any combination of ``key_columns`` values occurs more than once.

    :param df: Spark DataFrame to check.
    :param key_columns: column name(s) forming the key, as accepted by
        ``DataFrame.groupBy``.
    :return: ``(True, logs)`` if at least one key combination is duplicated,
        otherwise ``(False, logs)``. The number reported in ``logs`` is the number
        of distinct key combinations that occur more than once, not the number of
        duplicated rows.
    """
    # Count how many rows share each key combination...
    key_counts = df.groupBy(key_columns).count()
    # ...and keep only the combinations that appear more than once.
    duplicated_keys = key_counts.filter(col("count") > 1)

    # Count once; there is no need to collect every duplicated key to the driver
    # just to take the length of the result.
    num_duplicated_keys = duplicated_keys.count()
    if num_duplicated_keys > 0:
        return True, f"Duplicate records found based on keys {key_columns}: {num_duplicated_keys} duplicates."
    return False, "No duplicates found in the specified key columns."

In [106]:
list(df_loans.columns)

['id',
 'member_id',
 'loan_amnt',
 'funded_amnt',
 'funded_amnt_inv',
 'term',
 'int_rate',
 'installment',
 'grade',
 'sub_grade',
 'emp_title',
 'emp_length',
 'home_ownership',
 'annual_inc',
 'verification_status',
 'issue_d',
 'loan_status',
 'pymnt_plan',
 'url',
 'desc',
 'purpose',
 'title',
 'zip_code',
 'addr_state',
 'dti',
 'delinq_2yrs',
 'earliest_cr_line',
 'inq_last_6mths',
 'mths_since_last_delinq',
 'mths_since_last_record',
 'open_acc',
 'pub_rec',
 'revol_bal',
 'revol_util',
 'total_acc',
 'initial_list_status',
 'out_prncp',
 'out_prncp_inv',
 'total_pymnt',
 'total_pymnt_inv',
 'total_rec_prncp',
 'total_rec_int',
 'total_rec_late_fee',
 'recoveries',
 'collection_recovery_fee',
 'last_pymnt_d',
 'last_pymnt_amnt',
 'next_pymnt_d',
 'last_credit_pull_d',
 'collections_12_mths_ex_med',
 'mths_since_last_major_derog',
 'policy_code',
 'application_type',
 'annual_inc_joint',
 'dti_joint',
 'verification_status_joint',
 'acc_now_delinq',
 'tot_coll_amt',
 'tot_cur_

In [110]:
check_duplicates(df_loans, key_columns=['loan_amnt', 'funded_amnt'])

(True,
 "Duplicate records found based on keys ['loan_amnt', 'funded_amnt']: 1852 duplicates.")

In [112]:
def compute_column_checksum(df, col_name):
    """
    Return a checksum of one column: the sum of Spark's per-row ``hash`` of its values.

    Because the hashes are summed, the checksum ignores row order: two columns
    holding the same values in any order get the same checksum, so equal
    checksums mark duplicate *candidates* rather than proven duplicates.

    :param df: Spark DataFrame containing the column.
    :param col_name: name of the column to checksum.
    :return: the aggregated checksum value.
    """
    hashed = F.hash(F.col(col_name))
    aggregate = F.sum(hashed).alias("checksum")
    first_row = df.select(aggregate).collect()[0]
    return first_row["checksum"]

def find_duplicate_columns(df):
    """
    Find groups of columns in ``df`` whose contents appear to be identical.

    Each column is reduced to a checksum (the sum of Spark's ``hash`` over its
    values, the same formula ``compute_column_checksum`` uses). Columns that
    share a checksum are grouped together.

    Caveats: the sum ignores row order, so columns holding the same values in a
    different row order also match, and hash collisions are possible. Treat the
    result as duplicate *candidates* and verify them before dropping anything.

    :param df: Spark DataFrame to scan.
    :return: dict mapping checksum -> list of column names sharing it (in
        ``df.columns`` order). Only checksums shared by two or more columns are
        included. The dict is empty when nothing is duplicated or ``df`` has no columns.
    """
    columns = list(df.columns)
    if not columns:
        return {}

    # Compute every column's checksum in a single select / collect, instead of
    # one separate select-and-collect per column. Values are read back by position.
    checksum_row = df.select(
        [F.sum(F.hash(F.col(name))).alias(f"_checksum_{i}") for i, name in enumerate(columns)]
    ).collect()[0]

    # Group column names by checksum, keeping column order within each group.
    by_checksum = {}
    for name, checksum in zip(columns, checksum_row):
        by_checksum.setdefault(checksum, []).append(name)

    # Keep only checksums shared by more than one column.
    return {cs: names for cs, names in by_checksum.items() if len(names) > 1}

In [None]:
find_duplicate_columns(df=df_loans)

In [None]:
# Stop the Spark session; the remaining cells below only use pandas objects.
spark.stop()

### Mapping the check types

# NOTE(review): `df_merged` and `classify_column` are not defined anywhere in this
# notebook, so these lines fail on Restart Kernel -> Run All. TODO: restore the cells
# that build them (or confirm whether `classify_column` should be `map_business_category`).
# Add empty placeholder columns, presumably filled in during the check-type mapping.
df_merged["test_function"] = ""
df_merged["test_type"] = ""
df_merged["required"] = ""
# This placeholder is overwritten by the apply() just below.
df_merged["business_category"] = ""


# Derive each column's business category from its Description text.
df_merged['business_category'] = df_merged['Description'].apply(classify_column)
# NOTE(review): capital "C" — this is a different column from 'business_category'.
# Presumably a stale column left over from an earlier run; if it does not exist, this raises KeyError.
df_merged['business_Category']

# Drop the stale 'business_Category' column in place.
df_merged.drop(columns=['business_Category'], inplace=True)


### Saving the file as the main mapping file


# Both files receive identical content. header=True, sep="," and encoding="utf-8" are
# pandas' to_csv defaults, so the two calls differ only in their target path.
df_merged.to_csv("test_mapping_master.csv", index=False, header=True, encoding="utf-8", sep=",")
df_merged.to_csv("test_mapping_master_clean.csv", index=False, header=True, encoding="utf-8", sep=",")


### Mapping the testing functions

### Testing function names:


['check_consistency', 'check_duplicates', 'check_foreign_key', 'check_nulls', 'count_records', 'dtype_consistency', 'duplicated_files_by_name_and_size', 'duplicated_files_current_path', 'file_timeliness', 'moving_average', 'validate_allowed_values', 'validate_date_format', 'validate_date_range', 'validate_numeric_range', 'validate_sum', 'validate_values_length']
