# 1- Imports

In [1]:
import numpy as np
import pandas as pd
import yaml
import sys
import os
from pathlib import Path
import re
import json

from pyspark.sql import functions as F
from pyspark.sql.types import DateType, ArrayType, StringType
from pyspark.sql.functions import udf, broadcast
from datetime import datetime

In [2]:
# Paths and custom modules.
# The notebook's working directory is used as the project root: data files are read
# from ./data, and the root is appended to sys.path so the local `utils` module imports.
cwd_path = Path(os.getcwd())
data_path = cwd_path / 'data'
sys.path.append(str(cwd_path))

from utils import return_catalog, dataFetchingFunc, strip_strings, stnd_missing_values_type, build_spark_session

# return_catalog() is a project helper; it returns the data catalog and the models catalog.
data_catalog, models_catalog = return_catalog()

In [3]:
# Point PySpark workers at the Python executable of the active environment (sys.prefix).
# NOTE(review): this resolves to <prefix>/python; on Unix-like conda environments the
# interpreter normally lives under <prefix>/bin/python -- confirm for the target platform.
conda_env_path = sys.prefix
os.environ['PYSPARK_PYTHON'] = os.path.join(conda_env_path, 'python')

# Build the Spark session (project helper) and display it as the cell output.
spark = build_spark_session()
spark

# 2- Fetch

In [4]:
# Snapshot date of the feed files, formatted YYYYMMDD; also parsed with '%Y%m%d' further down.
data_date = '20240810'

# Fetch the customer-attributes feed for this snapshot.
risk_cust_attrib_df = pd.read_parquet(data_path/f"feed/{data_date}_risk_cust_attributes.parquet")

# Commented out because btech sales are not used in this iteration.
# NOTE: data_path is a pathlib.Path, so these lines need `/` instead of `+` if re-enabled.
# risk_cust_trx_df = pd.read_parquet(data_path+f"feed/{data_date}_risk_cust_trx.parquet")
# risk_cust_attrib_df_sp = spark.read.parquet(data_path+f"feed/{data_date}_risk_cust_attributes.parquet")
# risk_cust_trx_df_sp= spark.read.parquet(data_path+f"feed/{data_date}_risk_cust_trx.parquet")

# SSN governorate-code mapping, read from the 'main_translated' sheet (file date is fixed, not data_date).
ssn_gov_map_df = pd.read_excel(data_path/f"feed/live_lists/20240527_ssn_governorate_mapping.xlsx", sheet_name='main_translated')

In [5]:
# Standardize missing values for pandas (project helper from utils).
risk_cust_attrib_df = stnd_missing_values_type(risk_cust_attrib_df)

#risk_cust_trx_df = stnd_missing_values_type(risk_cust_trx_df)

# Keep an untouched L0 replica of the standardized frame, since later cells reassign risk_cust_attrib_df.
risk_cust_attrib_df_replica = risk_cust_attrib_df.copy()
#risk_cust_trx_df_replica = risk_cust_trx_df.copy()

# 3- Build

## Process Phone Numbers

In [6]:
def clean_phone_numbers(df, col_name, mode='pandas'):
    """Normalise a phone-number column to 11-character digit strings.

    Both back-ends apply the same steps: strip every non-digit character, keep
    the last 11 characters, left-pad 10-character values with "0", then null
    out values that are not exactly 11 characters long or that consist of a
    single repeated character.

    Args:
        df: pandas DataFrame (``mode='pandas'``) or Spark DataFrame (``mode='spark'``).
        col_name: name of the string column to clean.
        mode: ``'pandas'`` (default) or ``'spark'``.

    Returns:
        A DataFrame of the same kind with ``col_name`` cleaned. The pandas
        input is copied first, so the caller's frame is not modified.

    Raises:
        ValueError: if ``mode`` is neither ``'pandas'`` nor ``'spark'``.
    """
    if mode == 'pandas':
        df = df.copy()
        df[col_name] = (df[col_name]
                        .str.replace(r"\D+", "", regex=True)
                        .str[-11:]
                        .apply(lambda x: "0" + x if len(str(x)) == 10 else x))
        # Missing values have a NaN length, which also compares != 11 and stays missing.
        df.loc[df[col_name].str.len() != 11, col_name] = np.nan
        # A single distinct character (e.g. "00000000000") is treated as a placeholder.
        df.loc[df[col_name].apply(lambda x: len(set(str(x))) == 1), col_name] = np.nan
    elif mode == 'spark':
        phone = F.col(col_name)
        df = df.withColumn(col_name, F.regexp_replace(phone, r"\D+", ""))
        # Column API instead of an interpolated SQL string, so odd column names cannot break the expression.
        df = df.withColumn(col_name, F.substring(phone, -11, 11))
        df = df.withColumn(col_name, F.when(F.length(phone) == 10, F.concat(F.lit("0"), phone)).otherwise(phone))
        df = df.withColumn(col_name, F.when(F.length(phone) != 11, None).otherwise(phone))

        # Native regex with a back-reference replaces the former Python UDF: it matches
        # strings made of one repeated character. NULL input yields NULL, which `when`
        # treats as not matched, so missing values stay missing.
        is_single_repeated_char = phone.rlike(r"^(.)\1*$")
        df = df.withColumn(col_name, F.when(is_single_repeated_char, None).otherwise(phone))
    else:
        raise ValueError("Invalid mode. Choose either 'pandas' or 'spark'.")
    return df


# Clean both phone columns so they can be used as merge keys.
# clean_phone_numbers returns a new frame each time; the name is rebound to the cleaned result.
risk_cust_attrib_df = clean_phone_numbers(risk_cust_attrib_df, 'phone_number_1')
risk_cust_attrib_df = clean_phone_numbers(risk_cust_attrib_df, 'phone_number_2')
#risk_cust_trx_df = clean_phone_numbers(risk_cust_trx_df, 'customer_phone')

# Spark equivalents, disabled together with the transactions feed.
# risk_cust_attrib_df_sp = clean_phone_numbers(risk_cust_attrib_df_sp, 'phone_number_1', mode='spark')
# risk_cust_attrib_df_sp = clean_phone_numbers(risk_cust_attrib_df_sp, 'phone_number_2', mode='spark')
# risk_cust_trx_df_sp = clean_phone_numbers(risk_cust_trx_df_sp, 'customer_phone', mode='spark')

## Process iScore

In [7]:
def select_relevant_iscore_columns(df):
    """Return only the client id and the three raw iScore columns of ``df``."""
    iscore_columns = ['client_id', 'is_iscore', 'iscore_score', 'iscore_report']
    return df[iscore_columns]

def update_no_hit_consumers(df):
    """Keep only the rows that have a usable iScore report.

    A row is kept when ``is_iscore == 1`` and its ``iscore_report`` contains
    neither ``'iScoreNohitConsumer'`` nor ``'Response NoHit'``.

    Args:
        df: DataFrame with ``is_iscore`` and a string column ``iscore_report``.

    Returns:
        A new DataFrame (an independent copy) holding the kept rows. The
        caller's frame is not modified.
    """
    report = df['iscore_report']
    # na=False: a missing report is not a "no hit" marker, and the mask stays strictly boolean.
    no_hit_mask = (report.str.contains('iScoreNohitConsumer', na=False)
                   | report.str.contains('Response NoHit', na=False))
    # Equivalent to flagging no-hit rows as is_iscore = 0 and then filtering on is_iscore == 1,
    # without writing into the caller's frame.
    keep_mask = (df['is_iscore'] == 1) & ~no_hit_mask
    return df[keep_mask].copy()

def update_iscore_report_format(df):
    """Add ``is_iscore_new_format``: 0 for '<DATAPACKET>' reports, 1 otherwise.

    The flag is left missing (NaN) where the format cannot be determined: rows
    with ``is_iscore == 0`` and rows whose ``iscore_report`` is missing.

    Args:
        df: DataFrame with ``is_iscore`` and a string column ``iscore_report``.

    Returns:
        A copy of ``df`` with the extra column. Its dtype is int16 when every
        row has a determinable format and float64 (with NaN) otherwise.
    """
    df = df.copy()
    report = df['iscore_report']
    # na=False keeps the mask boolean so that `~` cannot fail on missing reports.
    is_old_format = report.str.contains('<DATAPACKET>', na=False)
    format_flag = (~is_old_format).astype('int16')

    undetermined = (df['is_iscore'] == 0) | report.isna()
    if undetermined.any():
        # Upcast explicitly before inserting NaN instead of writing NaN into an int16 column.
        format_flag = format_flag.astype('float64').mask(undetermined)

    df['is_iscore_new_format'] = format_flag
    return df

