In [None]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Column groups consumed by the validation helpers and report cells below.

# Fields profiled as categorical values (unique count + example values).
categorical_columns = [
    "RequestID", "RatingYear", "WorkFlowStepName", "CustomerRefID",
    "ScoreModel", "FlowModelExisting", "FlowModelNew62",
]

# Fields summarised as dates; also the default of date_columns_stats().
date_columns = ["RequestDate", "RatingDate"]

# Fields passed to the IQR outlier check.
numeric_columns = ["adjCompositeScore"]

In [None]:
def import_file(file_path: str, **kwargs) -> pd.DataFrame:
    """Load one CSV, Excel (.xlsx) or Parquet file into a DataFrame.

    The pandas reader is selected from the file extension, compared
    case-insensitively.

    Args:
        file_path: Path of the file to read.
        **kwargs: Forwarded unchanged to the selected pandas reader.

    Returns:
        The DataFrame produced by the reader.

    Raises:
        ValueError: If the extension is not ``csv``, ``xlsx`` or ``parquet``.
        FileNotFoundError: If the file does not exist (printed, then re-raised).
        pd.errors.ParserError: If pandas cannot parse the file (printed, then
            re-raised).
        Exception: Any other reader error is printed and re-raised unchanged.
    """
    readers = {
        "csv": pd.read_csv,
        "xlsx": pd.read_excel,
        "parquet": pd.read_parquet,
    }

    # splitext only looks at the last path component, so dots in directory
    # names (for example "../data" or "run.v1/") are never read as an extension.
    file_extension = os.path.splitext(file_path)[1].lstrip(".").lower()
    reader = readers.get(file_extension)
    if reader is None:
        # Raised outside the try block: this is a caller error, not an
        # "unexpected" failure of the reader.
        raise ValueError(f"Unsupported file extension: {file_extension}")

    try:
        return reader(file_path, **kwargs)
    except FileNotFoundError:
        print(f"Error: The file {file_path} does not exist.")
        raise
    except pd.errors.ParserError as e:
        print(f"Error: Parsing error for file {file_path} - {str(e)}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")
        raise
    
def import_multiple_files(
    directory_path: str, extension: str, **kwargs
) -> pd.DataFrame:
    """Read every file with a given extension in a directory and stack them.

    Files are read in sorted name order so the row order of the result is
    reproducible. For CSV files, columns whose name starts with "unnamed"
    (case-insensitive) are dropped before concatenation.

    Args:
        directory_path: Directory to scan (not recursive).
        extension: One of ``"csv"``, ``"xlsx"`` or ``"parquet"``, without the
            leading dot. Matching against file names is case-sensitive.
        **kwargs: Forwarded unchanged to the pandas reader for every file.

    Returns:
        A single DataFrame with a fresh 0..n-1 index.

    Raises:
        ValueError: If the extension is unsupported, or if no file in the
            directory has that extension.
    """
    readers = {
        "csv": pd.read_csv,
        "xlsx": pd.read_excel,
        "parquet": pd.read_parquet,
    }
    reader = readers.get(extension)
    if reader is None:
        raise ValueError(f"Unsupported file extension: {extension}")

    # os.listdir returns entries in arbitrary order; sorting keeps the
    # concatenated row order stable across runs and machines.
    files = sorted(
        f for f in os.listdir(directory_path) if f.endswith(f".{extension}")
    )
    if not files:
        # pd.concat([]) would fail with an opaque "No objects to concatenate".
        raise ValueError(f"No '.{extension}' files found in {directory_path}")

    dataframes = []
    for file in files:
        df = reader(os.path.join(directory_path, file), **kwargs)
        if extension == "csv":
            # str() guards against non-string labels (e.g. header=None).
            columns = [c for c in df.columns if str(c).lower()[:7] != "unnamed"]
            df = df[columns]
        dataframes.append(df)

    return pd.concat(dataframes, ignore_index=True)