def process_open_accounts_old_format(df):
    """Extract open-account aggregates from old-format ('<DATAPACKET>') iScore reports.

    For rows with ``is_iscore == 1`` and ``is_iscore_new_format == 0`` the text
    following the first ``'CONSUMER_CREDIT_FACILITY'`` marker is scanned and five
    ``OpenAccounts_*_old`` columns are filled: the number of
    ``'CREDIT_DETAILS ID'`` occurrences and the sums of the APPROVAL_AMOUNT,
    CURRENT_BALANCE, AMT_OF_INSTALMENT and MAX_NUM_DAYS_DUE tag values. Tag
    values that are not plain digit strings are skipped. All other rows, and
    reports without the marker, get 0.

    Args:
        df: DataFrame with ``is_iscore``, ``is_iscore_new_format`` and
            ``iscore_report``.

    Returns:
        A copy of ``df`` with the five int64 columns added. The caller's frame
        is not modified.
    """
    columns = [
        'OpenAccounts_count_old',
        'OpenAccounts_ApprovalAmount_sum_old',
        'OpenAccounts_BalanceAmount_sum_old',
        'OpenAccounts_InstallmentAmount_sum_old',
        'OpenAccounts_MaxDaysDue_sum_old'
    ]

    def extract_sum(pattern, data):
        # Only plain digit strings are summed; empty or non-numeric tag values are ignored.
        return sum(int(x) for x in re.findall(pattern, data) if x.isdigit())

    def extract_open_accounts_old(report):
        # A missing or non-string report, or one without the marker, contributes zeros.
        parts = report.split('CONSUMER_CREDIT_FACILITY') if isinstance(report, str) else []
        if len(parts) < 2:
            return dict.fromkeys(columns, 0)
        facility_section = parts[1]
        return {
            'OpenAccounts_count_old': len(re.findall(r'CREDIT_DETAILS ID', facility_section)),
            'OpenAccounts_ApprovalAmount_sum_old': extract_sum(r'<APPROVAL_AMOUNT>(.*?)</APPROVAL_AMOUNT>', facility_section),
            'OpenAccounts_BalanceAmount_sum_old': extract_sum(r'<CURRENT_BALANCE>(.*?)</CURRENT_BALANCE>', facility_section),
            'OpenAccounts_InstallmentAmount_sum_old': extract_sum(r'<AMT_OF_INSTALMENT>(.*?)</AMT_OF_INSTALMENT>', facility_section),
            'OpenAccounts_MaxDaysDue_sum_old': extract_sum(r'<MAX_NUM_DAYS_DUE>(.*?)</MAX_NUM_DAYS_DUE>', facility_section),
        }

    df = df.copy()
    for col in columns:
        df[col] = 0

    mask = (df['is_iscore'] == 1) & (df['is_iscore_new_format'] == 0)
    # Guard the empty selection so that nothing is assigned when no old-format rows exist.
    if mask.any():
        records = [extract_open_accounts_old(report) for report in df.loc[mask, 'iscore_report']]
        # Positional assignment: records are in the same order as the selected rows.
        df.loc[mask, columns] = pd.DataFrame(records, columns=columns).to_numpy()
    df[columns] = df[columns].astype('int64')

    return df

def process_closed_accounts_old_format(df):
    """Extract closed-account aggregates from old-format ('<DATAPACKET>') iScore reports.

    For rows with ``is_iscore == 1`` and ``is_iscore_new_format == 0`` the report
    is split on ``'CONSUMER_CLOSED_ACCOUNT'``. Only when the split yields exactly
    three parts is the middle part scanned; four ``ClosedAccounts_*_old`` columns
    are then filled with the number of ``'CLOSED_ACCOUNTS ID'`` occurrences and
    the sums of the APPROVAL_AMOUNT, AMT_OF_INSTALMENT and MAX_NUM_DAYS_DUE tag
    values. Tag values that are not plain digit strings are skipped. Every other
    row or report shape gets 0.

    Args:
        df: DataFrame with ``is_iscore``, ``is_iscore_new_format`` and
            ``iscore_report``.

    Returns:
        A copy of ``df`` with the four int64 columns added. The caller's frame
        is not modified.
    """
    columns = [
        'ClosedAccounts_count_old',
        'ClosedAccounts_ApprovalAmount_sum_old',
        'ClosedAccounts_InstallmentAmount_sum_old',
        'ClosedAccounts_MaxDaysDue_sum_old'
    ]

    def extract_sum(pattern, data):
        # Only plain digit strings are summed; empty or non-numeric tag values are ignored.
        return sum(int(x) for x in re.findall(pattern, data) if x.isdigit())

    def extract_closed_accounts_old(report):
        # A missing or non-string report contributes zeros.
        parts = report.split('CONSUMER_CLOSED_ACCOUNT') if isinstance(report, str) else []
        # Exactly three parts means the marker occurs exactly twice in the report.
        if len(parts) != 3:
            return dict.fromkeys(columns, 0)
        closed_section = parts[1]
        return {
            'ClosedAccounts_count_old': len(re.findall(r'CLOSED_ACCOUNTS ID', closed_section)),
            'ClosedAccounts_ApprovalAmount_sum_old': extract_sum(r'<APPROVAL_AMOUNT>(.*?)</APPROVAL_AMOUNT>', closed_section),
            'ClosedAccounts_InstallmentAmount_sum_old': extract_sum(r'<AMT_OF_INSTALMENT>(.*?)</AMT_OF_INSTALMENT>', closed_section),
            'ClosedAccounts_MaxDaysDue_sum_old': extract_sum(r'<MAX_NUM_DAYS_DUE>(.*?)</MAX_NUM_DAYS_DUE>', closed_section),
        }

    df = df.copy()
    for col in columns:
        df[col] = 0

    mask = (df['is_iscore'] == 1) & (df['is_iscore_new_format'] == 0)
    # Guard the empty selection so that nothing is assigned when no old-format rows exist.
    if mask.any():
        records = [extract_closed_accounts_old(report) for report in df.loc[mask, 'iscore_report']]
        # Positional assignment: records are in the same order as the selected rows.
        df.loc[mask, columns] = pd.DataFrame(records, columns=columns).to_numpy()
    df[columns] = df[columns].astype('int64')

    return df


def process_open_accounts_new_format(df):
    """Extract open-account aggregates from new-format (JSON) iScore reports.

    For rows with ``is_iscore == 1`` and ``is_iscore_new_format == 1`` the report
    is parsed as JSON. In the module whose ``ModuleId`` is
    ``'iScoreCreditProfileOverviewConsumer'`` (the last one wins if several
    match), the first element of ``Content['DATA']`` supplies
    ``CreditFacilities`` (summed NoOfAccounts, TotalApprovalAmt,
    TotalBalanceAmount, TotalMonthlyInstallmentAmt) and ``OpenAccount`` (summed
    MAX_NUM_DAYS_DUE). Values that cannot be converted with ``int()`` count
    as 0. Unparseable, missing or unexpectedly shaped reports yield zeros
    instead of raising.

    Args:
        df: DataFrame with ``is_iscore``, ``is_iscore_new_format`` and
            ``iscore_report``.

    Returns:
        A copy of ``df`` with five int64 ``OpenAccounts_*_new`` columns added.
        The caller's frame is not modified.
    """
    columns = [
        'OpenAccounts_count_new',
        'OpenAccounts_ApprovalAmount_sum_new',
        'OpenAccounts_BalanceAmount_sum_new',
        'OpenAccounts_InstallmentAmount_sum_new',
        'OpenAccounts_MaxDaysDue_sum_new'
    ]

    def try_parse_int(value):
        # TypeError covers JSON nulls and nested values; ValueError covers non-numeric strings.
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    def dict_items(value):
        # Keep only dict entries of a list; anything else is treated as "no entries".
        return [item for item in value if isinstance(item, dict)] if isinstance(value, list) else []

    def extract_open_accounts_new(report):
        try:
            modules = json.loads(report)
        except (TypeError, ValueError):
            # TypeError: report is None/NaN; ValueError includes json.JSONDecodeError.
            return dict.fromkeys(columns, 0)

        credit_facilities = []
        open_accounts = []
        if isinstance(modules, list):
            for module in modules:
                if isinstance(module, dict) and module.get('ModuleId') == 'iScoreCreditProfileOverviewConsumer':
                    content = module.get('Content')
                    data = content.get('DATA') if isinstance(content, dict) else None
                    first = data[0] if isinstance(data, list) and data else None
                    first = first if isinstance(first, dict) else {}
                    credit_facilities = dict_items(first.get('CreditFacilities'))
                    open_accounts = dict_items(first.get('OpenAccount'))

        return {
            'OpenAccounts_count_new': sum(try_parse_int(x.get('NoOfAccounts', 0)) for x in credit_facilities),
            'OpenAccounts_ApprovalAmount_sum_new': sum(try_parse_int(x.get('TotalApprovalAmt', 0)) for x in credit_facilities),
            'OpenAccounts_BalanceAmount_sum_new': sum(try_parse_int(x.get('TotalBalanceAmount', 0)) for x in credit_facilities),
            'OpenAccounts_InstallmentAmount_sum_new': sum(try_parse_int(x.get('TotalMonthlyInstallmentAmt', 0)) for x in credit_facilities),
            'OpenAccounts_MaxDaysDue_sum_new': sum(try_parse_int(x.get('MAX_NUM_DAYS_DUE', 0)) for x in open_accounts),
        }

    df = df.copy()
    for col in columns:
        df[col] = 0

    mask = (df['is_iscore'] == 1) & (df['is_iscore_new_format'] == 1)
    # Guard the empty selection so that nothing is assigned when no new-format rows exist.
    if mask.any():
        records = [extract_open_accounts_new(report) for report in df.loc[mask, 'iscore_report']]
        # Positional assignment: records are in the same order as the selected rows.
        df.loc[mask, columns] = pd.DataFrame(records, columns=columns).to_numpy()
    df[columns] = df[columns].astype('int64')

    return df

def process_closed_accounts_new_format(df):
    """Extract closed-account aggregates from new-format (JSON) iScore reports.

    For rows with ``is_iscore == 1`` and ``is_iscore_new_format == 1`` the report
    is parsed as JSON. In the module whose ``ModuleId`` is
    ``'iScoreClosedFacilitiesConsumer'`` (the last one wins if several match),
    the first element of ``Content['DATA']`` supplies ``ClosedAccountDetails``.
    The function records the number of entries and the sums of SANCTION_AMT,
    AMT_OF_INSTALMENT and MAX_NUM_DAYS_DUE. Values that cannot be converted
    with ``int()`` count as 0. Unparseable, missing or unexpectedly shaped
    reports yield zeros instead of raising.

    Args:
        df: DataFrame with ``is_iscore``, ``is_iscore_new_format`` and
            ``iscore_report``.

    Returns:
        A copy of ``df`` with four int64 ``ClosedAccounts_*_new`` columns added.
        The caller's frame is not modified.
    """
    columns = [
        'ClosedAccounts_count_new',
        'ClosedAccounts_ApprovalAmount_sum_new',
        'ClosedAccounts_InstallmentAmount_sum_new',
        'ClosedAccounts_MaxDaysDue_sum_new'
    ]

    def try_parse_int(value):
        # TypeError covers JSON nulls and nested values; ValueError covers '' and non-numeric strings.
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    def extract_closed_accounts_new(report):
        try:
            modules = json.loads(report)
        except (TypeError, ValueError):
            # TypeError: report is None/NaN; ValueError includes json.JSONDecodeError.
            return dict.fromkeys(columns, 0)

        closed_details = []
        if isinstance(modules, list):
            for module in modules:
                if isinstance(module, dict) and module.get('ModuleId') == 'iScoreClosedFacilitiesConsumer':
                    content = module.get('Content')
                    data = content.get('DATA') if isinstance(content, dict) else None
                    first = data[0] if isinstance(data, list) and data else None
                    details = first.get('ClosedAccountDetails') if isinstance(first, dict) else None
                    # Keep only dict entries; anything else is treated as "no entries".
                    closed_details = [x for x in details if isinstance(x, dict)] if isinstance(details, list) else []

        # With no entries every sum is 0, so no separate empty branch is needed.
        return {
            'ClosedAccounts_count_new': len(closed_details),
            'ClosedAccounts_ApprovalAmount_sum_new': sum(try_parse_int(x.get('SANCTION_AMT', 0)) for x in closed_details),
            'ClosedAccounts_InstallmentAmount_sum_new': sum(try_parse_int(x.get('AMT_OF_INSTALMENT', 0)) for x in closed_details),
            'ClosedAccounts_MaxDaysDue_sum_new': sum(try_parse_int(x.get('MAX_NUM_DAYS_DUE', 0)) for x in closed_details),
        }

    df = df.copy()
    for col in columns:
        df[col] = 0

    mask = (df['is_iscore'] == 1) & (df['is_iscore_new_format'] == 1)
    # Guard the empty selection so that nothing is assigned when no new-format rows exist.
    if mask.any():
        records = [extract_closed_accounts_new(report) for report in df.loc[mask, 'iscore_report']]
        # Positional assignment: records are in the same order as the selected rows.
        df.loc[mask, columns] = pd.DataFrame(records, columns=columns).to_numpy()
    df[columns] = df[columns].astype('int64')

    return df


def combine_old_and_new_formats(df):
    """Merge the ``*_old`` and ``*_new`` account aggregates into single columns.

    Every missing value in the frame is first replaced by 0. For each open- and
    closed-account metric the combined column is ``<metric>_old + <metric>_new``;
    a format column that is absent counts as 0. The per-format columns and the
    ``is_iscore_new_format`` helper flag are then dropped.

    Only those helper columns are removed. Unrelated columns whose names merely
    contain "old" or "new" (e.g. ``threshold``) are kept.

    Args:
        df: DataFrame carrying the per-format aggregate columns.

    Returns:
        A new DataFrame with the combined columns. The caller's frame is not
        modified.
    """
    open_metrics = ['count', 'ApprovalAmount_sum', 'BalanceAmount_sum',
                    'InstallmentAmount_sum', 'MaxDaysDue_sum']
    closed_metrics = ['count', 'ApprovalAmount_sum', 'InstallmentAmount_sum', 'MaxDaysDue_sum']
    combined_columns = ([f'OpenAccounts_{metric}' for metric in open_metrics]
                        + [f'ClosedAccounts_{metric}' for metric in closed_metrics])

    # fillna returns a new frame, so the caller's frame is left untouched.
    df = df.fillna(0)

    helper_columns = ['is_iscore_new_format']
    for combined_col in combined_columns:
        old_col = f'{combined_col}_old'
        new_col = f'{combined_col}_new'
        df[combined_col] = df.get(old_col, 0) + df.get(new_col, 0)
        helper_columns += [old_col, new_col]

    # Drop by explicit name rather than by substring, to avoid removing unrelated columns.
    return df.drop(columns=helper_columns, errors='ignore')

def process_iscore(df, full_df=False):
    """Run the full iScore feature pipeline on a customer-attributes frame.

    Rows with ``is_iscore == 1`` are passed through the column selection, no-hit
    filtering, format detection, the four extraction steps and the old/new
    combination. Remaining missing values are filled with 0 and the raw report
    column is dropped.

    Args:
        df: customer-attributes DataFrame containing ``client_id``,
            ``is_iscore``, ``iscore_score`` and ``iscore_report``.
        full_df: when True, left-join the features back onto all other columns
            of ``df`` (``is_iscore`` filled with 0 for unmatched rows); when
            False, return the feature rows only.

    Returns:
        The iScore feature DataFrame, optionally merged with the input columns.
    """
    raw_iscore_columns = ['is_iscore', 'iscore_score', 'iscore_report']
    source = df.copy()

    # Everything except the raw iScore columns; only needed for the optional merge.
    non_iscore_part = source.drop(columns=raw_iscore_columns)

    features = source[source['is_iscore'] == 1]
    pipeline = (
        select_relevant_iscore_columns,
        update_no_hit_consumers,
        update_iscore_report_format,
        process_open_accounts_old_format,
        process_closed_accounts_old_format,
        process_open_accounts_new_format,
        process_closed_accounts_new_format,
        combine_old_and_new_formats,
    )
    for step in pipeline:
        features = step(features)

    features = features.fillna(0)
    # errors='ignore' tolerates the report column being absent already.
    features = features.drop(columns=['iscore_report'], errors='ignore')

    if not full_df:
        return features

    merged = non_iscore_part.merge(features, on='client_id', how='left')
    merged['is_iscore'] = merged['is_iscore'].fillna(0)
    return merged

# Build the iScore feature table; full_df=False returns only the rows kept by process_iscore.
rca_iscore_df = process_iscore(risk_cust_attrib_df, full_df=False)
rca_iscore_df

Unnamed: 0,client_id,is_iscore,iscore_score,OpenAccounts_count,OpenAccounts_ApprovalAmount_sum,OpenAccounts_BalanceAmount_sum,OpenAccounts_InstallmentAmount_sum,OpenAccounts_MaxDaysDue_sum,ClosedAccounts_count,ClosedAccounts_ApprovalAmount_sum,ClosedAccounts_InstallmentAmount_sum,ClosedAccounts_MaxDaysDue_sum
4,635295,1,772.0,1,10000,6333,0,0,0,0,0,0
13,3681181,1,766.0,1,30500,15770,710,0,0,0,0,0
20,2453628,1,774.0,1,232600,134023,3852,0,0,0,0,0
30,2550322,1,714.0,2,7363,8360,0,0,1,5121,0,0
35,2820151,1,765.0,0,0,0,0,0,1,30000,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...
797588,1608719,1,758.0,1,30820,24390,643,0,0,0,0,0
797594,1757281,1,724.0,1,54000,32924,878,1,0,0,0,0
797596,2589209,1,742.0,1,190000,160268,5076,0,0,0,0,0
797597,3593852,1,701.0,3,8074,3954,588,0,2,4638,0,0