def convert_to_date(
    df: pd.DataFrame, column_names: list[str], date_format: str = "%Y-%m-%d"
) -> pd.DataFrame:
    """Convert the given columns of ``df`` to datetime, in place.

    Values that do not match ``date_format`` become ``NaT`` because the
    conversion uses ``errors="coerce"``.

    Args:
        df: Frame whose columns are converted. It is modified in place.
        column_names: Columns to convert; all of them must exist in ``df``.
        date_format: ``strftime``-style format passed to ``pd.to_datetime``.

    Returns:
        The same ``df`` object, for chaining.

    Raises:
        ValueError: If a column is missing (raised before any column is
            touched), or if ``pd.to_datetime`` fails for a column.
    """
    # Validate everything first so a missing name cannot leave the frame
    # half-converted.
    for column_name in column_names:
        if column_name not in df.columns:
            raise ValueError(
                f"The column '{column_name}' does not exist in the DataFrame."
            )

    for column_name in column_names:
        try:
            df[column_name] = pd.to_datetime(
                df[column_name], format=date_format, errors="coerce"
            )
        except Exception as e:
            # Chain the original exception so its traceback is preserved.
            raise ValueError(
                f"Error converting column '{column_name}' to date: {e}"
            ) from e

    return df

def categorical_columns_values(df: pd.DataFrame, str_columns: list[str], max_examples: int = 10) -> dict:
    """Summarise the distinct values of each requested column.

    Args:
        df: Frame to profile.
        str_columns: Columns to summarise.
        max_examples: Maximum number of distinct values listed per column.

    Returns:
        A dict keyed by column name. Each value is a dict with
        ``'unique_count'`` (number of distinct values as returned by
        ``Series.unique()``, so a missing value counts as one) and
        ``'example_values'`` (the first ``max_examples`` distinct values, in
        order of first appearance).
    """
    stats = {}

    for col in str_columns:
        unique_values = df[col].unique()
        stats[col] = {
            'unique_count': len(unique_values),
            # Slicing already handles columns with fewer than max_examples values.
            'example_values': unique_values[:max_examples].tolist(),
        }

    return stats
        
def date_columns_stats(df: pd.DataFrame, date_columns=date_columns) -> dict:
    """Return the min, max and null count of each date column.

    Args:
        df: Frame to summarise.
        date_columns: Columns to summarise. The default is the module-level
            ``date_columns`` list as bound when this function is defined.

    Returns:
        A dict keyed by column name; each value is a dict with the keys
        ``'min'``, ``'max'`` and ``'null_count'``.
    """
    return {
        name: {
            'min': df[name].min(),
            'max': df[name].max(),
            'null_count': df[name].isnull().sum(),
        }
        for name in date_columns
    }

def numeric_columns_outlier(
    df: pd.DataFrame,
    numeric_columns: list[str],
    iqr_multiplier: float = 1.5,
    lower_limit: float = 0,
    upper_limit: float = 100,
) -> dict:
    """Compute IQR-based outlier bounds and counts for numeric columns.

    For each column the fences are ``Q1 - iqr_multiplier * IQR`` and
    ``Q3 + iqr_multiplier * IQR``, clamped to ``[lower_limit, upper_limit]``.
    Values strictly outside the clamped fences are counted as outliers.

    Args:
        df: Frame to analyse.
        numeric_columns: Columns to analyse.
        iqr_multiplier: Width of the fences in IQR units.
        lower_limit: Floor applied to the lower fence. Pass ``float("-inf")``
            to disable the floor.
        upper_limit: Ceiling applied to the upper fence. Pass ``float("inf")``
            to disable the ceiling.

    Returns:
        A dict keyed by column name; each value is a dict with the keys
        ``'Q1'``, ``'Q3'``, ``'IQR'``, ``'lower_bound'``, ``'upper_bound'``
        and ``'outlier_count'``.
    """
    stats = {}

    for col in numeric_columns:
        values = df[col]
        q1 = values.quantile(0.25)
        q3 = values.quantile(0.75)
        iqr = q3 - q1
        lower_bound = max(q1 - iqr_multiplier * iqr, lower_limit)
        upper_bound = min(q3 + iqr_multiplier * iqr, upper_limit)

        # Missing values compare False on both sides, so they are never counted.
        is_outlier = (values < lower_bound) | (values > upper_bound)

        stats[col] = {
            'Q1': q1,
            'Q3': q3,
            'IQR': iqr,
            'lower_bound': lower_bound,
            'upper_bound': upper_bound,
            'outlier_count': values[is_outlier].count(),
        }

    return stats