## Process Cash Trx

In [8]:
# def adjust_date_range(df, column):
#     """Adjusts the date range of a given column in a DataFrame."""
#     df = df.withColumn(column, 
#                        F.when(F.year(df[column]) < 1970, F.lit("1970-01-01").cast(DateType()))
#                        .otherwise(df[column]))
#     df = df.withColumn(column, 
#                        F.when(F.year(df[column]) > 2038, F.lit("2038-12-31").cast(DateType()))
#                        .otherwise(df[column]))
#     df = df.withColumn(column, df[column].cast(DateType()))
#     return df

# def filter_transactions(df, df_cust):
#     """Filters out past transactions based on customer data."""
#     df_cust_1 = df_cust.selectExpr("client_id as client_id_1", "phone_number_1 as phone", "contract_date as contract_date_1").dropna(subset=["phone"])
#     df_cust_2 = df_cust.selectExpr("client_id as client_id_2", "phone_number_2 as phone", "contract_date as contract_date_2").dropna(subset=["phone"])

#     df = df.join(broadcast(df_cust_1), df.customer_phone == df_cust_1.phone, how='left')
#     df = df.join(broadcast(df_cust_2), df.customer_phone == df_cust_2.phone, how='left')

#     df = df.withColumn("client_id", F.coalesce(df.client_id_1, df.client_id_2))
#     df = df.drop("client_id_1", "client_id_2")

#     df = df.withColumn("contract_date", F.coalesce(df.contract_date_1, df.contract_date_2))
#     df = df.drop("contract_date_1", "contract_date_2", "phone")

#     df = df.withColumn("trx_date", df["trx_date"].cast(DateType()))
#     df = df.withColumn("contract_date", df["contract_date"].cast(DateType()))

#     df = df.filter(df.trx_date < df.contract_date)
#     df = df.withColumn("days_from_last_trx", F.datediff(df["contract_date"], df["trx_date"]))
#     df = df.filter(df.days_from_last_trx > 1)
#     return df

# def aggregate_transactions(df):
#     """Aggregates transaction data by client id."""
#     df = df.groupBy("client_id").agg(
#         F.datediff(F.min("contract_date"), F.min("trx_date")).alias("days_from_first_trx"),
#         F.datediff(F.max("contract_date"), F.max("trx_date")).alias("days_from_last_trx"),
#         F.sum(F.when(df.online_vs_branch == 1, 1).otherwise(0)).alias("n_trx_online"),
#         F.sum(F.when(df.online_vs_branch == 2, 1).otherwise(0)).alias("n_trx_branch"),
#         F.sum(F.when(df.net_sales > 0, df.net_sales).otherwise(0)).alias("net_sales_pos"),
#         F.sum(F.when(df.net_sales < 0, df.net_sales).otherwise(0)).alias("net_sales_neg"),
#         F.sum(F.when(df.net_qty > 0, df.net_qty).otherwise(0)).alias("net_qty_pos"),
#         F.sum(F.when(df.net_qty < 0, df.net_qty).otherwise(0)).alias("net_qty_neg")
#     )
#     return df

# def process_transactions_data(df, df_cust):
#     """Processes transactions data"""
#     df = adjust_date_range(df, "trx_date")
#     df = filter_transactions(df, df_cust)
#     df = aggregate_transactions(df)

#     # Get numeric columns
#     numeric_columns = [col_name for col_name, dtype in df.dtypes if dtype in ['int', 'double']]
#     # Apply abs and round functions to numeric columns
#     for column in numeric_columns:
#         df = df.withColumn(column, F.round(F.abs(F.col(column)), 1))
#     df = df.toPandas()

#     return df

In [9]:
# rca_trx_df = process_transactions_data(risk_cust_trx_df_sp, risk_cust_attrib_df_sp)
# rca_trx_df

## Process SSN

In [10]:
def adjust_ssn(df):
    """Trim ``ssn`` to its trailing 14 characters, in place, and return the same frame."""
    trailing_14 = df['ssn'].str.slice(start=-14)
    df['ssn'] = trailing_14
    return df

def calculate_age(df, data_date):
    """Derive the birth date from ``ssn`` and compute year-difference ages.

    The first SSN character selects the century ('3' -> 20xx, anything else ->
    19xx) and characters 1-6 are read as YYMMDD. Birth dates that do not parse
    become NaT, so the corresponding ages are NaN. Ages are plain calendar-year
    differences; month and day are not taken into account.

    Args:
        df: DataFrame with string column ``ssn`` and a ``contract_date`` column
            that ``pd.to_datetime`` can parse.
        data_date: snapshot date as a ``'YYYYMMDD'`` string.

    Returns:
        A copy of ``df`` with ``ssn_century_birth``, ``ssn_date_birth``
        (datetime), ``contract_date`` (datetime), ``age_at_contract`` and
        ``age_at_data_date``. The caller's frame is not modified.
    """
    df = df.copy()
    df['ssn_century_birth'] = df['ssn'].str[0]
    df['ssn_date_birth'] = df['ssn'].str[1:7]
    df['ssn_date_birth'] = np.where(
        df['ssn_century_birth'] == '3',
        '20' + df['ssn_date_birth'],
        '19' + df['ssn_date_birth']
    )
    reference_date = datetime.strptime(data_date, '%Y%m%d')
    # The explicit format makes parsing deterministic instead of relying on per-value inference.
    df['ssn_date_birth'] = pd.to_datetime(df['ssn_date_birth'], format='%Y%m%d', errors='coerce')
    df['contract_date'] = pd.to_datetime(df['contract_date'])
    birth_year = df['ssn_date_birth'].dt.year
    df['age_at_contract'] = df['contract_date'].dt.year - birth_year
    df['age_at_data_date'] = reference_date.year - birth_year
    return df

def map_governorate(df, df2):
    """Attach governorate information by the two-digit code at SSN positions 7-8.

    Args:
        df: DataFrame with string column ``ssn``.
        df2: mapping DataFrame keyed by a numeric ``ssn_governorate_code`` column.

    Returns:
        ``df`` left-joined with ``df2`` on ``ssn_governorate_code``. Rows whose
        SSN is missing or has a non-numeric code get a missing code and no
        match, instead of raising. The code column is integer when every row
        has a valid code and float (with NaN) otherwise. The caller's frame is
        not modified.
    """
    governorate_code = pd.to_numeric(df['ssn'].str[7:9], errors='coerce')
    if governorate_code.notna().all():
        # Same dtype as a plain integer cast when nothing is missing.
        governorate_code = governorate_code.astype(int)
    df = df.assign(ssn_governorate_code=governorate_code)
    return df.merge(df2, how='left', on='ssn_governorate_code')

def determine_gender(df):
    """Derive ``ssn_is_male`` from the SSN character at index 12.

    An odd digit maps to 1 (male) and an even digit to 0 (female). When the SSN
    is missing, too short or has a non-digit at that position the flag is left
    missing, rather than raising or defaulting to a gender.

    Args:
        df: DataFrame with string column ``ssn``.

    Returns:
        A copy of ``df`` with ``ssn_gender_code`` (the raw character) and
        ``ssn_is_male``. The flag is int64 when every row is valid and float64
        (with NaN) otherwise. The caller's frame is not modified.
    """
    gender_code = df['ssn'].str[12]
    gender_digit = pd.to_numeric(gender_code, errors='coerce')
    is_male = (gender_digit % 2 != 0).astype('int64')
    unknown = gender_digit.isna()
    if unknown.any():
        # NaN % 2 != 0 evaluates True, so unknown rows must be blanked explicitly.
        is_male = is_male.astype('float64').mask(unknown)
    return df.assign(ssn_gender_code=gender_code, ssn_is_male=is_male)

def process_ssn_data(df, ssn_gov_map_df, data_date):
    """Build the SSN-derived feature table (gender flag, governorate, age at contract).

    Args:
        df: customer-attributes DataFrame with ``client_id``, ``ssn`` and
            ``contract_date``.
        ssn_gov_map_df: governorate mapping passed on to ``map_governorate``.
        data_date: snapshot date as a ``'YYYYMMDD'`` string.

    Returns:
        DataFrame with ``client_id``, ``ssn_is_male``, ``ssn_governorate`` and
        ``age_at_contract``.
    """
    ssn_frame = df.loc[:, ['client_id', 'ssn', 'contract_date']].copy()

    # Normalise the SSN first, then derive age, governorate and gender in turn.
    ssn_frame = calculate_age(adjust_ssn(ssn_frame), data_date)
    ssn_frame = determine_gender(map_governorate(ssn_frame, ssn_gov_map_df))

    output_columns = ['client_id', 'ssn_is_male', 'ssn_governorate', 'age_at_contract']
    return ssn_frame[output_columns]

# Build the SSN-derived features (gender flag, governorate, age at contract) for every customer.
rca_ssn_df = process_ssn_data(risk_cust_attrib_df, ssn_gov_map_df, data_date)
rca_ssn_df

Unnamed: 0,client_id,ssn_is_male,ssn_governorate,age_at_contract
0,3235569,1,Giza,54.0
1,1405151,1,Kafr El-Sheikh,35.0
2,3753420,0,Alexandria,19.0
3,2083981,1,Beni Suef,57.0
4,635295,1,Cairo,60.0
...,...,...,...,...
797601,2462322,1,Gharbia,39.0
797602,2885905,1,Cairo,58.0
797603,1689512,1,Alexandria,43.0
797604,1026329,1,Sohag,35.0


## Process Income

In [11]:
def adj_income_data(df, ssn_df):
    """Overwrite the current income fields with their latest values and attach the age.

    ``net_income`` and ``net_burden`` are replaced in place by ``net_income_last``
    and ``net_burden_last`` (a training-only shortcut, not meant for production),
    then ``age_at_contract`` is left-joined from ``ssn_df`` on ``client_id``.

    Returns:
        The merged DataFrame.
    """
    # Training-only override, not for prod.
    for target_col, latest_col in (('net_income', 'net_income_last'),
                                   ('net_burden', 'net_burden_last')):
        df[target_col] = df[latest_col]

    age_lookup = ssn_df[['client_id', 'age_at_contract']]
    return df.merge(age_lookup, on='client_id', how='left')

def apply_inflation(df, data_date, inflation_rate=0.20, retirement_age=60):
    """Inflate ``net_income`` and ``net_burden`` from their last update year to ``data_date``.

    The number of compounding years is the calendar-year difference between
    ``data_date`` and ``last_income_up_date``. It is capped so that
    ``age_at_contract`` plus the years does not exceed ``retirement_age``, and
    it is floored at 0. Each value is multiplied by
    ``(1 + inflation_rate) ** years`` and rounded to a whole number.

    Args:
        df: DataFrame with ``last_income_up_date``, ``age_at_contract``,
            ``net_income`` and ``net_burden``.
        data_date: snapshot date as a ``'YYYYMMDD'`` string.
        inflation_rate: yearly compounding rate.
        retirement_age: age beyond which no further inflation is applied.

    Returns:
        A copy of ``df`` with ``last_income_up_date`` converted to datetime and
        the columns ``net_income_inflated`` / ``net_burden_inflated`` added.
        Each is an integer column when every value is finite, and float with
        NaN where the value is missing or infinite. The caller's frame is not
        modified.
    """
    df = df.copy()
    df['last_income_up_date'] = pd.to_datetime(df['last_income_up_date'])

    reference_year = datetime.strptime(data_date, '%Y%m%d').year
    years_from_now = reference_year - df['last_income_up_date'].dt.year

    # Account for retirement: stop compounding once the customer reaches retirement_age.
    age = df['age_at_contract']
    years_from_now = np.where(age + years_from_now > retirement_age,
                              retirement_age - age,
                              years_from_now)
    # Negative spans (already past retirement age, or a future update date) become 0.
    years_from_now = np.where(years_from_now < 0, 0, years_from_now)

    # Compound-interest factor, aligned to the frame's index.
    inflation_factor = pd.Series((1 + inflation_rate) ** years_from_now, index=df.index)

    # Only 'net_income' and 'net_burden' are inflated.
    for value_col in ('net_income', 'net_burden'):
        inflated_value = (df[value_col] * inflation_factor).round(0)
        inflated_value = inflated_value.replace([np.inf, -np.inf], np.nan)
        if inflated_value.notna().all():
            inflated_value = inflated_value.astype(int)
        df[value_col + '_inflated'] = inflated_value

    return df

def add_ss_data(df, income_cap=100000):
    """Bound ``net_income_inflated`` using the social-security income range.

    Rows without a usable ``ss_min_income`` (missing or 0) are dropped. For the
    remaining rows:

    * an inflated income below ``ss_min_income`` is raised to ``ss_min_income``;
    * an inflated income above both ``ss_max_income`` and ``income_cap`` is set
      to ``income_cap``.

    Args:
        df: DataFrame with ``net_income_inflated``, ``ss_min_income`` and
            ``ss_max_income``.
        income_cap: ceiling applied to incomes that exceed ``ss_max_income``.

    Returns:
        A new DataFrame with the adjusted rows. The caller's frame is not
        modified.
    """
    has_ss_floor = df['ss_min_income'].notna() & (df['ss_min_income'] != 0)
    # Explicit copy: the assignments below must not write into a slice of the input.
    df = df[has_ss_floor].copy()

    below_floor = df['net_income_inflated'] < df['ss_min_income']
    df.loc[below_floor, 'net_income_inflated'] = df.loc[below_floor, 'ss_min_income']

    above_cap = (df['net_income_inflated'] > df['ss_max_income']) & (df['net_income_inflated'] > income_cap)
    df.loc[above_cap, 'net_income_inflated'] = income_cap

    return df

def process_income_data(df, ssn_df, data_date, inflation_rate=0.20):
    """Build the income feature table.

    Selects the income-related columns, applies the latest-income override and
    age join, inflates income and burden up to ``data_date``, bounds the inflated
    income with the social-security range, and finally keeps only the rows that
    have a ``job_map_max_salary``.

    Returns:
        DataFrame with the client id, raw and inflated income/burden columns and
        the job-map salary range.
    """
    input_columns = [
        'client_id', 'net_income_first', 'net_income', 'net_burden',
        'net_income_last', 'net_burden_last', 'last_income_up_date',
        'ss_min_income', 'ss_max_income',
        'job_map_min_salary', 'job_map_max_salary',
    ]
    income = df[input_columns].copy()

    income = adj_income_data(income, ssn_df)
    income = apply_inflation(income, data_date, inflation_rate)
    income = add_ss_data(income)

    output_columns = [
        'client_id', 'net_income_first', 'net_income', 'net_burden',
        'net_income_inflated', 'net_burden_inflated',
        'job_map_min_salary', 'job_map_max_salary',
    ]
    income = income[output_columns]

    has_salary_band = income['job_map_max_salary'].notna()
    return income[has_salary_band]

# Build the income features; ages come from rca_ssn_df and income is inflated at 20% per year.
rca_income_df = process_income_data(risk_cust_attrib_df, rca_ssn_df, data_date=data_date, inflation_rate=0.20)
rca_income_df

Unnamed: 0,client_id,net_income_first,net_income,net_burden,net_income_inflated,net_burden_inflated,job_map_min_salary,job_map_max_salary
0,3235569,1000.0,8750.0,0.0,8750,0,9167.0,24167.0
1,1405151,3200.0,3200.0,0.0,7000,0,9167.0,24167.0
2,3753420,1250.0,1250.0,0.0,2500,0,2500.0,2500.0
3,2083981,2700.0,7875.0,0.0,7875,0,9167.0,24167.0
4,635295,10239.0,13125.0,0.0,13125,0,3750.0,7250.0
...,...,...,...,...,...,...,...,...
797601,2462322,2750.0,8750.0,0.0,8750,0,8500.0,22500.0
797602,2885905,3346.0,3500.0,0.0,7000,0,8500.0,22500.0
797603,1689512,8200.0,13000.0,0.0,13000,0,9167.0,24167.0
797604,1026329,2700.0,7500.0,0.0,7500,0,9167.0,24167.0


## Process Train-only Features

In [12]:
def return_dbr(df, df_iscore):
    """Compute the first-order debt-burden ratio (``first_ord_dbr``).

    The iScore installments are left-joined on ``client_id`` (missing -> 0). The
    monthly collection is ``(first_ord_amount + first_ord_benefit) /
    first_ord_tenor``; the ratio is the monthly collection plus
    ``net_burden_first`` plus the open-account installments, divided by
    ``net_income_first`` and rounded to two decimals.

    Returns:
        The merged DataFrame with ``first_ord_monthly_collect`` and
        ``first_ord_dbr`` added.
    """
    installments_col = 'OpenAccounts_InstallmentAmount_sum'

    merged = df.merge(df_iscore, on='client_id', how='left')
    # Customers without an iScore row carry no external installments.
    merged[installments_col] = merged[installments_col].fillna(0)

    order_total = merged['first_ord_amount'] + merged['first_ord_benefit']
    merged['first_ord_monthly_collect'] = order_total / merged['first_ord_tenor']

    monthly_obligations = (merged['first_ord_monthly_collect']
                           + merged['net_burden_first']
                           + merged[installments_col])
    merged['first_ord_dbr'] = (monthly_obligations / merged['net_income_first']).round(2)
    return merged
    