def check_duplicates(df: pd.DataFrame) -> tuple[int, pd.DataFrame]:
    """Find rows that are exact duplicates of another row.

    Args:
        df: Frame to inspect; all columns take part in the comparison.

    Returns:
        A ``(duplicate_count, duplicates)`` tuple. ``duplicates`` contains
        every member of each duplicate group (``keep=False``), so the count
        includes the first occurrence as well as the repeats.
    """
    duplicates = df[df.duplicated(keep=False)]
    return duplicates.shape[0], duplicates

def check_missing_values(df: pd.DataFrame):
    """Tabulate missing values per column.

    Args:
        df: Frame to inspect.

    Returns:
        A DataFrame indexed by the column names of ``df`` with the columns
        ``'missing_values'`` (null count) and ``'missing_percentage'`` (null
        count as a percentage of ``len(df)``), sorted by null count in
        descending order.
    """
    null_counts = df.isnull().sum()
    row_total = len(df)

    summary = null_counts.to_frame(name='missing_values')
    summary['missing_percentage'] = (null_counts / row_total) * 100

    return summary.sort_values(by='missing_values', ascending=False)

def create_boxplot(df: pd.DataFrame, column: str) -> None:
    """Draw a horizontal boxplot, including outlier points, for one column.

    Args:
        df: Frame holding the data.
        column: Column to plot; also used for the title and x-axis label.
    """
    plt.figure(figsize=(10, 2))
    sns.boxplot(x=df[column], showfliers=True)
    plt.title(f'Boxplot for {column}')
    plt.xlabel(column)
    # show must be called; a bare ``plt.show`` only references the function.
    plt.show()

def plot_monthly_observation(df: pd.DataFrame, countby: str) -> None:
    """Plot the number of rows per calendar month of ``RatingDate`` as bars.

    Args:
        df: Frame with a datetime ``RatingDate`` column. It is not modified.
        countby: Name of a column that must be present in ``df``. It is only
            validated; the bars show row counts per month (``groupby().size()``).

    Raises:
        ValueError: If ``RatingDate`` or ``countby`` is not a column of ``df``.
    """
    if "RatingDate" not in df.columns or countby not in df.columns:
        raise ValueError(f"DataFrame must contain 'RatingDate' and {countby} columns")

    # Group on a derived Series rather than writing a helper column into the
    # caller's frame.
    year_month = df["RatingDate"].dt.to_period('M').rename('year_month')
    monthly_counts = df.groupby(year_month).size()

    plt.figure(figsize=(10, 3))
    monthly_counts.plot(kind='bar')
    plt.title('Monthly Observation')
    plt.xlabel('Month')
    plt.ylabel('Number of Observation')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    
def create_data_by_time(df: pd.DataFrame, by: str = 'M') -> pd.DataFrame:
    """Count rows per ``RatingDate`` period.

    Args:
        df: Frame with a datetime ``RatingDate`` column. It is not modified.
        by: Period frequency passed to ``Series.dt.to_period`` (``'M'`` for
            monthly).

    Returns:
        A DataFrame with the columns ``'YearMonth'`` (the period, whatever
        frequency ``by`` selects) and ``'Count'`` (rows in that period).

    Raises:
        ValueError: If ``df`` has no ``RatingDate`` column.
    """
    if "RatingDate" not in df.columns:
        raise ValueError("DataFrame must contain 'RatingDate' columns")

    # Group on a derived Series so the caller's frame does not gain a
    # 'YearMonth' column as a side effect.
    period_key = df["RatingDate"].dt.to_period(by).rename('YearMonth')
    return df.groupby(period_key).size().reset_index(name='Count')