def process_train_only_data(df, df_iscore):
    """Build the training-only table: first-order DBR, tenor and the PAR90 flag.

    Args:
        df: customer-attributes DataFrame with the first-order and first-income
            columns plus ``fo_par90_flag``.
        df_iscore: iScore features containing ``client_id`` and
            ``OpenAccounts_InstallmentAmount_sum``.

    Returns:
        DataFrame with ``client_id``, ``first_ord_dbr``, ``first_ord_tenor`` and
        ``fo_par90_flag``.
    """
    order_columns = ['client_id', 'first_ord_amount', 'first_ord_benefit', 'first_ord_tenor',
                     'net_income_first', 'net_burden_first', 'fo_par90_flag']
    installment_columns = ['client_id', 'OpenAccounts_InstallmentAmount_sum']

    with_dbr = return_dbr(df[order_columns].copy(), df_iscore[installment_columns].copy())

    output_columns = ['client_id', 'first_ord_dbr', 'first_ord_tenor', 'fo_par90_flag']
    return with_dbr[output_columns]

In [13]:
# Build the training-only table (first-order DBR, tenor, PAR90 flag) using the iScore installments.
rca_train_df = process_train_only_data(risk_cust_attrib_df, rca_iscore_df)
rca_train_df

Unnamed: 0,client_id,first_ord_dbr,first_ord_tenor,fo_par90_flag
0,3235569,0.49,24,0
1,1405151,0.09,12,0
2,3753420,0.50,36,0
3,2083981,0.10,36,0
4,635295,0.15,6,0
...,...,...,...,...
797601,2462322,0.27,12,0
797602,2885905,0.31,48,0
797603,1689512,0.07,12,0
797604,1026329,0.36,12,0


## Process All Others 

In [14]:
def select_features(df):
    """Return a copy of ``df`` restricted to the remaining customer attributes.

    A legacy ``sport_club_level`` column is exposed under the name
    ``club_level``. The rename is applied to a new frame, so the caller's frame
    keeps its original column names.

    Args:
        df: customer-attributes DataFrame containing the selected columns.

    Returns:
        A new DataFrame with the selected columns only.

    Raises:
        KeyError: if any selected column is missing.
    """
    # Compatibility shim for feeds that still use the old column name; remove once all feeds are migrated.
    if 'sport_club_level' in df.columns:
        df = df.rename(columns={'sport_club_level': 'club_level'})
    selected_features = [
        'client_id', 'ssn', 'phone_number_1', 'phone_number_2', 'flag_is_mc_customer',
        'flag_is_prv_cash_trx', 'flag_is_rescore', 'contract_date',
        'job_name_map', 'job_type', 'insurance_type', 'marital_status', 'children_count',
        'address_governorate', 'address_city', 'address_area', 'house_type', 'car_type_id',
        'car_model_year', 'club_level', 'mobile_os_type'
    ]
    return df.loc[:, selected_features].copy()

def tranform_features(df, data_date):
    """Derive contract-age, car and club features from raw attribute columns.

    The input frame is left untouched; all work happens on a private copy.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``contract_date``, ``car_type_id``, ``car_model_year``
        and ``club_level``.
    data_date : str
        Snapshot date in ``YYYYMMDD`` format; contract age is measured
        against it.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` without the four source columns and with these added
        at the end, in this order:

        * ``days_since_contract`` - days between ``data_date`` and the contract date.
        * ``have_car`` - 1 when ``car_type_id`` parses to a number > 0, else 0.
        * ``car_model_category`` - ``'Modern'`` (age <= 7), ``'Mid'``
          (7 < age <= 23), ``'Antique'`` (age > 23) or NaN, where age is
          contract year minus car model year.
        * ``have_club_id`` - 1 when ``club_level`` parses to a number > 0, else 0.

    Raises
    ------
    ValueError
        If ``data_date`` or any ``contract_date`` value does not match the
        expected format (contract dates are parsed strictly).
    """
    data_date = datetime.strptime(data_date, '%Y%m%d')

    # Private copy: the assignments below must not leak into the caller's frame.
    df = df.copy()

    # Only the leading 'YYYY-MM-DD' part of the stored value is parsed.
    contract_date = pd.to_datetime(
        df['contract_date'].astype(str).str.slice(0, 10), format='%Y-%m-%d')
    contract_year = contract_date.dt.year
    df['days_since_contract'] = (data_date - contract_date).dt.days

    # Non-numeric or missing ids count as "no car".
    car_type_id = pd.to_numeric(df['car_type_id'], errors='coerce')
    df['have_car'] = np.where(car_type_id.fillna(0) > 0, 1, 0)

    # Unparseable model years become NaN and therefore an NaN category.
    car_model_year = pd.to_datetime(df['car_model_year'], format='%Y', errors='coerce').dt.year
    car_age = contract_year - car_model_year

    def categorize_year(age):
        if pd.isnull(age):
            return np.nan
        elif age > 23:
            return 'Antique'
        elif 7 < age <= 23:
            return 'Mid'
        elif age <= 7:
            return 'Modern'

    df['car_model_category'] = car_age.apply(categorize_year)

    # Non-numeric or missing levels count as "no club membership".
    club_level = pd.to_numeric(df['club_level'], errors='coerce')
    df['have_club_id'] = np.where(club_level.fillna(0) > 0, 1, 0)

    return df.drop(columns=['contract_date', 'car_type_id', 'car_model_year', 'club_level'])

def process_other_attr_data(df, data_date):
    """Select the generic attribute columns and derive their features.

    Runs ``select_features`` followed by ``tranform_features`` on a copy of
    ``df``; ``data_date`` (``YYYYMMDD`` string) is forwarded to the latter.
    Returns the transformed frame.
    """
    selected = select_features(df.copy())
    return tranform_features(selected, data_date)

In [15]:
# Generic attribute features (contract age, car and club flags) as of `data_date`.
rca_others_df = process_other_attr_data(risk_cust_attrib_df, data_date=data_date)
rca_others_df

Unnamed: 0,client_id,ssn,phone_number_1,phone_number_2,flag_is_mc_customer,flag_is_prv_cash_trx,flag_is_rescore,job_name_map,job_type,insurance_type,...,children_count,address_governorate,address_city,address_area,house_type,mobile_os_type,days_since_contract,have_car,car_model_category,have_club_id
0,3235569,26906052101716,01205040486,01104646590,1,0,0,"Other, employed",Gov,default_value,...,3,Cairo,Imbaba,Ard El-Haddad,Old Rent,default_value,559,0,,0
1,1405151,28409091502314,01016635020,01092085940,1,0,0,"Other, employed",Gov,default_value,...,1,Kafr El-Sheikh,Desouk,Desouq,Owned,default_value,2011,0,,0
2,3753420,30503150200764,01220955775,01224328234,1,0,0,Student,Student,default_value,...,0,,,,Owned,default_value,7,0,,0
3,2083981,26308292201055,01271054735,01271054735,1,0,0,"Other, employed",Gov,default_value,...,4,,,,Owned,default_value,1644,0,,0
4,635295,26209090103231,01004014651,01002550651,1,0,0,Retired,Pension,default_value,...,1,Alexandria,Al-Ajami,Beytash,Old Rent,default_value,793,0,,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
797601,2462322,28205011601894,01223946813,01223946813,1,0,0,Teaching,Priv,default_value,...,3,,,,Owned,default_value,1225,0,,0
797602,2885905,26412170102872,01276423223,,1,0,0,Teaching,Priv,default_value,...,2,Cairo,Faisal,altaaawon faisal,Old Rent,default_value,874,0,,0
797603,1689512,27611250201291,01221132053,,1,0,0,"Other, employed",Gov,default_value,...,2,,,,Owned,default_value,1979,0,,0
797604,1026329,28810012611878,01286957977,01118661557,1,0,0,"Other, employed",Priv,default_value,...,0,Red Sea,Hurghada,El-Dahar,New Rent,default_value,226,0,,0


# 4- S1 Governance

In [16]:
def gove_s1_rca_others_df(rca_others_df):
    """Apply the S1 dtype contract to the generic attribute features.

    Returns a new frame in which ``days_since_contract`` is int32,
    ``have_car`` is int16 and ``have_club_id`` is float32; all other
    columns are passed through unchanged.
    """
    s1_dtypes = {
        'days_since_contract': 'int32',
        'have_car': 'int16',
        'have_club_id': 'float32',
    }
    return rca_others_df.astype(s1_dtypes)

def gove_s1_rca_ssn_df(rca_ssn_df, default_age=30, min_age=15, max_age=100):
    """Apply the S1 governance rules to ``age_at_contract``.

    Missing ages and ages outside ``[min_age, max_age]`` are replaced by
    ``default_age``; the column is then cast to int16. Like the other
    ``gove_s1_*`` helpers, this returns a new frame and leaves the input
    untouched.

    Parameters
    ----------
    rca_ssn_df : pandas.DataFrame
        Must contain an ``age_at_contract`` column.
    default_age : int, default 30
        Replacement for missing or implausible ages.
    min_age, max_age : int, default 15 and 100
        Inclusive bounds of the plausible age range.

    Returns
    -------
    pandas.DataFrame
        Copy of ``rca_ssn_df`` with a cleaned int16 ``age_at_contract``.
    """
    rca_ssn_df = rca_ssn_df.copy()
    age = rca_ssn_df['age_at_contract'].fillna(default_age)
    implausible = (age < min_age) | (age > max_age)
    rca_ssn_df['age_at_contract'] = age.mask(implausible, default_age).astype('int16')
    return rca_ssn_df

def gove_s1_rca_income_df(rca_income_df):
    """Apply the S1 dtype contract to the income features.

    Returns a new frame with the income, burden and job salary-bound
    columns cast to float32; other columns are passed through unchanged.
    """
    float32_cols = (
        'net_income_inflated',
        'net_burden_inflated',
        'job_map_min_salary',
        'job_map_max_salary',
    )
    return rca_income_df.astype({col: 'float32' for col in float32_cols})

def gove_s1_rca_iscore_df(rca_iscore_df):
    """Apply the S1 dtype contract to the iScore features.

    Score, account-count and max-days-due columns become int16; the
    approval, balance and installment amount columns become float32.
    Returns a new frame; other columns are passed through unchanged.
    """
    int16_cols = [
        'iscore_score',
        # 'is_iscore_new_format',  # disabled in the original contract
        'OpenAccounts_count',
        'ClosedAccounts_count',
        'OpenAccounts_MaxDaysDue_sum',
        'ClosedAccounts_MaxDaysDue_sum',
    ]
    float32_cols = [
        'ClosedAccounts_InstallmentAmount_sum',
        'ClosedAccounts_ApprovalAmount_sum',
        'OpenAccounts_InstallmentAmount_sum',
        'OpenAccounts_BalanceAmount_sum',
        'OpenAccounts_ApprovalAmount_sum',
    ]
    s1_dtypes = {**dict.fromkeys(int16_cols, 'int16'),
                 **dict.fromkeys(float32_cols, 'float32')}
    return rca_iscore_df.astype(s1_dtypes)


# def gove_s1_rca_trx_df(rca_trx_df):
#     rca_trx_df = rca_trx_df.astype({'net_sales_pos': 'float32', 'net_sales_neg': 'float32'})
#     rca_trx_df = rca_trx_df.astype({'days_from_first_trx': 'int16', 'days_from_last_trx': 'int16', 'n_trx_online': 'int16',
#                                     'n_trx_branch': 'int16', 'net_qty_pos': 'int16', 'net_qty_neg': 'int16'})
#     return rca_trx_df

def gove_s1_rca_train_df(rca_train_df):
    """Apply the S1 dtype contract to the training frame.

    Returns a new frame with ``first_ord_dbr`` cast to float32.
    """
    return rca_train_df.astype({'first_ord_dbr': 'float32'})


# Apply the S1 governance step to a copy of each feature frame, so the
# pre-governance frames stay available unchanged for inspection.
rca_others_df_s1 = gove_s1_rca_others_df(rca_others_df.copy())
rca_ssn_df_s1 = gove_s1_rca_ssn_df(rca_ssn_df.copy())
rca_income_df_s1 = gove_s1_rca_income_df(rca_income_df.copy())
rca_iscore_df_s1 = gove_s1_rca_iscore_df(rca_iscore_df.copy())
# rca_trx_df_s1 = gove_s1_rca_trx_df(rca_trx_df.copy())
rca_train_df_s1 = gove_s1_rca_train_df(rca_train_df.copy())

# 5- Merge

In [17]:
def merge_dataframes(dfs, merge_key='client_id'):
    """Left-merge a sequence of frames onto the first one, tracking provenance.

    The first frame is the base. Every following frame is left-joined on
    ``merge_key`` and contributes an indicator column ``merge_NN`` (NN = its
    position, zero-padded) holding ``'both'`` or ``'left_only'``. The base
    itself gets a constant ``merge_00 = 'both'`` column.

    Parameters
    ----------
    dfs : iterable of pandas.DataFrame
        Frames to merge, base frame first. Any iterable is accepted.
    merge_key : str, default 'client_id'
        Column present in every frame and used as the join key.

    Returns
    -------
    pandas.DataFrame
        New merged frame; none of the inputs is modified.

    Raises
    ------
    ValueError
        If ``dfs`` is empty, if a frame lacks ``merge_key``, or if any frame
        after the first has duplicate column names.

    Notes
    -----
    Key uniqueness in the right-hand frames is not checked, so duplicated
    keys there will multiply rows of the result.
    """
    # Materialise once: a generator would be exhausted by the key check below.
    dfs = list(dfs)
    if not dfs:
        raise ValueError("At least one dataframe is required to merge")
    if not all(merge_key in df for df in dfs):
        raise ValueError(f"All dataframes must contain the merge key: {merge_key}")

    merged_df = dfs[0].copy()
    merged_df['merge_00'] = 'both'

    for i, df in enumerate(dfs[1:], 1):
        if df.columns.duplicated().any():
            raise ValueError(f"Dataframe {i} contains duplicate columns")
        # merge() never mutates its operands, so no defensive copy is needed.
        merged_df = merged_df.merge(df, on=merge_key, how='left', indicator=f'merge_{i:02d}')

    return merged_df

# Order matters: the first frame is the merge base and each later frame gets
# the indicator column merge_01, merge_02, ... according to its position here.
dfs_to_merge = [rca_others_df_s1,
                rca_ssn_df_s1,
                rca_income_df_s1,
                rca_iscore_df_s1,
                #rca_trx_df_s1,
                rca_train_df_s1]
rca_all_df_s1 = merge_dataframes(dfs=dfs_to_merge)
rca_all_df_s1

Unnamed: 0,client_id,ssn,phone_number_1,phone_number_2,flag_is_mc_customer,flag_is_prv_cash_trx,flag_is_rescore,job_name_map,job_type,insurance_type,...,OpenAccounts_MaxDaysDue_sum,ClosedAccounts_count,ClosedAccounts_ApprovalAmount_sum,ClosedAccounts_InstallmentAmount_sum,ClosedAccounts_MaxDaysDue_sum,merge_03,first_ord_dbr,first_ord_tenor,fo_par90_flag,merge_04
0,3235569,26906052101716,01205040486,01104646590,1,0,0,"Other, employed",Gov,default_value,...,,,,,,left_only,0.49,24,0,both
1,1405151,28409091502314,01016635020,01092085940,1,0,0,"Other, employed",Gov,default_value,...,,,,,,left_only,0.09,12,0,both
2,3753420,30503150200764,01220955775,01224328234,1,0,0,Student,Student,default_value,...,,,,,,left_only,0.50,36,0,both
3,2083981,26308292201055,01271054735,01271054735,1,0,0,"Other, employed",Gov,default_value,...,,,,,,left_only,0.10,36,0,both
4,635295,26209090103231,01004014651,01002550651,1,0,0,Retired,Pension,default_value,...,0.0,0.0,0.0,0.0,0.0,both,0.15,6,0,both
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
797601,2462322,28205011601894,01223946813,01223946813,1,0,0,Teaching,Priv,default_value,...,,,,,,left_only,0.27,12,0,both
797602,2885905,26412170102872,01276423223,,1,0,0,Teaching,Priv,default_value,...,0.0,0.0,0.0,0.0,0.0,both,0.31,48,0,both
797603,1689512,27611250201291,01221132053,,1,0,0,"Other, employed",Gov,default_value,...,,,,,,left_only,0.07,12,0,both
797604,1026329,28810012611878,01286957977,01118661557,1,0,0,"Other, employed",Priv,default_value,...,,,,,,left_only,0.36,12,0,both


# 6- EOD Transformations

In [18]:
# End-of-day clean-up of the merged S1 frame, written as a single chain:
#   1. drop rows missing job, inflated income or marital status,
#   2. cast the two binary flags to int16,
#   3. drop the merge indicator columns,
#   4. treat a missing `is_iscore` as 0.
rca_all_df_s2 = (
    rca_all_df_s1
    .dropna(subset=['job_name_map', 'net_income_inflated', 'marital_status'])
    .astype({'have_club_id': 'int16', 'ssn_is_male': 'int16'})
    .drop(columns=['merge_00', 'merge_01', 'merge_02', 'merge_03', 'merge_04'])
    .fillna({'is_iscore': 0})
)
# Disabled while the transactions frame is not part of the merge:
#rca_all_df_s2['flag_is_prv_cash_trx'] = np.where(rca_all_df_s2['merge_04'] == 'both', 1, 0)
#rca_all_df_s2 = rca_all_df_s2.drop(columns=['merge_05'])