def adj_rsme_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with missing ``RequestID`` values filled in.

    Rows whose ``RequestID`` is null take the value of the ``รหัสเอกสาร``
    column instead; all other rows keep their ``RequestID``. The input frame
    is left untouched.
    """
    request_id_missing = df["RequestID"].isnull()
    filled_request_id = np.where(
        request_id_missing, df["รหัสเอกสาร"], df["RequestID"]
    )
    return df.assign(RequestID=filled_request_id)

In [None]:
# Parquet inputs under 02_data_sampling, one path per portfolio.
corporate_path = "../data/processed/02_data_sampling/corporate_customer_data.parquet"
sme_path = "../data/processed/02_data_sampling/sme_customer_data.parquet"
rsme_path = "../data/processed/02_data_sampling/Retail_202005_202403.parquet"

In [None]:
# Load one frame per portfolio from the parquet paths defined earlier.
corporate_df = import_file(file_path=corporate_path)
sme_df = import_file(file_path=sme_path)
# The retail file is loaded unchanged first; adj_rsme_df then returns a copy
# whose missing RequestID values are filled from the "รหัสเอกสาร" column.
rsme_df_tmp = import_file(file_path=rsme_path)
rsme_df = adj_rsme_df(rsme_df_tmp)

In [None]:
# dtypes has one entry per column, so after reset_index the first element of
# the displayed shape is the number of columns in rsme_df.
rsme_df.dtypes.reset_index().shape

## Duplication

In [None]:
def _report_duplicates(title: str, df: pd.DataFrame):
    """Print the duplicate-row report for one portfolio.

    Prints the title, total row count, number of fully duplicated rows and
    number of distinct ``RequestID`` values (a missing value counts as one),
    followed by the duplicated rows themselves when there are any.

    Returns:
        The ``(duplicate_count, duplicates)`` tuple from ``check_duplicates``.
    """
    print(title)
    duplicate_count, duplicates = check_duplicates(df)

    print(f" Total Observation: {df.shape[0]}")
    print(f" Number of duplicate rows: {duplicate_count}")
    print(f" Unique RequestID: {df['RequestID'].nunique(dropna=False)}")
    if duplicate_count > 0:
        print("Duplicate rows:")
        print(duplicates)
    print()

    return duplicate_count, duplicates


# Same report for each portfolio; results stay available under the same names.
duplicate_count_corp, duplicates_corp = _report_duplicates(
    "Corporate Portfolio:", corporate_df
)
duplicate_count_sme, duplicates_sme = _report_duplicates(
    "SMEs Portfolio:", sme_df
)
duplicate_count_rsme, duplicates_rsme = _report_duplicates(
    "Retail SMEs Portfolio:", rsme_df
)

## Category Field Validation

#### Corporate

In [None]:
# Profile the categorical fields of the corporate portfolio.
categorical_stats = categorical_columns_values(corporate_df, categorical_columns)

# Double-quoted f-strings with single-quoted keys: reusing the enclosing quote
# inside a replacement field is a SyntaxError before Python 3.12.
for col, stat in categorical_stats.items():
    print(f"Column: {col}")
    print(f" Unique Count: {stat['unique_count']}")
    print(f" Example Values: {stat['example_values']}")
    print()

In [None]:
# Null count and percentage for each categorical field of the corporate portfolio.
missing_value_stats = check_missing_values(corporate_df[categorical_columns])
print(missing_value_stats)

#### SMEs

In [None]:
# Profile the categorical fields of the SME portfolio.
categorical_stats = categorical_columns_values(sme_df, categorical_columns)

# Double-quoted f-strings with single-quoted keys: reusing the enclosing quote
# inside a replacement field is a SyntaxError before Python 3.12.
for col, stat in categorical_stats.items():
    print(f"Column: {col}")
    print(f" Unique Count: {stat['unique_count']}")
    print(f" Example Values: {stat['example_values']}")
    print()

In [None]:
# Null count and percentage for each categorical field of the SME portfolio.
missing_value_stats = check_missing_values(sme_df[categorical_columns])
print(missing_value_stats)

#### Retail SMEs

In [None]:
# Profile the categorical fields of the retail SME portfolio.
categorical_stats = categorical_columns_values(rsme_df, categorical_columns)

# Double-quoted f-strings with single-quoted keys: reusing the enclosing quote
# inside a replacement field is a SyntaxError before Python 3.12.
for col, stat in categorical_stats.items():
    print(f"Column: {col}")
    print(f" Unique Count: {stat['unique_count']}")
    print(f" Example Values: {stat['example_values']}")
    print()

In [None]:
# Null count and percentage for each categorical field of the retail SME portfolio.
missing_value_stats = check_missing_values(rsme_df[categorical_columns])
print(missing_value_stats)

## Date Field Validation

#### Corporate

In [None]:
# Range, distinct-value count and null count for each date field of the
# corporate portfolio.
date_stats = date_columns_stats(df=corporate_df)

for col, stat in date_stats.items():
    # One print call per column; the trailing "" yields the blank separator line.
    print(
        f"Column: {col}",
        f" Min: {stat['min']}",
        f" Max: {stat['max']}",
        f" Count Unique: {corporate_df[col].nunique(dropna=False)}",
        f" Null Count: {stat['null_count']}",
        "",
        sep="\n",
    )

#### SMEs

In [None]:
# Range, distinct-value count and null count for each date field of the SME
# portfolio.
date_stats = date_columns_stats(df=sme_df)

for col, stat in date_stats.items():
    # One print call per column; the trailing "" yields the blank separator line.
    print(
        f"Column: {col}",
        f" Min: {stat['min']}",
        f" Max: {stat['max']}",
        f" Count Unique: {sme_df[col].nunique(dropna=False)}",
        f" Null Count: {stat['null_count']}",
        "",
        sep="\n",
    )

#### Retail SMEs

In [None]:
# Range, distinct-value count and null count for each date field of the retail
# SME portfolio.
date_stats = date_columns_stats(df=rsme_df)

for col, stat in date_stats.items():
    # One print call per column; the trailing "" yields the blank separator line.
    print(
        f"Column: {col}",
        f" Min: {stat['min']}",
        f" Max: {stat['max']}",
        f" Count Unique: {rsme_df[col].nunique(dropna=False)}",
        f" Null Count: {stat['null_count']}",
        "",
        sep="\n",
    )

## Numerical Field Validation

In [None]:
# IQR outlier summary for the numeric fields of the corporate portfolio.
outlier_stats = numeric_columns_outlier(corporate_df, numeric_columns=numeric_columns)

# Double-quoted f-strings with single-quoted keys: reusing the enclosing quote
# inside a replacement field is a SyntaxError before Python 3.12.
for col, stat in outlier_stats.items():
    print(f"Column: {col}")
    # Min/Max follow the loop column instead of a hard-coded column name.
    print(f" Min: {corporate_df[col].min()}")
    print(f" Q1: {stat['Q1']}")
    print(f" Q3: {stat['Q3']}")
    print(f" Max: {corporate_df[col].max()}")
    print(f" IQR: {stat['IQR']}")
    print(f" Lower Bound: {stat['lower_bound']}")
    print(f" Upper Bound: {stat['upper_bound']}")
    print(f" Outlier Count: {stat['outlier_count']}")
    print()

print("Missing Value: " + str(corporate_df["adjCompositeScore"].isnull().sum()))
print("Count Unique: " + str(len(corporate_df["adjCompositeScore"].unique())))
print(corporate_df["adjCompositeScore"].unique())

In [None]:
# IQR outlier summary for the numeric fields of the SME portfolio.
outlier_stats = numeric_columns_outlier(sme_df, numeric_columns=numeric_columns)

# Double-quoted f-strings with single-quoted keys: reusing the enclosing quote
# inside a replacement field is a SyntaxError before Python 3.12.
for col, stat in outlier_stats.items():
    print(f"Column: {col}")
    # Min/Max follow the loop column instead of a hard-coded column name.
    print(f" Min: {sme_df[col].min()}")
    print(f" Q1: {stat['Q1']}")
    print(f" Q3: {stat['Q3']}")
    print(f" Max: {sme_df[col].max()}")
    print(f" IQR: {stat['IQR']}")
    print(f" Lower Bound: {stat['lower_bound']}")
    print(f" Upper Bound: {stat['upper_bound']}")
    print(f" Outlier Count: {stat['outlier_count']}")
    print()

print("Missing Value: " + str(sme_df["adjCompositeScore"].isnull().sum()))
print("Count Unique: " + str(len(sme_df["adjCompositeScore"].unique())))
print(sme_df["adjCompositeScore"].unique())

In [None]:
# IQR outlier summary for the numeric fields of the retail SME portfolio.
outlier_stats = numeric_columns_outlier(rsme_df, numeric_columns=numeric_columns)

# Double-quoted f-strings with single-quoted keys: reusing the enclosing quote
# inside a replacement field is a SyntaxError before Python 3.12.
for col, stat in outlier_stats.items():
    print(f"Column: {col}")
    # Min/Max follow the loop column instead of a hard-coded column name.
    print(f" Min: {rsme_df[col].min()}")
    print(f" Q1: {stat['Q1']}")
    print(f" Q3: {stat['Q3']}")
    print(f" Max: {rsme_df[col].max()}")
    print(f" IQR: {stat['IQR']}")
    print(f" Lower Bound: {stat['lower_bound']}")
    print(f" Upper Bound: {stat['upper_bound']}")
    print(f" Outlier Count: {stat['outlier_count']}")
    print()

print("Missing Value: " + str(rsme_df["adjCompositeScore"].isnull().sum()))
print("Count Unique: " + str(len(rsme_df["adjCompositeScore"].unique())))
print(rsme_df["adjCompositeScore"].unique())

## Summation Check

In [None]:
def check_equality(df: pd.DataFrame, column1: str, column2: str, precision=2) -> None:
    """Print how many rows have equal values in two columns after rounding.

    Both columns are rounded to ``precision`` decimals and compared row by
    row. Rows where the comparison is False (including rows with a missing
    value on either side) count as unequal.

    Args:
        df: Frame holding both columns.
        column1: First column to compare.
        column2: Second column to compare.
        precision: Number of decimals kept before comparing.
    """
    left = df[column1].round(precision)
    right = df[column2].round(precision)

    n_total = len(df)
    n_equal = left.eq(right).sum()
    n_unequal = n_total - n_equal

    print(f"Total values compared: {n_total}")
    print(f"Number of equal values: {n_equal}")
    print(f"Number of unequal values: {n_unequal}")

In [None]:
def create_sum_score_column(df: pd.DataFrame) -> pd.DataFrame:
    """Return the score columns of ``df`` plus their component sum.

    The result holds ``CompositeScore``, ``adjFinancialScore``,
    ``BusinessScore`` and ``IndustryScore`` copied from ``df``, plus a new
    ``sumScore`` column equal to the sum of the last three. ``df`` itself is
    not modified.

    NOTE(review): a disabled earlier variant divided sums above 100 by 10;
    the raw sum is what is returned here.
    """
    component_columns = ["adjFinancialScore", "BusinessScore", "IndustryScore"]
    selected = df[["CompositeScore", *component_columns]]

    return selected.assign(
        sumScore=lambda scores: (
            scores["adjFinancialScore"]
            + scores["BusinessScore"]
            + scores["IndustryScore"]
        )
    )

In [None]:
# Compare CompositeScore with the sum of its component scores, per portfolio.
portfolios = [
    ("Corporate Portfolio: ", corporate_df),
    ("SMEs Portfolio: ", sme_df),
    ("RSMEs Portfolio:", rsme_df),
]

for position, (heading, frame) in enumerate(portfolios):
    if position:
        # Blank line between portfolios, none after the last one.
        print()
    print(heading)
    check_equality(create_sum_score_column(frame), "CompositeScore", "sumScore")