# Round every numeric column to two decimals.
numeric_cols = rca_all_df_s2.select_dtypes(include=[np.number]).columns
rca_all_df_s2[numeric_cols] = rca_all_df_s2[numeric_cols].round(2)

In [19]:
def normalize_columns(df, ref_col, cols_to_normalize, keep_original=True):
    """Add ``<col>_pofi`` ratio columns: each column divided by ``ref_col``.

    Each ratio is rounded to two decimals and inserted immediately to the
    right of its source column. The input frame is never modified,
    regardless of ``keep_original``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing ``ref_col`` and every column in ``cols_to_normalize``.
    ref_col : str
        Denominator column.
    cols_to_normalize : list of str
        Numerator columns.
    keep_original : bool, default True
        When False, the source columns are dropped from the result.

    Returns
    -------
    pandas.DataFrame
        New frame with the ratio columns added.

    Raises
    ------
    ValueError
        If a ``<col>_pofi`` column already exists (raised by ``DataFrame.insert``).

    Notes
    -----
    Zero or missing values in ``ref_col`` are not special-cased; the result
    of the division (inf / NaN) is kept as is.
    """
    # Private copy: DataFrame.insert works in place.
    df = df.copy()
    reference = df[ref_col]

    for col in cols_to_normalize:
        ratio = (df[col] / reference).round(2)
        df.insert(df.columns.get_loc(col) + 1, f'{col}_pofi', ratio)

    if not keep_original:
        df = df.drop(columns=cols_to_normalize)

    return df

def convert_dtypes(df):
    """Harmonise numeric widths in place: int16/int32 -> int64, float64 -> float32.

    Columns of any other dtype are left alone. The frame is modified in
    place and also returned.
    """
    for name in df.columns:
        current = df[name].dtype
        if any(current == small_int for small_int in (np.int16, np.int32)):
            target = np.int64
        elif current == np.float64:
            target = np.float32
        else:
            continue
        df[name] = df[name].astype(target)
    return df


# Express the iScore amount columns as a ratio of `net_income_first`
# (adds one `<col>_pofi` column next to each source column).
rca_all_df_s3 = normalize_columns(rca_all_df_s2.copy(), 'net_income_first',\
    ['OpenAccounts_ApprovalAmount_sum', 'OpenAccounts_BalanceAmount_sum', 'OpenAccounts_InstallmentAmount_sum',
        'ClosedAccounts_ApprovalAmount_sum', 'ClosedAccounts_InstallmentAmount_sum',
        #'net_sales_pos', 'net_sales_neg'
        ]
       )

# Harmonise numeric widths (int16/int32 -> int64, float64 -> float32).
rca_all_df_s3 = convert_dtypes(rca_all_df_s3.copy())

rca_all_df_s3.head()

Unnamed: 0,client_id,ssn,phone_number_1,phone_number_2,flag_is_mc_customer,flag_is_prv_cash_trx,flag_is_rescore,job_name_map,job_type,insurance_type,...,OpenAccounts_MaxDaysDue_sum,ClosedAccounts_count,ClosedAccounts_ApprovalAmount_sum,ClosedAccounts_ApprovalAmount_sum_pofi,ClosedAccounts_InstallmentAmount_sum,ClosedAccounts_InstallmentAmount_sum_pofi,ClosedAccounts_MaxDaysDue_sum,first_ord_dbr,first_ord_tenor,fo_par90_flag
0,3235569,26906052101716,1205040486,1104646590,1,0,0,"Other, employed",Gov,default_value,...,,,,,,,,0.49,24,0
1,1405151,28409091502314,1016635020,1092085940,1,0,0,"Other, employed",Gov,default_value,...,,,,,,,,0.09,12,0
2,3753420,30503150200764,1220955775,1224328234,1,0,0,Student,Student,default_value,...,,,,,,,,0.5,36,0
3,2083981,26308292201055,1271054735,1271054735,1,0,0,"Other, employed",Gov,default_value,...,,,,,,,,0.1,36,0
4,635295,26209090103231,1004014651,1002550651,1,0,0,Retired,Pension,default_value,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.15,6,0


In [20]:
# Slugify the string values of selected columns.
def convert_columns_to_slug(df, columns):
    """Turn string values of the given columns into lowercase dash-separated slugs.

    Only columns that exist in ``df`` and have ``object`` dtype are touched;
    non-string values inside them (e.g. missing values) pass through as is.
    The frame is modified in place and also returned.

    Slug rule: strip non-ASCII characters, lowercase, collapse every run of
    characters outside ``a-z0-9`` into a single dash, trim edge dashes.
    """
    def to_slug(value):
        if not isinstance(value, str):
            return value
        # Drop non-ASCII first so it vanishes rather than turning into a dash.
        ascii_only = re.sub(r'[^\x00-\x7F]+', '', value)
        dashed = re.sub(r'[^a-z0-9]+', '-', ascii_only.lower())
        return dashed.strip('-')

    for column in columns:
        if column not in df.columns:
            continue
        if not (df[column].dtype == 'object'):
            continue
        df[column] = df[column].apply(to_slug)
    return df


# apply 
# Categorical text columns to slugify; names absent from the frame, or not of
# object dtype, are skipped by convert_columns_to_slug.
columns_to_convert = ['job_name_map',
                      'mobile_os_type',
                      'house_type',
                      'marital_status',
                      'address_area',
                      'address_city',
                      'address_governorate',
                      'ssn_governorate']
rca_all_df_s3 = convert_columns_to_slug(rca_all_df_s3.copy(), columns_to_convert)

In [21]:
# Re-run the missing-value standardisation helper (imported from utils) on the
# final frame. NOTE(review): its exact behaviour is not visible in this notebook.
rca_all_df_s3 = stnd_missing_values_type(rca_all_df_s3)

# 7- Divide & Stream

In [22]:
# Persist the processed L1 features to the feature store, keyed by `data_date`.
df_to_all_models = rca_all_df_s3.copy()
df_to_all_models.to_parquet(data_path/f'features_store/{data_date}_L1_processed_features.parquet')

In [23]:
# Show the dtype of every column; the row limit is lifted only inside this block.
with pd.option_context('display.max_rows', None):
    print(df_to_all_models.dtypes)

client_id                                      int64
ssn                                           object
phone_number_1                                object
phone_number_2                                object
flag_is_mc_customer                             int8
flag_is_prv_cash_trx                            int8
flag_is_rescore                                 int8
job_name_map                                  object
job_type                                      object
insurance_type                                object
marital_status                                object
children_count                                  int8
address_governorate                           object
address_city                                  object
address_area                                  object
house_type                                    object
mobile_os_type                                object
days_since_contract                            int64
have_car                                      

In [24]:
# Sanity check: raw attribute frame shape vs. final feature frame shape
# (rows are lost to the dropna steps in the EOD transformations).
print(risk_cust_attrib_df.shape)
#print(risk_cust_trx_df.shape)
print(df_to_all_models.shape)

(797606, 43)
(793565, 50)


-----------------------------

# 8- Check Data Drift

In [25]:
# import datacompy
# import pandas as pd
# import numpy as np
# import nannyml as nml
# import os
# from IPython.display import display

# from evidently import ColumnMapping
# from evidently.report import Report
# from evidently.metrics.base_metric import generate_column_metrics
# from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
# from evidently.metrics import *
# from evidently.test_suite import TestSuite
# from evidently.tests.base_test import generate_column_tests
# from evidently.test_preset import DataStabilityTestPreset, NoTargetPerformanceTestPreset
# from evidently.tests import *

In [26]:
# cwd_path = os.getcwd()
# data_path = cwd_path+'/data/'
# new_df = pd.read_parquet(
#     data_path+f"features_store/20240810_L1_processed_features.parquet")
# ref_df = pd.read_parquet(
#     data_path+f"features_store/20240623_L1_processed_features.parquet")

In [27]:
# common_cols = ref_df.columns.intersection(new_df.columns)
# calc = nml.UnivariateDriftCalculator(
#     column_names=common_cols,
#     chunk_number=2
# )
# calc.fit(ref_df)
# results = calc.calculate(new_df)
# display(results.filter(period='analysis').to_df())

In [28]:
# data_drift_report = Report(metrics=[
#     DataDriftPreset(), 
# ])
# data_drift_report.run(reference_data=ref_df[['net_income_inflated', 'fo_par90_flag']], current_data=new_df[['net_income_inflated', 'fo_par90_flag']])
# data_drift_report