In [None]:
%pip install polars

import pandas as pd
import polars as pl
import lightgbm as lgb
import matplotlib.pyplot as plt
import json
from pandas.api.types import CategoricalDtype
from datetime import datetime
from datetime import date
from sklearn.model_selection import train_test_split
from sklearn.metrics import auc, roc_curve, roc_auc_score

def convert_to_ordinal(date_column, date_format):
    """Build a Polars expression giving whole days since 1970-01-01.

    Args:
        date_column: Polars expression over a string column holding dates.
        date_format: ``strptime`` format used to parse those strings.

    Returns:
        A Polars expression holding the number of days between the parsed
        date and 1970-01-01, and null wherever the parsed date is null
        (parsing is requested with ``strict=False``).
    """
    parsed = date_column.str.strptime(pl.Date, date_format, strict=False)
    days_since_epoch = (parsed - datetime(1970, 1, 1)).dt.total_days()
    # Keep rows whose date is missing explicitly null.
    return pl.when(parsed.is_not_null()).then(days_since_epoch).otherwise(None)

class DataPipeline_Depth_0:
    """Training-side pipeline for the depth-0 tables.

    Reads the base table, two static tables and the static credit-bureau
    table from parquet, records the dtype of every column it sees, writes
    that record to a JSON schema file, preprocesses each table and joins
    everything on ``case_id``.
    """

    def __init__(self, base_path, static_0_0_path, static_0_1_path, static_cb_0_path, schema_path):
        """Remember the four input parquet paths and the schema output path."""
        self.schema_path = schema_path
        self.static_cb_0_path = static_cb_0_path
        self.static_0_1_path = static_0_1_path
        self.static_0_0_path = static_0_0_path
        self.base_path = base_path
        # column name -> str(dtype); filled in as files are loaded
        self.global_schema = {}

    def load_data(self, path):
        """Read one parquet file and register its columns in the schema.

        Any exception is reported on stdout and then re-raised unchanged.
        """
        try:
            frame = pl.read_parquet(path)
            self.update_schema(frame)
            return frame
        except Exception as e:
            print(f"Error loading data from {path}: {e}")
            raise

    def update_schema(self, dataframe):
        """Record the dtype of every column not seen before (first one wins)."""
        for name, dtype in zip(dataframe.columns, dataframe.dtypes):
            self.global_schema.setdefault(name, str(dtype))

    def save_schema(self):
        """Write the collected schema to ``schema_path`` as JSON."""
        with open(self.schema_path, 'w') as file:
            file.write(json.dumps(self.global_schema))

    def preprocess_base(self, data):
        """Replace ``date_decision`` with ``date_decision_ordinal``."""
        ordinal = convert_to_ordinal(pl.col('date_decision'), '%Y-%m-%d')
        with_ordinal = data.with_columns(ordinal.alias('date_decision_ordinal'))
        return with_ordinal.drop(['date_decision'])

    def preprocess_static(self, data):
        """Make a static table numeric-only.

        String columns whose name ends in ``D`` are parsed as dates and
        converted in place, boolean columns are cast to Int32, and every
        other string column is dropped.
        """
        date_columns = [
            name for name, dtype in data.schema.items()
            if name.endswith('D') and dtype == pl.Utf8
        ]
        if date_columns:
            data = data.with_columns([
                convert_to_ordinal(pl.col(name), '%Y-%m-%d').alias(name)
                for name in date_columns
            ])

        flag_columns = [name for name, dtype in data.schema.items() if dtype == pl.Boolean]
        if flag_columns:
            data = data.with_columns([pl.col(name).cast(pl.Int32) for name in flag_columns])

        kept = [
            name for name, dtype in data.schema.items()
            if dtype != pl.Utf8 or name in date_columns
        ]
        return data.select(kept)

    def preprocess_static_cb_0(self, data):
        """Prepare the static credit-bureau table.

        String columns whose name ends in ``D`` are converted in place, the
        two categorical columns (when present) are cast to Categorical, and
        every remaining string column is dropped.
        """
        date_columns = [
            name for name, dtype in data.schema.items()
            if name.endswith('D') and dtype == pl.Utf8
        ]
        if date_columns:
            data = data.with_columns([
                convert_to_ordinal(pl.col(name), '%Y-%m-%d').alias(name)
                for name in date_columns
            ])

        categorical_columns = ['education_1103M', 'maritalst_385M']
        present = [name for name in categorical_columns if name in data.columns]
        if present:
            data = data.with_columns([pl.col(name).cast(pl.Categorical) for name in present])

        columns_to_drop = [
            name for name, dtype in data.schema.items()
            if dtype == pl.Utf8
            and name not in date_columns
            and name not in categorical_columns
        ]
        return data.drop(columns_to_drop)

    def merge_data(self, data_base, data_static_0_0, data_static_0_1, data_static_cb_0):
        """Join all tables on ``case_id`` and one-hot encode the categoricals.

        The two static tables are stacked vertically first; both joins are
        left joins starting from the base table.
        """
        categorical_columns = ['education_1103M', 'maritalst_385M']
        static_rows = pl.concat([data_static_0_0, data_static_0_1], how='vertical')
        merged_data = (
            data_base
            .join(static_rows, on='case_id', how='left')
            .join(data_static_cb_0, on='case_id', how='left')
        )
        for name in categorical_columns:
            if merged_data[name].dtype != pl.Categorical:
                merged_data = merged_data.with_columns(merged_data[name].cast(pl.Categorical))

        # Swap the two categorical columns for their dummy columns.
        dummies = merged_data[categorical_columns].to_dummies()
        remainder = merged_data.drop(categorical_columns)
        return pl.concat([remainder, dummies], how='horizontal')

    def execute_pipeline(self):
        """Run load -> save schema -> preprocess -> merge; return the result."""
        data_base = self.load_data(self.base_path)
        data_static_0_0 = self.load_data(self.static_0_0_path)
        data_static_0_1 = self.load_data(self.static_0_1_path)
        data_static_cb_0 = self.load_data(self.static_cb_0_path)
        # The schema is written from the raw files, before any preprocessing.
        self.save_schema()

        return self.merge_data(
            self.preprocess_base(data_base),
            self.preprocess_static(data_static_0_0),
            self.preprocess_static(data_static_0_1),
            self.preprocess_static_cb_0(data_static_cb_0),
        )

if __name__ == "__main__":
    # Training run of the depth-0 pipeline: builds Depth_0 and writes the
    # column schema to schema_path.
    schema_path = "C:/Users/afise/.git/CreditRiskModel/unified_schema.json"
    pipeline = DataPipeline_Depth_0(
        base_path="C:/Users/afise/.git/CreditRiskModel/Data/parquet_files/train/train_base.parquet",
        static_0_0_path="C:/Users/afise/.git/CreditRiskModel/Data/parquet_files/train/train_static_0_0.parquet",
        static_0_1_path="C:/Users/afise/.git/CreditRiskModel/Data/parquet_files/train/train_static_0_1.parquet",
        static_cb_0_path="C:/Users/afise/.git/CreditRiskModel/Data/parquet_files/train/train_static_cb_0.parquet",
        schema_path=schema_path,
    )
    Depth_0 = pipeline.execute_pipeline()

def convert_to_ordinal(date_column, date_format):
    """Return an expression counting days from 1970-01-01 to each parsed date.

    Args:
        date_column: Polars expression over a string column of dates.
        date_format: ``strptime`` format string for those dates.

    Returns:
        A Polars expression with the day count, null where the parsed date
        is null (parsing uses ``strict=False``).
    """
    as_date = date_column.str.strptime(pl.Date, date_format, strict=False)
    elapsed = as_date - datetime(1970, 1, 1)
    return (
        pl.when(as_date.is_not_null())
        .then(elapsed.dt.total_days())
        .otherwise(None)
    )

def dtype_mapping(dtype_str):
    """Translate a dtype name from the schema JSON into a Polars dtype.

    The schema file stores ``str(dtype)`` for each column. Besides the names
    originally handled, this also recognises the narrow and unsigned integer
    widths and the ``String`` spelling, and ignores any parameter list such
    as ``Categorical(ordering='physical')``. Without that, such columns fell
    through to the string fallback and were cast to text by the caller.

    Args:
        dtype_str: dtype name as stored in the schema file.

    Returns:
        The matching Polars dtype, or ``pl.Utf8`` for an unrecognised name.
    """
    mapping = {
        'Int8': pl.Int8,
        'Int16': pl.Int16,
        'Int32': pl.Int32,
        'Int64': pl.Int64,
        'UInt8': pl.UInt8,
        'UInt16': pl.UInt16,
        'UInt32': pl.UInt32,
        'UInt64': pl.UInt64,
        'Float32': pl.Float32,
        'Float64': pl.Float64,
        'Utf8': pl.Utf8,
        'String': pl.Utf8,
        'Boolean': pl.Boolean,
        'Date': pl.Date,
        'Categorical': pl.Categorical,
    }
    # Parametrised dtypes print as "Name(args)"; match on the bare name.
    base_name = dtype_str.partition('(')[0]
    return mapping.get(base_name, pl.Utf8)

class DataPipeline_Depth_0:
    """Inference-side pipeline for the depth-0 tables.

    Reads a previously saved JSON schema, loads the base table, any number
    of static tables and the static credit-bureau table, casts their columns
    to the dtypes named in the schema, preprocesses each table and joins
    everything on ``case_id``.
    """

    def __init__(self, base_path, static_paths, static_cb_0_path, schema_path):
        """Store the input paths and load the schema from ``schema_path``."""
        # schema_path has to be set before load_schema() reads it.
        self.schema_path = schema_path
        self.base_path = base_path
        self.static_paths = static_paths
        self.static_cb_0_path = static_cb_0_path
        self.global_schema = self.load_schema()

    def load_schema(self):
        """Return the column-name -> dtype-name mapping stored as JSON."""
        with open(self.schema_path, 'r') as file:
            return json.loads(file.read())

    def load_data(self, path):
        """Read one parquet file and align its dtypes with the schema."""
        return self.ensure_schema(pl.read_parquet(path))

    def ensure_schema(self, dataframe):
        """Cast every column listed in the schema to its expected dtype.

        Schema entries for columns the frame does not have are ignored;
        columns that already have the expected dtype are left untouched.
        """
        for name, dtype_name in self.global_schema.items():
            if name not in dataframe.columns:
                continue
            target = dtype_mapping(dtype_name)
            if dataframe[name].dtype != target:
                dataframe = dataframe.with_columns(dataframe[name].cast(target))
        return dataframe

    def preprocess_base(self, data):
        """Replace ``date_decision`` with ``date_decision_ordinal``."""
        ordinal = convert_to_ordinal(pl.col('date_decision'), '%Y-%m-%d')
        with_ordinal = data.with_columns(ordinal.alias('date_decision_ordinal'))
        return with_ordinal.drop(['date_decision'])

    def preprocess_static(self, data):
        """Make a static table numeric-only.

        String columns whose name ends in ``D`` are parsed as dates and
        converted in place, boolean columns are cast to Int32, and every
        other string column is dropped.
        """
        date_columns = [
            name for name, dtype in data.schema.items()
            if name.endswith('D') and dtype == pl.Utf8
        ]
        if date_columns:
            data = data.with_columns([
                convert_to_ordinal(pl.col(name), '%Y-%m-%d').alias(name)
                for name in date_columns
            ])

        flag_columns = [name for name, dtype in data.schema.items() if dtype == pl.Boolean]
        if flag_columns:
            data = data.with_columns([pl.col(name).cast(pl.Int32) for name in flag_columns])

        columns_to_keep = [
            name for name, dtype in data.schema.items()
            if dtype != pl.Utf8 or name in date_columns
        ]
        return data.select(columns_to_keep)

    def preprocess_static_cb_0(self, data):
        """Prepare the static credit-bureau table.

        String columns whose name ends in ``D`` are converted in place, the
        two categorical columns (when present) are cast to Categorical, and
        every remaining string column is dropped.
        """
        date_columns = [
            name for name, dtype in data.schema.items()
            if name.endswith('D') and dtype == pl.Utf8
        ]
        if date_columns:
            data = data.with_columns([
                convert_to_ordinal(pl.col(name), '%Y-%m-%d').alias(name)
                for name in date_columns
            ])

        categorical_columns = ['education_1103M', 'maritalst_385M']
        present = [name for name in categorical_columns if name in data.columns]
        if present:
            data = data.with_columns([pl.col(name).cast(pl.Categorical) for name in present])

        columns_to_drop = [
            name for name, dtype in data.schema.items()
            if dtype == pl.Utf8
            and name not in date_columns
            and name not in categorical_columns
        ]
        return data.drop(columns_to_drop)

    def merge_data(self, data_base, static_datas, data_static_cb_0):
        """Join all tables on ``case_id`` and one-hot encode the categoricals.

        The static tables are stacked vertically first; both joins are left
        joins starting from the base table.
        """
        categorical_columns = ['education_1103M', 'maritalst_385M']
        static_rows = pl.concat(static_datas, how='vertical')
        merged_data = (
            data_base
            .join(static_rows, on='case_id', how='left')
            .join(data_static_cb_0, on='case_id', how='left')
        )
        for name in categorical_columns:
            if merged_data[name].dtype != pl.Categorical:
                merged_data = merged_data.with_columns(merged_data[name].cast(pl.Categorical))

        # Swap the two categorical columns for their dummy columns.
        dummies = merged_data[categorical_columns].to_dummies()
        remainder = merged_data.drop(categorical_columns)
        return pl.concat([remainder, dummies], how='horizontal')

    def execute_pipeline(self):
        """Run load -> preprocess -> merge and return the merged frame."""
        data_base = self.load_data(self.base_path)
        static_datas = [self.load_data(path) for path in self.static_paths]
        data_static_cb_0 = self.load_data(self.static_cb_0_path)

        return self.merge_data(
            self.preprocess_base(data_base),
            [self.preprocess_static(frame) for frame in static_datas],
            self.preprocess_static_cb_0(data_static_cb_0),
        )

if __name__ == "__main__":
    # Test-set run of the depth-0 pipeline; the schema file is read here,
    # not written.
    schema_path = "C:/Users/afise/.git/CreditRiskModel/unified_schema.json"
    base_path = "C:/Users/afise/.git/CreditRiskModel/Data/parquet_files/test/test_base.parquet"
    static_paths = [
        "C:/Users/afise/.git/CreditRiskModel/Data/parquet_files/test/test_static_0_0.parquet",
        "C:/Users/afise/.git/CreditRiskModel/Data/parquet_files/test/test_static_0_1.parquet",
        "C:/Users/afise/.git/CreditRiskModel/Data/parquet_files/test/test_static_0_2.parquet",
    ]
    static_cb_0_path = "C:/Users/afise/.git/CreditRiskModel/Data/parquet_files/test/test_static_cb_0.parquet"

    pipeline = DataPipeline_Depth_0(
        base_path=base_path,
        static_paths=static_paths,
        static_cb_0_path=static_cb_0_path,
        schema_path=schema_path,
    )
    Depth_0_test = pipeline.execute_pipeline()

class DataPipeline_Depth_1:
    """Training-side pipeline for the depth-1 tables.

    Loads each depth-1 parquet table, records column dtypes into a JSON
    schema, reduces every table to one row per ``case_id`` with
    ``group_by(...).agg(...)`` and left-joins the results onto the
    aggregated previous-applications table.
    """

    def __init__(self, applprev_paths, other_path, deposit_path, person_path, debitcard_path, tax_registry_a_1_path, tax_registry_b_1_path, 
                                    tax_registry_c_1_path, credit_bureau_a_1_paths, credit_bureau_b_1_path, schema_path):
        """Store the input parquet paths and the schema output path.

        ``applprev_paths`` and ``credit_bureau_a_1_paths`` are lists of
        files; every other table is a single file.
        """
        self.applprev_paths = applprev_paths
        self.other_path = other_path
        self.deposit_path = deposit_path
        self.person_path = person_path
        self.debitcard_path = debitcard_path
        self.tax_registry_a_1_path = tax_registry_a_1_path
        self.tax_registry_b_1_path = tax_registry_b_1_path
        self.tax_registry_c_1_path = tax_registry_c_1_path
        self.credit_bureau_a_1_paths = credit_bureau_a_1_paths
        self.credit_bureau_b_1_path = credit_bureau_b_1_path
        self.schema_path = schema_path
        # column name -> str(dtype); filled in as files are loaded
        self.global_schema = {}

    @staticmethod
    def try_parse_date(col, fmt1, fmt2):
        """Parse a string column as a date, trying ``fmt1`` then ``fmt2``.

        Both parses are non-strict; the ``fmt2`` result is used wherever the
        ``fmt1`` result is null.
        """
        date1 = col.str.strptime(pl.Date, fmt1, strict=False)
        date2 = col.str.strptime(pl.Date, fmt2, strict=False)
        return pl.when(date1.is_not_null()).then(date1).otherwise(date2)

    @staticmethod
    def convert_to_ordinal(date):
        """Encode a date as ``year * 365 + month * 30 + day``.

        This is a coarse day index rather than a calendar ordinal, so
        neighbouring dates can share a value. Null dates stay null.

        NOTE: any other pipeline that produces features for the same model
        has to use the same encoding.
        """
        # Current Polars releases document dt.month()/dt.day() as 8-bit
        # integers; widen first so that month * 30 cannot wrap around.
        year = date.dt.year().cast(pl.Int32)
        month = date.dt.month().cast(pl.Int32)
        day = date.dt.day().cast(pl.Int32)
        return pl.when(date.is_not_null()).then(
            (year * 365) + (month * 30) + day
        ).otherwise(None)

    def load_data(self):
        """Load every table as a LazyFrame and register its columns."""
        self.train_applprev_1 = self.batch_load_and_update_schema(self.applprev_paths)
        self.train_other_1 = self.load_and_update_schema(self.other_path).lazy()
        self.train_deposit_1 = self.load_and_update_schema(self.deposit_path).lazy()
        self.train_person_1 = self.load_and_update_schema(self.person_path).lazy()
        self.train_debitcard_1 = self.load_and_update_schema(self.debitcard_path).lazy()
        self.train_tax_registry_a_1 = self.load_and_update_schema(self.tax_registry_a_1_path).lazy()
        self.train_tax_registry_b_1 = self.load_and_update_schema(self.tax_registry_b_1_path).lazy()
        self.train_tax_registry_c_1 = self.load_and_update_schema(self.tax_registry_c_1_path).lazy()
        self.train_credit_bureau_a_1 = self.batch_load_and_update_schema(self.credit_bureau_a_1_paths)
        self.train_credit_bureau_b_1 = self.load_and_update_schema(self.credit_bureau_b_1_path).lazy()

    def load_and_update_schema(self, path):
        """Read one parquet file eagerly and register its columns."""
        df = pl.read_parquet(path)
        self.update_schema(df)
        return df

    def batch_load_and_update_schema(self, paths):
        """Read several parquet files two at a time into one LazyFrame."""
        batch_size = 2
        lazy_frames = []
        for i in range(0, len(paths), batch_size):
            batch_paths = paths[i:i+batch_size]
            batch_frames = [self.load_and_update_schema(path) for path in batch_paths]
            lazy_frames.append(pl.concat(batch_frames).lazy())
        return pl.concat(lazy_frames)

    def update_schema(self, dataframe):
        """Record the dtype of every column not seen before (first one wins)."""
        for col, dtype in zip(dataframe.columns, dataframe.dtypes):
            if col not in self.global_schema:
                self.global_schema[col] = str(dtype)

    def save_schema(self):
        """Write the collected schema to ``schema_path`` as JSON."""
        with open(self.schema_path, 'w') as file:
            json.dump(self.global_schema, file)

    def _aggregate_credit_bureau(self, frame):
        """Reduce a credit-bureau LazyFrame to one row per ``case_id``.

        The aggregation is chosen from the last letter of the column name:
        ``A``/``P`` are summed (``total_*``), ``M``/``T``/``L`` are counted
        distinct (``unique_*``) and ``D`` takes the maximum (``max_*``).
        Columns with any other suffix are not aggregated.
        """
        source_columns = frame.columns
        date_columns = [column for column in source_columns if column[-1] == 'D']

        # NOTE: these ordinal_* columns are added but are not part of the
        # aggregation list below, which is built from the original columns.
        for col in date_columns:
            frame = frame.with_columns(
                self.convert_to_ordinal(pl.col(col)).alias(f"ordinal_{col}")
            )

        aggregations = []
        for col in source_columns:
            suffix = col[-1]
            if suffix in ('A', 'P'):
                aggregations.append(pl.col(col).sum().alias(f"total_{col}"))
            elif suffix == 'M':
                aggregations.append(pl.col(col).n_unique().alias(f"unique_{col}"))
            elif suffix == 'D':
                aggregations.append(pl.col(col).max().alias(f"max_{col}"))
            elif suffix in ('T', 'L'):
                aggregations.append(pl.col(col).n_unique().alias(f"unique_{col}"))

        return frame.group_by("case_id").agg(aggregations)

    def preprocess_data(self):
        """Aggregate every loaded table down to one row per ``case_id``.

        Each ``self.train_*`` attribute is replaced by its aggregated
        LazyFrame; nothing is collected here.
        """
        date_formats = ("%m/%d/%Y", "%Y-%m-%d")

        # Previous applications: parse the date columns, then aggregate.
        date_columns = ["approvaldate_319D", "dateactivated_425D", "creationdate_885D", "dtlastpmt_581D", "employedfrom_700D", "dtlastpmtallstes_3545839D", "firstnonzeroinstldate_307D"]
        self.train_applprev_1 = self.train_applprev_1.with_columns([
            self.try_parse_date(pl.col(col), *date_formats).alias(col) for col in date_columns
        ]).group_by("case_id").agg([
            pl.col("actualdpd_943P").mean().alias("actualdpd_943P_mean"),
            pl.col("annuity_853A").sum().alias("annuity_853A_sum"),
            pl.col("childnum_21L").sum().alias("childnum_21L_sum"),
            pl.col("credacc_actualbalance_314A").mean().alias("credacc_actualbalance_314A_mean"),
            pl.col("credacc_credlmt_575A").mean().alias("credacc_credlmt_575A_mean"),
            pl.col("credacc_maxhisbal_375A").max().alias("credacc_maxhisbal_375A_max"),
            pl.col("credacc_minhisbal_90A").min().alias("credacc_minhisbal_90A_min"),
            pl.col("credacc_transactions_402L").sum().alias("credacc_transactions_402L_sum"),
            pl.col("credamount_590A").mean().alias("credamount_590A_mean"),
            pl.col("currdebt_94A").mean().alias("currdebt_94A_mean"),
            pl.col("downpmt_134A").sum().alias("downpmt_134A_sum"),
            pl.col("mainoccupationinc_437A").mean().alias("mainoccupationinc_437A_mean"),
            pl.col("outstandingdebt_522A").sum().alias("outstandingdebt_522A_sum"),
            pl.col("pmtnum_8L").max().alias("pmtnum_8L_max"),
            pl.col("tenor_203L").min().alias("tenor_203L_min"),
            pl.col("isbidproduct_390L").cast(pl.UInt32).sum().alias("isbidproduct_390L_sum"),
            pl.col("isdebitcard_527L").cast(pl.UInt32).sum().alias("isdebitcard_527L_sum"),
            pl.col("credacc_status_367L").n_unique().alias("credacc_status_367L_n_unique"),
            pl.col("credtype_587L").n_unique().alias("credtype_587L_n_unique"),
            pl.col("education_1138M").n_unique().alias("education_1138M_n_unique"),
            pl.col("familystate_726L").n_unique().alias("familystate_726L_n_unique"),
            pl.col("postype_4733339M").n_unique().alias("postype_4733339M_n_unique"),
            pl.col("profession_152M").n_unique().alias("profession_152M_n_unique"),
            pl.col("rejectreason_755M").n_unique().alias("rejectreason_755M_n_unique"),
            pl.col("rejectreasonclient_4145042M").n_unique().alias("rejectreasonclient_4145042M_n_unique"),
            pl.col("status_219L").n_unique().alias("status_219L_n_unique"),
            # Differences between consecutive parsed dates; merge_data turns
            # these three columns into minutes.
            (pl.col("approvaldate_319D").diff().abs().min()).alias("approval_to_activation_min_diff"),
            (pl.col("creationdate_885D").diff().abs().min()).alias("creation_min_diff"),
            (pl.col("dtlastpmt_581D").diff().abs().max()).alias("payment_max_diff"),
            pl.col("employedfrom_700D").min().alias("earliest_employment_date"),
            pl.col("byoccupationinc_3656910L").n_unique().alias("byoccupationinc_3656910L_n_unique"),
            pl.col("cancelreason_3545846M").n_unique().alias("cancelreason_3545846M_n_unique"),
            pl.col("district_544M").n_unique().alias("district_544M_n_unique"),
            pl.col("dtlastpmtallstes_3545839D").min().alias("earliest_last_payment_date"),
            pl.col("firstnonzeroinstldate_307D").min().alias("earliest_first_nonzero_installment_date"),
            pl.col("inittransactioncode_279L").n_unique().alias("inittransactioncode_279L_n_unique"),
            pl.col("maxdpdtolerance_577P").max().alias("maximum_dpd_tolerance"),
            pl.col("revolvingaccount_394A").sum().alias("sum_revolving_accounts")
        ])

        self.train_other_1 = self.train_other_1.group_by("case_id").agg([
            pl.col("amtdebitincoming_4809443A").sum().alias("sum_amtdebitincoming"),
            pl.col("amtdebitoutgoing_4809440A").sum().alias("sum_amtdebitoutgoing"),
            pl.col("amtdepositbalance_4809441A").mean().alias("avg_amtdepositbalance"),
            pl.col("amtdepositincoming_4809444A").sum().alias("sum_amtdepositincoming"),
            pl.col("amtdepositoutgoing_4809442A").sum().alias("sum_amtdepositoutgoing")
        ])

        self.train_deposit_1 = self.train_deposit_1.group_by("case_id").agg([
            pl.col("amount_416A").mean().alias("average_amount"),
            pl.count("openingdate_313D").alias("open_contracts_count"),
            pl.count("contractenddate_991D").alias("closed_contracts_count")
        ])

        # Person table: derive an ordinal employment date before aggregating.
        self.train_person_1 = self.train_person_1.with_columns(
            self.try_parse_date(pl.col("empl_employedfrom_271D"), *date_formats).alias("empl_employedfrom_271D")
        )
        self.train_person_1 = self.train_person_1.with_columns(
            self.convert_to_ordinal(pl.col("empl_employedfrom_271D")).alias("ordinal_employedfrom_271D")
        )
        self.train_person_1 = self.train_person_1.group_by("case_id").agg([
            pl.col("birth_259D").n_unique().alias("unique_birth_dates"),
            pl.col("birthdate_87D").n_unique().alias("unique_birth_dates_87D"),
            pl.col("childnum_185L").max().alias("max_children"),
            pl.col("education_927M").n_unique().alias("unique_educations"),
            pl.col("empl_employedtotal_800L").n_unique().alias("avg_employment_length"),
            pl.col("mainoccupationinc_384A").sum().alias("total_main_income"),
            pl.col("gender_992L").n_unique().alias("unique_genders"),
            pl.col("housetype_905L").n_unique().alias("unique_house_types"),
            pl.col("housingtype_772L").n_unique().alias("unique_housing_types"),
            pl.col("incometype_1044T").n_unique().alias("unique_income_types"),
            pl.col("maritalst_703L").n_unique().alias("unique_marital_statuses"),
            pl.col("persontype_1072L").n_unique().alias("unique_person_types_1072L"),
            pl.col("persontype_792L").n_unique().alias("unique_person_types_792L"),
            pl.col("relationshiptoclient_415T").n_unique().alias("unique_relationships_415T"),
            pl.col("relationshiptoclient_642T").n_unique().alias("unique_relationships_642T"),
            pl.col("remitter_829L").sum().alias("sum_remitters"),
            pl.col("role_1084L").n_unique().alias("unique_roles_1084L"),
            pl.col("role_993L").n_unique().alias("unique_roles_993L"),
            pl.col("safeguarantyflag_411L").sum().alias("sum_safeguaranty_flags"),
            pl.col("sex_738L").n_unique().alias("unique_sexes"),
            pl.col("type_25L").n_unique().alias("unique_contact_types"),
            pl.col("contaddr_district_15M").n_unique().alias("unique_contact_address_districts"),
            pl.col("empladdr_district_926M").n_unique().alias("unique_employer_address_districts"),
            pl.col("registaddr_district_1083M").n_unique().alias("unique_registered_address_districts"),
            pl.col("isreference_387L").sum().alias("sum_is_reference_flags"),
            pl.col("empl_industry_691L").n_unique().alias("unique_industries"),
            pl.col("empladdr_zipcode_114M").n_unique().alias("unique_employer_zipcodes"),
            pl.col("contaddr_zipcode_807M").n_unique().alias("unique_contact_zipcodes"),
            pl.col("registaddr_zipcode_184M").n_unique().alias("unique_registered_zipcodes"),
            pl.col("language1_981M").n_unique().alias("unique_languages"),
            pl.col("familystate_447L").n_unique().alias("unique_family_states"),
            pl.col("contaddr_matchlist_1032L").sum().alias("sum_contact_address_matchlist"),
            pl.col("contaddr_smempladdr_334L").sum().alias("sum_contact_same_employer_address"),
            pl.col("personindex_1023L").n_unique().alias("unique_person_indices"),
            pl.col("ordinal_employedfrom_271D").max().alias("latest_employment_date_ordinal")
        ])

        self.train_debitcard_1 = self.train_debitcard_1.with_columns([
            self.convert_to_ordinal(
                pl.col("openingdate_857D").str.strptime(pl.Date, "%Y-%m-%d")
            ).alias("ordinal_openingdate")
        ])

        self.train_debitcard_1 = self.train_debitcard_1.group_by("case_id").agg([
            pl.col("last180dayaveragebalance_704A").sum().alias("total_180dayaveragebalance"),
            pl.col("last180dayturnover_1134A").sum().alias("total_180dayturnover"),
            pl.col("last30dayturnover_651A").sum().alias("total_30dayturnover"),
            pl.min("ordinal_openingdate").alias("earliest_openingdate")
        ])

        self.train_tax_registry_a_1 = self.train_tax_registry_a_1.with_columns([
            self.convert_to_ordinal(
                pl.col("recorddate_4527225D").str.strptime(pl.Date, "%Y-%m-%d")
            ).alias("ordinal_recorddate_4527225D")
        ])

        self.train_tax_registry_a_1 = self.train_tax_registry_a_1.group_by("case_id").agg([
            pl.col("amount_4527230A").sum().alias("total_amount_4527230A"),
            pl.col("ordinal_recorddate_4527225D").max().alias("ordinal_recorddate_4527225D"),
            pl.col("name_4527232M").n_unique().alias("unique_name_4527232M")
        ])

        self.train_tax_registry_b_1 = self.train_tax_registry_b_1.group_by("case_id").agg([
            pl.col("amount_4917619A").sum().alias("total_amount_4917619A"),
            pl.col("deductiondate_4917603D").n_unique().alias("unique_deductiondate_4917603D"),
            pl.col("name_4917606M").n_unique().alias("unique_name_4917606M")
        ])

        self.train_tax_registry_c_1 = self.train_tax_registry_c_1.group_by("case_id").agg([
            pl.col("pmtamount_36A").sum().alias("total_pmtamount_36A"),
            pl.col("processingdate_168D").n_unique().alias("unique_processingdate_168D"),
            pl.col("employername_160M").n_unique().alias("unique_employername_160M")
        ])

        # Both credit-bureau tables use the same suffix-driven aggregation.
        self.train_credit_bureau_a_1 = self._aggregate_credit_bureau(self.train_credit_bureau_a_1)
        self.train_credit_bureau_b_1 = self._aggregate_credit_bureau(self.train_credit_bureau_b_1)

    def merge_data(self):
        """Join the aggregated tables, collect, and make date/duration features numeric.

        Returns:
            An eager DataFrame with one row per ``case_id`` that appears in
            the aggregated previous-applications table (all joins are left
            joins starting from it).
        """
        df_joined = self.train_applprev_1
        for right in (
            self.train_other_1,
            self.train_deposit_1,
            self.train_person_1,
            self.train_debitcard_1,
            self.train_tax_registry_a_1,
            self.train_tax_registry_b_1,
            self.train_tax_registry_c_1,
            self.train_credit_bureau_a_1,
            self.train_credit_bureau_b_1,
        ):
            df_joined = df_joined.join(right, on="case_id", how="left")

        # max_* date features from the credit-bureau tables are parsed here.
        string_date_columns = [
            'max_dateofcredend_289D', 'max_dateofcredend_353D', 'max_dateofcredstart_181D', 
            'max_dateofcredstart_739D', 'max_dateofrealrepmt_138D', 'max_lastupdate_1112D', 
            'max_lastupdate_388D', 'max_numberofoverdueinstlmaxdat_148D', 
            'max_numberofoverdueinstlmaxdat_641D', 'max_overdueamountmax2date_1002D', 
            'max_overdueamountmax2date_1142D', 'max_refreshdate_3813885D', 
            'max_contractdate_551D', 'max_contractmaturitydate_151D', 'max_lastupdate_260D'
        ]

        for column in string_date_columns:
            df_joined = df_joined.with_columns(
                self.try_parse_date(pl.col(column), '%Y-%m-%d', '%m/%d/%Y').alias(column)
            )

        df_joined = df_joined.collect()

        # Every Date column becomes its integer encoding.
        date_columns = [col for col, dtype in zip(df_joined.columns, df_joined.dtypes) if dtype == pl.Date]
        for col in date_columns:
            df_joined = df_joined.with_columns(
                self.convert_to_ordinal(pl.col(col)).alias(col)
            )

        # These features come from diff() over Date columns in
        # preprocess_data, so they are expected to be Duration values, which
        # the string namespace cannot process. Convert Durations directly to
        # minutes; keep the original "<n>d" string handling as a fallback.
        duration_columns = ["approval_to_activation_min_diff", "creation_min_diff", "payment_max_diff"]
        for column in duration_columns:
            if df_joined.schema[column] == pl.Duration:
                minutes = pl.col(column).dt.total_minutes()
            else:
                minutes = pl.col(column).str.replace("d", "").cast(pl.Int64) * 1440
            df_joined = df_joined.with_columns(minutes.alias(column))
        return df_joined

    def execute_pipeline(self):
        """Run load -> preprocess -> merge -> save schema; return the merged frame."""
        self.load_data()
        self.preprocess_data()
        merged_data = self.merge_data()
        self.save_schema()
        return merged_data

if __name__ == "__main__":
    # Training run of the depth-1 pipeline; all paths are relative to the
    # current working directory.
    applprev_paths = [
        "Data/parquet_files/train/train_applprev_1_0.parquet",
        "Data/parquet_files/train/train_applprev_1_1.parquet",
    ]
    other_path = "Data/parquet_files/train/train_other_1.parquet"
    deposit_path = "Data/parquet_files/train/train_deposit_1.parquet"
    person_path = "Data/parquet_files/train/train_person_1.parquet"
    debitcard_path = "Data/parquet_files/train/train_debitcard_1.parquet"
    tax_registry_a_1_path = "Data/parquet_files/train/train_tax_registry_a_1.parquet"
    tax_registry_b_1_path = "Data/parquet_files/train/train_tax_registry_b_1.parquet"
    tax_registry_c_1_path = "Data/parquet_files/train/train_tax_registry_c_1.parquet"
    credit_bureau_a_1_paths = [
        "Data/parquet_files/train/train_credit_bureau_a_1_0.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_1_1.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_1_2.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_1_3.parquet",
    ]
    credit_bureau_b_1_path = "Data/parquet_files/train/train_credit_bureau_b_1.parquet"
    schema_path = "unified_schema_3.json"

    pipeline = DataPipeline_Depth_1(
        applprev_paths=applprev_paths,
        other_path=other_path,
        deposit_path=deposit_path,
        person_path=person_path,
        debitcard_path=debitcard_path,
        tax_registry_a_1_path=tax_registry_a_1_path,
        tax_registry_b_1_path=tax_registry_b_1_path,
        tax_registry_c_1_path=tax_registry_c_1_path,
        credit_bureau_a_1_paths=credit_bureau_a_1_paths,
        credit_bureau_b_1_path=credit_bureau_b_1_path,
        schema_path=schema_path,
    )
    Depth_1 = pipeline.execute_pipeline()
    
class DataPipeline_Depth_1:
    """Aggregate the depth-1 tables to one row per ``case_id`` and join them.

    Every table is read from parquet, its columns are cast to the dtypes
    recorded in the JSON schema at ``schema_path``, it is aggregated by
    ``case_id`` and finally left-joined onto the aggregated ``applprev``
    table.  The frame attributes are named ``train_*`` whatever paths are
    passed in.
    """

    def __init__(self, applprev_paths, other_path, deposit_path, person_path, debitcard_path, tax_registry_a_1_path, tax_registry_b_1_path, 
                                    tax_registry_c_1_path, credit_bureau_a_1_paths, credit_bureau_b_1_path, schema_path):
        """Store the input paths and load the column -> dtype schema.

        Args:
            applprev_paths: list of parquet paths for the applprev shards.
            other_path, deposit_path, person_path, debitcard_path: parquet
                path of the corresponding single-file table.
            tax_registry_a_1_path, tax_registry_b_1_path,
            tax_registry_c_1_path: parquet paths of the tax registry tables.
            credit_bureau_a_1_paths: list of parquet paths for the
                credit bureau A shards.
            credit_bureau_b_1_path: parquet path of credit bureau B.
            schema_path: JSON file mapping column name to dtype name; it is
                read immediately, so it must already exist.
        """
        self.applprev_paths = applprev_paths
        self.other_path = other_path
        self.deposit_path = deposit_path
        self.person_path = person_path
        self.debitcard_path = debitcard_path
        self.tax_registry_a_1_path = tax_registry_a_1_path
        self.tax_registry_b_1_path = tax_registry_b_1_path
        self.tax_registry_c_1_path = tax_registry_c_1_path
        self.credit_bureau_a_1_paths = credit_bureau_a_1_paths
        self.credit_bureau_b_1_path = credit_bureau_b_1_path
        self.schema_path = schema_path
        self.global_schema = self.load_schema()

    def load_schema(self):
        """Return the parsed content of the JSON file at ``self.schema_path``."""
        with open(self.schema_path, 'r') as file:
            return json.load(file)

    @staticmethod
    def dtype_mapping(dtype_str):
        """Translate a dtype name from the schema file into a polars dtype.

        Names that are not in the table below fall back to ``pl.Utf8``.
        """
        mapping = {
            'Int32': pl.Int32,
            'Int64': pl.Int64,
            'Float32': pl.Float32,
            'Float64': pl.Float64,
            'Utf8': pl.Utf8,
            'Boolean': pl.Boolean,
            'Date': pl.Date,
            'Categorical': pl.Categorical
        }
        return mapping.get(dtype_str, pl.Utf8)

    @staticmethod
    def try_parse_date(col, fmt1, fmt2):
        """Parse a string expression as a date, trying ``fmt1`` then ``fmt2``.

        Both parses are non-strict, so a value matching neither format
        becomes null instead of raising.
        """
        date1 = col.str.strptime(pl.Date, fmt1, strict=False)
        date2 = col.str.strptime(pl.Date, fmt2, strict=False)
        return pl.when(date1.is_not_null()).then(date1).otherwise(date2)

    @staticmethod
    def convert_to_ordinal(date):
        """Map a date to the integer ``year * 365 + month * 30 + day``.

        Null dates stay null.  This is an approximate day index, not a
        calendar ordinal.
        """
        # NOTE(review): the formula is kept as-is because the features have to
        # match whatever the training pipeline produced -- confirm before
        # changing it.  Also verify that ``dt.month() * 30`` cannot overflow
        # on the installed polars version if the accessor returns a small
        # integer type.
        return pl.when(date.is_not_null()).then(
            (date.dt.year() * 365) + (date.dt.month() * 30) + date.dt.day()
        ).otherwise(None)

    def load_and_ensure_schema(self, path):
        """Read one parquet file and cast its columns to the schema dtypes.

        Only columns present in both the file and ``self.global_schema`` are
        touched.  Schema columns the file does not have are skipped: the same
        schema is applied to every table, so most of its entries belong to
        other tables.  (The previous code added an un-aliased null literal
        for each of them, which only ever produced a single stray column
        named ``literal``.)

        Args:
            path: parquet file to read.

        Returns:
            The eager DataFrame with mismatching columns cast.
        """
        df = pl.read_parquet(path)
        casts = []
        for col, actual_dtype in zip(df.columns, df.dtypes):
            if col not in self.global_schema:
                continue
            expected_pl_dtype = self.dtype_mapping(self.global_schema[col])
            if actual_dtype != expected_pl_dtype:
                casts.append(pl.col(col).cast(expected_pl_dtype))
        if casts:
            # One pass over the frame instead of one with_columns per column.
            df = df.with_columns(casts)
        return df

    def load_data(self):
        """Load every input table as a LazyFrame; sharded tables are concatenated."""
        self.train_applprev_1 = pl.concat([self.load_and_ensure_schema(path) for path in self.applprev_paths]).lazy()
        self.train_other_1 = self.load_and_ensure_schema(self.other_path).lazy()
        self.train_deposit_1 = self.load_and_ensure_schema(self.deposit_path).lazy()
        self.train_person_1 = self.load_and_ensure_schema(self.person_path).lazy()
        self.train_debitcard_1 = self.load_and_ensure_schema(self.debitcard_path).lazy()
        self.train_tax_registry_a_1 = self.load_and_ensure_schema(self.tax_registry_a_1_path).lazy()
        self.train_tax_registry_b_1 = self.load_and_ensure_schema(self.tax_registry_b_1_path).lazy()
        self.train_tax_registry_c_1 = self.load_and_ensure_schema(self.tax_registry_c_1_path).lazy()
        self.train_credit_bureau_a_1 = pl.concat([self.load_and_ensure_schema(path) for path in self.credit_bureau_a_1_paths]).lazy()
        self.train_credit_bureau_b_1 = self.load_and_ensure_schema(self.credit_bureau_b_1_path).lazy()

    @staticmethod
    def _aggregate_by_suffix(frame):
        """Aggregate ``frame`` by ``case_id``, choosing the aggregation from
        the last character of each column name.

        ``A``/``P`` columns are summed (``total_<col>``), ``M``/``T``/``L``
        columns are counted distinct (``unique_<col>``) and ``D`` columns take
        their maximum (``max_<col>``).  Any other column is left out.
        """
        aggregations = []
        # The column list is resolved once; aggregations follow column order.
        for col in frame.columns:
            if col.endswith(("A", "P")):
                aggregations.append(pl.col(col).sum().alias(f"total_{col}"))
            elif col.endswith(("M", "T", "L")):
                aggregations.append(pl.col(col).n_unique().alias(f"unique_{col}"))
            elif col.endswith("D"):
                aggregations.append(pl.col(col).max().alias(f"max_{col}"))
        return frame.group_by("case_id").agg(aggregations)

    def preprocess_data(self):
        """Reduce every loaded table to one row per ``case_id``.

        Must run after ``load_data``; each ``self.train_*`` attribute is
        replaced by its aggregated LazyFrame.
        """
        date_formats = ("%m/%d/%Y", "%Y-%m-%d")

        # --- applprev: parse the date columns, then aggregate -------------
        date_columns = ["approvaldate_319D", "dateactivated_425D", "creationdate_885D", "dtlastpmt_581D", "employedfrom_700D", "dtlastpmtallstes_3545839D", "firstnonzeroinstldate_307D"]
        self.train_applprev_1 = self.train_applprev_1.with_columns([
            DataPipeline_Depth_1.try_parse_date(pl.col(col), *date_formats).alias(col) for col in date_columns
        ]).group_by("case_id").agg([
            pl.col("actualdpd_943P").mean().alias("actualdpd_943P_mean"),
            pl.col("annuity_853A").sum().alias("annuity_853A_sum"),
            pl.col("childnum_21L").sum().alias("childnum_21L_sum"),
            pl.col("credacc_actualbalance_314A").mean().alias("credacc_actualbalance_314A_mean"),
            pl.col("credacc_credlmt_575A").mean().alias("credacc_credlmt_575A_mean"),
            pl.col("credacc_maxhisbal_375A").max().alias("credacc_maxhisbal_375A_max"),
            pl.col("credacc_minhisbal_90A").min().alias("credacc_minhisbal_90A_min"),
            pl.col("credacc_transactions_402L").sum().alias("credacc_transactions_402L_sum"),
            pl.col("credamount_590A").mean().alias("credamount_590A_mean"),
            pl.col("currdebt_94A").mean().alias("currdebt_94A_mean"),
            pl.col("downpmt_134A").sum().alias("downpmt_134A_sum"),
            pl.col("mainoccupationinc_437A").mean().alias("mainoccupationinc_437A_mean"),
            pl.col("outstandingdebt_522A").sum().alias("outstandingdebt_522A_sum"),
            pl.col("pmtnum_8L").max().alias("pmtnum_8L_max"),
            pl.col("tenor_203L").min().alias("tenor_203L_min"),
            pl.col("isbidproduct_390L").cast(pl.UInt32).sum().alias("isbidproduct_390L_sum"),
            pl.col("isdebitcard_527L").cast(pl.UInt32).sum().alias("isdebitcard_527L_sum"),
            pl.col("credacc_status_367L").n_unique().alias("credacc_status_367L_n_unique"),
            pl.col("credtype_587L").n_unique().alias("credtype_587L_n_unique"),
            pl.col("education_1138M").n_unique().alias("education_1138M_n_unique"),
            pl.col("familystate_726L").n_unique().alias("familystate_726L_n_unique"),
            pl.col("postype_4733339M").n_unique().alias("postype_4733339M_n_unique"),
            pl.col("profession_152M").n_unique().alias("profession_152M_n_unique"),
            pl.col("rejectreason_755M").n_unique().alias("rejectreason_755M_n_unique"),
            pl.col("rejectreasonclient_4145042M").n_unique().alias("rejectreasonclient_4145042M_n_unique"),
            pl.col("status_219L").n_unique().alias("status_219L_n_unique"),
            # The three *_diff features are gaps between consecutive rows of a
            # single date column within the group.  Note that
            # "approval_to_activation_min_diff" only reads approvaldate_319D.
            (pl.col("approvaldate_319D").diff().abs().min()).alias("approval_to_activation_min_diff"),
            (pl.col("creationdate_885D").diff().abs().min()).alias("creation_min_diff"),
            (pl.col("dtlastpmt_581D").diff().abs().max()).alias("payment_max_diff"),
            pl.col("employedfrom_700D").min().alias("earliest_employment_date"),
            pl.col("byoccupationinc_3656910L").n_unique().alias("byoccupationinc_3656910L_n_unique"),
            pl.col("cancelreason_3545846M").n_unique().alias("cancelreason_3545846M_n_unique"),
            pl.col("district_544M").n_unique().alias("district_544M_n_unique"),
            pl.col("dtlastpmtallstes_3545839D").min().alias("earliest_last_payment_date"),
            pl.col("firstnonzeroinstldate_307D").min().alias("earliest_first_nonzero_installment_date"),
            pl.col("inittransactioncode_279L").n_unique().alias("inittransactioncode_279L_n_unique"),
            pl.col("maxdpdtolerance_577P").max().alias("maximum_dpd_tolerance"),
            pl.col("revolvingaccount_394A").sum().alias("sum_revolving_accounts")
        ])

        # --- other ---------------------------------------------------------
        self.train_other_1 = self.train_other_1.group_by("case_id").agg([
            pl.col("amtdebitincoming_4809443A").sum().alias("sum_amtdebitincoming"),
            pl.col("amtdebitoutgoing_4809440A").sum().alias("sum_amtdebitoutgoing"),
            pl.col("amtdepositbalance_4809441A").mean().alias("avg_amtdepositbalance"),
            pl.col("amtdepositincoming_4809444A").sum().alias("sum_amtdepositincoming"),
            pl.col("amtdepositoutgoing_4809442A").sum().alias("sum_amtdepositoutgoing")
        ])

        # --- deposit -------------------------------------------------------
        self.train_deposit_1 = self.train_deposit_1.group_by("case_id").agg([
            pl.col("amount_416A").mean().alias("average_amount"),
            pl.count("openingdate_313D").alias("open_contracts_count"),
            pl.count("contractenddate_991D").alias("closed_contracts_count")
        ])

        # --- person: parse the employment date, derive its ordinal ---------
        self.train_person_1 = self.train_person_1.with_columns(
            DataPipeline_Depth_1.try_parse_date(pl.col("empl_employedfrom_271D"), *date_formats).alias("empl_employedfrom_271D")
        ).with_columns(
            DataPipeline_Depth_1.convert_to_ordinal(pl.col("empl_employedfrom_271D")).alias("ordinal_employedfrom_271D")
        )
        self.train_person_1 = self.train_person_1.group_by("case_id").agg([
            pl.col("birth_259D").n_unique().alias("unique_birth_dates"),
            pl.col("birthdate_87D").n_unique().alias("unique_birth_dates_87D"),
            pl.col("childnum_185L").max().alias("max_children"),
            pl.col("education_927M").n_unique().alias("unique_educations"),
            # Despite its name this output is a distinct count, not an average.
            pl.col("empl_employedtotal_800L").n_unique().alias("avg_employment_length"),
            pl.col("mainoccupationinc_384A").sum().alias("total_main_income"),
            pl.col("gender_992L").n_unique().alias("unique_genders"),
            pl.col("housetype_905L").n_unique().alias("unique_house_types"),
            pl.col("housingtype_772L").n_unique().alias("unique_housing_types"),
            pl.col("incometype_1044T").n_unique().alias("unique_income_types"),
            pl.col("maritalst_703L").n_unique().alias("unique_marital_statuses"),
            pl.col("persontype_1072L").n_unique().alias("unique_person_types_1072L"),
            pl.col("persontype_792L").n_unique().alias("unique_person_types_792L"),
            pl.col("relationshiptoclient_415T").n_unique().alias("unique_relationships_415T"),
            pl.col("relationshiptoclient_642T").n_unique().alias("unique_relationships_642T"),
            pl.col("remitter_829L").sum().alias("sum_remitters"),
            pl.col("role_1084L").n_unique().alias("unique_roles_1084L"),
            pl.col("role_993L").n_unique().alias("unique_roles_993L"),
            pl.col("safeguarantyflag_411L").sum().alias("sum_safeguaranty_flags"),
            pl.col("sex_738L").n_unique().alias("unique_sexes"),
            pl.col("type_25L").n_unique().alias("unique_contact_types"),
            pl.col("contaddr_district_15M").n_unique().alias("unique_contact_address_districts"),
            pl.col("empladdr_district_926M").n_unique().alias("unique_employer_address_districts"),
            pl.col("registaddr_district_1083M").n_unique().alias("unique_registered_address_districts"),
            pl.col("isreference_387L").sum().alias("sum_is_reference_flags"),
            pl.col("empl_industry_691L").n_unique().alias("unique_industries"),
            pl.col("empladdr_zipcode_114M").n_unique().alias("unique_employer_zipcodes"),
            pl.col("contaddr_zipcode_807M").n_unique().alias("unique_contact_zipcodes"),
            pl.col("registaddr_zipcode_184M").n_unique().alias("unique_registered_zipcodes"),
            pl.col("language1_981M").n_unique().alias("unique_languages"),
            pl.col("familystate_447L").n_unique().alias("unique_family_states"),
            pl.col("contaddr_matchlist_1032L").sum().alias("sum_contact_address_matchlist"),
            pl.col("contaddr_smempladdr_334L").sum().alias("sum_contact_same_employer_address"),
            pl.col("personindex_1023L").n_unique().alias("unique_person_indices"),
            pl.col("ordinal_employedfrom_271D").max().alias("latest_employment_date_ordinal")
        ])

        # --- debitcard -----------------------------------------------------
        self.train_debitcard_1 = self.train_debitcard_1.with_columns([
            DataPipeline_Depth_1.convert_to_ordinal(
                pl.col("openingdate_857D").str.strptime(pl.Date, "%Y-%m-%d")
            ).alias("ordinal_openingdate")
        ])

        self.train_debitcard_1 = self.train_debitcard_1.group_by("case_id").agg([
            pl.col("last180dayaveragebalance_704A").sum().alias("total_180dayaveragebalance"),
            pl.col("last180dayturnover_1134A").sum().alias("total_180dayturnover"),
            pl.col("last30dayturnover_651A").sum().alias("total_30dayturnover"),
            pl.min("ordinal_openingdate").alias("earliest_openingdate")
        ])

        # --- credit bureau A / B: aggregation chosen by column suffix ------
        # The former ``ordinal_<col>`` helper columns were never aggregated,
        # and they applied ``.dt`` accessors to columns that merge_data later
        # parses from strings, so they are no longer created.
        self.train_credit_bureau_a_1 = self._aggregate_by_suffix(self.train_credit_bureau_a_1)
        self.train_credit_bureau_b_1 = self._aggregate_by_suffix(self.train_credit_bureau_b_1)

        # --- tax registries ------------------------------------------------
        self.train_tax_registry_a_1 = self.train_tax_registry_a_1.with_columns([
            DataPipeline_Depth_1.convert_to_ordinal(
                pl.col("recorddate_4527225D").str.strptime(pl.Date, "%Y-%m-%d")
            ).alias("ordinal_recorddate_4527225D")
        ])

        self.train_tax_registry_a_1 = self.train_tax_registry_a_1.group_by("case_id").agg([
            pl.col("amount_4527230A").sum().alias("total_amount_4527230A"),
            pl.col("ordinal_recorddate_4527225D").max().alias("ordinal_recorddate_4527225D"),
            pl.col("name_4527232M").n_unique().alias("unique_name_4527232M")
        ])

        self.train_tax_registry_b_1 = self.train_tax_registry_b_1.group_by("case_id").agg([
            pl.col("amount_4917619A").sum().alias("total_amount_4917619A"),
            pl.col("deductiondate_4917603D").n_unique().alias("unique_deductiondate_4917603D"),
            pl.col("name_4917606M").n_unique().alias("unique_name_4917606M")
        ])

        self.train_tax_registry_c_1 = self.train_tax_registry_c_1.group_by("case_id").agg([
            pl.col("pmtamount_36A").sum().alias("total_pmtamount_36A"),
            pl.col("processingdate_168D").n_unique().alias("unique_processingdate_168D"),
            pl.col("employername_160M").n_unique().alias("unique_employername_160M")
        ])

    def merge_data(self):
        """Join all aggregated tables and turn date/duration columns into integers.

        The aggregated applprev table is the left side of every join, so only
        ``case_id`` values present there appear in the output.

        Returns:
            The collected (eager) DataFrame.
        """
        df_joined = self.train_applprev_1
        right_tables = (
            self.train_other_1,
            self.train_deposit_1,
            self.train_person_1,
            self.train_debitcard_1,
            self.train_tax_registry_a_1,
            self.train_tax_registry_b_1,
            self.train_tax_registry_c_1,
            self.train_credit_bureau_a_1,
            self.train_credit_bureau_b_1,
        )
        for table in right_tables:
            df_joined = df_joined.join(table, on="case_id", how="left")

        # Credit-bureau date maxima that are parsed from strings here.
        string_date_columns = [
            'max_dateofcredend_289D', 'max_dateofcredend_353D', 'max_dateofcredstart_181D', 
            'max_dateofcredstart_739D', 'max_dateofrealrepmt_138D', 'max_lastupdate_1112D', 
            'max_lastupdate_388D', 'max_numberofoverdueinstlmaxdat_148D', 
            'max_numberofoverdueinstlmaxdat_641D', 'max_overdueamountmax2date_1002D', 
            'max_overdueamountmax2date_1142D', 'max_refreshdate_3813885D', 
            'max_contractdate_551D', 'max_contractmaturitydate_151D', 'max_lastupdate_260D'
        ]
        df_joined = df_joined.with_columns([
            DataPipeline_Depth_1.try_parse_date(pl.col(column), '%Y-%m-%d', '%m/%d/%Y').alias(column)
            for column in string_date_columns
        ])

        df_joined = df_joined.collect()

        # Every Date column is replaced in place by its integer ordinal.
        date_columns = [col for col, dtype in zip(df_joined.columns, df_joined.dtypes) if dtype == pl.Date]
        if date_columns:
            df_joined = df_joined.with_columns([
                DataPipeline_Depth_1.convert_to_ordinal(df_joined[col]).alias(col)
                for col in date_columns
            ])

        # Express the date gaps in minutes (1 day == 1440 minutes).
        duration_columns = ["approval_to_activation_min_diff", "creation_min_diff", "payment_max_diff"]
        for column in duration_columns:
            if df_joined.schema[column] == pl.Duration:
                # A Duration column has no ``.str`` namespace; convert it
                # directly instead of stripping a "d" suffix from text.
                minutes = pl.col(column).dt.total_minutes()
            else:
                # Fallback for values that arrive as text such as "5d".
                minutes = pl.col(column).str.replace("d", "").cast(pl.Int64) * 1440
            df_joined = df_joined.with_columns(minutes.alias(column))
        return df_joined

    def execute_pipeline(self):
        """Run load -> preprocess -> merge and return the merged DataFrame."""
        self.load_data()
        self.preprocess_data()
        merged_data = self.merge_data()
        return merged_data

if __name__ == "__main__":
    # Depth-1 test inputs. Tables split across several parquet files are
    # passed as lists of paths, the others as a single path.
    applprev_paths = [
        "Data/parquet_files/test/test_applprev_1_0.parquet",
        "Data/parquet_files/test/test_applprev_1_1.parquet",
        "Data/parquet_files/test/test_applprev_1_2.parquet",
    ]
    other_path = "Data/parquet_files/test/test_other_1.parquet"
    deposit_path = "Data/parquet_files/test/test_deposit_1.parquet"
    person_path = "Data/parquet_files/test/test_person_1.parquet"
    debitcard_path = "Data/parquet_files/test/test_debitcard_1.parquet"
    tax_registry_a_1_path = "Data/parquet_files/test/test_tax_registry_a_1.parquet"
    tax_registry_b_1_path = "Data/parquet_files/test/test_tax_registry_b_1.parquet"
    tax_registry_c_1_path = "Data/parquet_files/test/test_tax_registry_c_1.parquet"
    credit_bureau_a_1_paths = [
        "Data/parquet_files/test/test_credit_bureau_a_1_0.parquet",
        "Data/parquet_files/test/test_credit_bureau_a_1_1.parquet",
        "Data/parquet_files/test/test_credit_bureau_a_1_2.parquet",
        "Data/parquet_files/test/test_credit_bureau_a_1_3.parquet",
        "Data/parquet_files/test/test_credit_bureau_a_1_4.parquet",
    ]
    credit_bureau_b_1_path = "Data/parquet_files/test/test_credit_bureau_b_1.parquet"
    # JSON column -> dtype mapping; the constructor reads it immediately.
    schema_path = "unified_schema_3.json"

    pipeline = DataPipeline_Depth_1(
        applprev_paths=applprev_paths,
        other_path=other_path,
        deposit_path=deposit_path,
        person_path=person_path,
        debitcard_path=debitcard_path,
        tax_registry_a_1_path=tax_registry_a_1_path,
        tax_registry_b_1_path=tax_registry_b_1_path,
        tax_registry_c_1_path=tax_registry_c_1_path,
        credit_bureau_a_1_paths=credit_bureau_a_1_paths,
        credit_bureau_b_1_path=credit_bureau_b_1_path,
        schema_path=schema_path,
    )
    Depth_1_test = pipeline.execute_pipeline()
    
class DataPipeline_Depth_2:
    """Aggregate the depth-2 tables to one row per ``case_id`` and join them.

    Tables with a ``num_group1`` level are first reduced per
    ``(case_id, num_group1)`` and then per ``case_id``.  The result is
    left-joined onto the aggregated applprev table.
    """

    def __init__(self, applprev_path, person_path, credit_bureau_a_2_paths, credit_bureau_b_2_path):
        """Store the input locations; nothing is read until ``load_data``.

        Args:
            applprev_path: parquet path of the applprev table.
            person_path: parquet path of the person table.
            credit_bureau_a_2_paths: list of parquet paths, one per
                credit bureau A shard.
            credit_bureau_b_2_path: parquet path of credit bureau B.
        """
        self.applprev_path = applprev_path
        self.person_path = person_path
        self.credit_bureau_a_2_paths = credit_bureau_a_2_paths
        self.credit_bureau_b_2_path = credit_bureau_b_2_path

    def load_data(self):
        """Read the three single-file tables and keep them as LazyFrames.

        The credit bureau A shards are not loaded here; they are handled
        shard by shard in ``preprocess_bureau_a``.

        Raises:
            Exception: whatever the parquet reader raised, re-raised after
                the message has been printed.
        """
        try:
            # Eager reads, so a bad path or file fails inside this try block.
            self.train_applprev_2 = pl.read_parquet(self.applprev_path).lazy()
            self.train_person_2 = pl.read_parquet(self.person_path).lazy()
            self.credit_bureau_b_2 = pl.read_parquet(self.credit_bureau_b_2_path).lazy()
        except Exception as e:
            print(f"Error loading data: {e}")
            raise

    def preprocess_applprev(self, data):
        """Aggregate applprev per ``(case_id, num_group1)``, then per ``case_id``.

        A null card status is replaced by ``'UNKNOWN'`` before the
        ``'ACTIVE'`` / ``'CANCELLED'`` indicator columns are derived.
        """
        per_group = data.group_by(["case_id", "num_group1"]).agg([
            pl.col("conts_type_509L").n_unique().alias("unique_contact_types"),
            pl.col("cacccardblochreas_147M").first().alias("first_cacccardblochreas_147M"),
            pl.col("credacc_cards_status_52L").first().alias("first_credacc_cards_status_52L")
        ])
        flagged = per_group.with_columns(
            pl.col('first_credacc_cards_status_52L')
                .fill_null('UNKNOWN')
                .alias('status')
        ).with_columns([
            (pl.col('status') == 'ACTIVE').cast(pl.Int32).alias('is_active'),
            (pl.col('status') == 'CANCELLED').cast(pl.Int32).alias('is_cancelled')
        ])
        return flagged.group_by('case_id').agg([
            pl.col('unique_contact_types').max().alias('max_unique_contact_type'),
            pl.col('first_cacccardblochreas_147M').n_unique().alias('n_unique_cacccardblochreas_147M'),
            pl.sum('is_cancelled').alias('total_cancelled'),
            pl.sum('is_active').alias('total_active')
        ])

    def preprocess_person(self, data):
        """Count distinct values of each person attribute per ``case_id``."""
        return data.group_by('case_id').agg([
            pl.col('addres_district_368M').n_unique().alias('n_unique_addres_district_368M'),
            pl.col('addres_role_871L').n_unique().alias('n_unique_addres_role_871L'),
            pl.col('addres_zip_823M').n_unique().alias('n_unique_addres_zip_823M'),
            pl.col('conts_role_79M').n_unique().alias('n_unique_conts_role_79M'),
            pl.col('empls_economicalst_849M').n_unique().alias('n_unique_empls_economicalst_849M'),
            pl.col('empls_employedfrom_796D').n_unique().alias('n_unique_empls_employedfrom_796D'),
            pl.col('empls_employer_name_740M').n_unique().alias('n_unique_empls_employer_name_740M'),
            pl.col('relatedpersons_role_762T').n_unique().alias('n_unique_relatedpersons_role_762T')
        ])

    def preprocess_bureau_a_file(self, path):
        """Aggregate one credit bureau A shard per ``(case_id, num_group1)``.

        Args:
            path: parquet file of the shard.

        Returns:
            A LazyFrame; nothing is read until the plan is collected.
        """
        # scan_parquet instead of read_parquet(...).lazy(): the shard is not
        # materialised up front, so the optimizer can restrict the read to the
        # columns used below.
        data = pl.scan_parquet(path)
        detailed_data = data.group_by(["case_id", "num_group1"]).agg([
            pl.col("collater_valueofguarantee_1124L").sum().alias("sum_collater_valueofguarantee_1124L"),
            pl.col("collater_valueofguarantee_876L").sum().alias("sum_collater_valueofguarantee_876L"),
            pl.col("pmts_overdue_1140A").sum().alias("sum_pmts_overdue_1140A"),
            pl.col("pmts_overdue_1152A").sum().alias("sum_pmts_overdue_1152A"),
            pl.col("pmts_dpd_1073P").mean().alias("avg_pmts_dpd_1073P"),
            pl.col("pmts_dpd_303P").mean().alias("avg_pmts_dpd_303P"),
            pl.col("collater_typofvalofguarant_298M").n_unique().alias("n_unique_collater_typofvalofguarant_298M"),
            pl.col("collater_typofvalofguarant_407M").n_unique().alias("n_unique_collater_typofvalofguarant_407M"),
            pl.col("collaterals_typeofguarante_359M").n_unique().alias("n_unique_collaterals_typeofguarante_359M"),
            pl.col("collaterals_typeofguarante_669M").n_unique().alias("n_unique_collaterals_typeofguarante_669M"),
            pl.col("subjectroles_name_541M").n_unique().alias("n_unique_subjectroles_name_541M"),
            pl.col("subjectroles_name_838M").n_unique().alias("n_unique_subjectroles_name_838M"),
            pl.col("pmts_month_158T").n_unique().alias("n_unique_pmts_month_158T"),
            pl.col("pmts_month_706T").n_unique().alias("n_unique_pmts_month_706T"),
            pl.col("pmts_year_1139T").n_unique().alias("n_unique_pmts_year_1139T"),
            pl.col("pmts_year_507T").n_unique().alias("n_unique_pmts_year_507T")
        ])
        return detailed_data

    def preprocess_bureau_a(self, paths):
        """Aggregate all credit bureau A shards down to one row per ``case_id``.

        Each shard is first reduced by ``preprocess_bureau_a_file``; the
        per-shard results are concatenated and reduced again.  The
        ``overall_avg_*`` outputs are therefore means of per-group means.
        """
        detailed_data_list = [self.preprocess_bureau_a_file(path) for path in paths]
        combined_detailed_data = pl.concat(detailed_data_list)
        data = combined_detailed_data.group_by("case_id").agg([
            pl.sum("sum_collater_valueofguarantee_1124L").alias("total_sum_collater_valueofguarantee_1124L"),
            pl.sum("sum_collater_valueofguarantee_876L").alias("total_sum_collater_valueofguarantee_876L"),
            pl.sum("sum_pmts_overdue_1140A").alias("total_sum_pmts_overdue_1140A"),
            pl.sum("sum_pmts_overdue_1152A").alias("total_sum_pmts_overdue_1152A"),
            pl.mean("avg_pmts_dpd_1073P").alias("overall_avg_pmts_dpd_1073P"),
            pl.mean("avg_pmts_dpd_303P").alias("overall_avg_pmts_dpd_303P"),
            pl.max("n_unique_collater_typofvalofguarant_298M").alias("max_n_unique_collater_typofvalofguarant_298M"),
            pl.max("n_unique_collater_typofvalofguarant_407M").alias("max_n_unique_collater_typofvalofguarant_407M"),
            pl.max("n_unique_collaterals_typeofguarante_359M").alias("max_n_unique_collaterals_typeofguarante_359M"),
            pl.max("n_unique_collaterals_typeofguarante_669M").alias("max_n_unique_collaterals_typeofguarante_669M"),
            pl.max("n_unique_subjectroles_name_541M").alias("max_n_unique_subjectroles_name_541M"),
            pl.max("n_unique_subjectroles_name_838M").alias("max_n_unique_subjectroles_name_838M"),
            pl.max("n_unique_pmts_month_158T").alias("max_n_unique_pmts_month_158T"),
            pl.max("n_unique_pmts_month_706T").alias("max_n_unique_pmts_month_706T"),
            pl.max("n_unique_pmts_year_1139T").alias("max_n_unique_pmts_year_1139T"),
            pl.max("n_unique_pmts_year_507T").alias("max_n_unique_pmts_year_507T")
        ])
        return data

    def preprocess_bureau_b(self, data):
        """Aggregate credit bureau B per ``(case_id, num_group1)``, then per ``case_id``."""
        detailed_data_2 = data.group_by(["case_id", "num_group1"]).agg([
            pl.col("pmts_date_1107D").n_unique().alias("unique_pmts_date_1107D"),
            pl.col("pmts_dpdvalue_108P").sum().alias("sum_pmts_dpdvalue_108P"),
            pl.col("pmts_pmtsoverdue_635A").sum().alias("sum_pmts_pmtsoverdue_635A")
        ])
        data = detailed_data_2.group_by("case_id").agg([
            pl.col("unique_pmts_date_1107D").mean().alias("avg_unique_pmts_date"),
            pl.col("sum_pmts_dpdvalue_108P").max().alias("max_sum_pmts_dpdvalue"),
            pl.col("sum_pmts_pmtsoverdue_635A").sum().alias("total_sum_pmts_overdue")
        ])
        return data

    def merge_data(self, applprev_data, person_data, bureau_a_data, bureau_b_data):
        """Left-join person, bureau A and bureau B onto applprev by ``case_id``.

        applprev is the left side of every join, so only its ``case_id``
        values appear in the result.
        """
        merged_data = applprev_data
        for right in (person_data, bureau_a_data, bureau_b_data):
            merged_data = merged_data.join(right, on='case_id', how='left')
        return merged_data

    def execute_pipeline(self):
        """Load, aggregate and join all tables; return the collected DataFrame."""
        self.load_data()
        applprev_data = self.preprocess_applprev(self.train_applprev_2)
        person_data = self.preprocess_person(self.train_person_2)
        bureau_a_data = self.preprocess_bureau_a(self.credit_bureau_a_2_paths)
        bureau_b_data = self.preprocess_bureau_b(self.credit_bureau_b_2)
        depth_2 = self.merge_data(applprev_data, person_data, bureau_a_data, bureau_b_data)
        return depth_2.collect()

if __name__ == "__main__":
    # Depth-2 training inputs; credit bureau A is split across 11 shards.
    applprev_path = "Data/parquet_files/train/train_applprev_2.parquet"
    person_path = "Data/parquet_files/train/train_person_2.parquet"
    credit_bureau_a_2_paths = [
        "Data/parquet_files/train/train_credit_bureau_a_2_0.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_2_1.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_2_2.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_2_3.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_2_4.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_2_5.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_2_6.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_2_7.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_2_8.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_2_9.parquet",
        "Data/parquet_files/train/train_credit_bureau_a_2_10.parquet",
    ]
    credit_bureau_b_2_path = "Data/parquet_files/train/train_credit_bureau_b_2.parquet"

    pipeline = DataPipeline_Depth_2(
        applprev_path=applprev_path,
        person_path=person_path,
        credit_bureau_a_2_paths=credit_bureau_a_2_paths,
        credit_bureau_b_2_path=credit_bureau_b_2_path,
    )
    Depth_2 = pipeline.execute_pipeline()
    
class DataPipeline_Depth_2:
    """Aggregate the depth-2 parquet tables to one feature row per ``case_id``.

    The four inputs (applprev, person, credit bureau A shards, credit bureau B)
    are scanned lazily, aggregated separately and left-joined onto the
    applprev features. ``execute_pipeline`` runs the whole sequence.
    """

    def __init__(self, applprev_path, person_path, credit_bureau_a_2_paths, credit_bureau_b_2_path):
        """Remember the input locations; no file is touched here.

        Args:
            applprev_path: Path of the applprev parquet file.
            person_path: Path of the person parquet file.
            credit_bureau_a_2_paths: Iterable of paths, one per bureau A shard.
            credit_bureau_b_2_path: Path of the bureau B parquet file.
        """
        self.applprev_path = applprev_path
        self.person_path = person_path
        self.credit_bureau_a_2_paths = credit_bureau_a_2_paths
        self.credit_bureau_b_2_path = credit_bureau_b_2_path

    def load_data(self):
        """Create lazy scans for every input and keep them on the instance.

        The bureau A shards are concatenated into a single lazy frame. Any
        exception is reported on stdout and then re-raised unchanged.
        """
        try:
            self.test_applprev_2 = pl.scan_parquet(self.applprev_path)
            self.test_person_2 = pl.scan_parquet(self.person_path)
            bureau_a_shards = [pl.scan_parquet(path) for path in self.credit_bureau_a_2_paths]
            self.credit_bureau_a_2 = pl.concat(bureau_a_shards)
            self.credit_bureau_b_2 = pl.scan_parquet(self.credit_bureau_b_2_path)
        except Exception as e:
            print(f"Error loading data: {e}")
            raise

    def preprocess_applprev(self, data):
        """Summarise applprev rows per ``case_id``.

        Rows are first reduced per (``case_id``, ``num_group1``), then per
        ``case_id``. The output columns are ``max_unique_contact_type``,
        ``n_unique_cacccardblochreas_147M``, ``total_cancelled`` and
        ``total_active``.
        """
        # Note: the "first_*" aliases hold the group maximum, not the first row.
        per_group = data.group_by(["case_id", "num_group1"]).agg([
            pl.col("conts_type_509L").unique().count().alias("unique_contact_types"),
            pl.col("cacccardblochreas_147M").max().alias("first_cacccardblochreas_147M"),
            pl.col("credacc_cards_status_52L").max().alias("first_credacc_cards_status_52L"),
        ])
        # Missing statuses become the literal 'UNKNOWN' so the flags below are 0, not null.
        with_status = per_group.with_columns(
            pl.col('first_credacc_cards_status_52L').fill_null('UNKNOWN').alias('status')
        )
        with_flags = with_status.with_columns([
            (pl.col('status') == 'ACTIVE').cast(pl.Int32).alias('is_active'),
            (pl.col('status') == 'CANCELLED').cast(pl.Int32).alias('is_cancelled'),
        ])
        return with_flags.group_by('case_id').agg([
            pl.col('unique_contact_types').max().alias('max_unique_contact_type'),
            pl.col('first_cacccardblochreas_147M').n_unique().alias('n_unique_cacccardblochreas_147M'),
            pl.col('is_cancelled').sum().alias('total_cancelled'),
            pl.col('is_active').sum().alias('total_active'),
        ])

    def preprocess_person(self, data):
        """Count distinct values of selected person columns per ``case_id``.

        Every output column is named ``n_unique_<source column>``.
        """
        counted_columns = (
            'addres_district_368M',
            'addres_role_871L',
            'addres_zip_823M',
            'conts_role_79M',
            'empls_economicalst_849M',
            'empls_employedfrom_796D',
            'empls_employer_name_740M',
            'relatedpersons_role_762T',
        )
        distinct_counts = [
            pl.col(name).n_unique().alias(f'n_unique_{name}')
            for name in counted_columns
        ]
        return data.group_by('case_id').agg(distinct_counts)

    def preprocess_bureau_a(self, data):
        """Summarise credit bureau A rows per ``case_id`` in two passes.

        Pass one groups by (``case_id``, ``num_group1``) and produces
        ``sum_*``, ``avg_*`` and ``n_unique_*`` columns. Pass two groups by
        ``case_id`` and takes the sum of the sums (``total_sum_*``), the mean
        of the averages (``overall_avg_*``) and the maximum of the distinct
        counts (``max_n_unique_*``).
        """
        summed = (
            "collater_valueofguarantee_1124L",
            "collater_valueofguarantee_876L",
            "pmts_overdue_1140A",
            "pmts_overdue_1152A",
        )
        averaged = ("pmts_dpd_1073P", "pmts_dpd_303P")
        counted = (
            "collater_typofvalofguarant_298M",
            "collater_typofvalofguarant_407M",
            "collaterals_typeofguarante_359M",
            "collaterals_typeofguarante_669M",
            "subjectroles_name_541M",
            "subjectroles_name_838M",
            "pmts_month_158T",
            "pmts_month_706T",
            "pmts_year_1139T",
            "pmts_year_507T",
        )

        # Expression order (sums, averages, distinct counts) fixes the output column order.
        first_pass = (
            [pl.col(name).sum().alias(f"sum_{name}") for name in summed]
            + [pl.col(name).mean().alias(f"avg_{name}") for name in averaged]
            + [pl.col(name).n_unique().alias(f"n_unique_{name}") for name in counted]
        )
        second_pass = (
            [pl.col(f"sum_{name}").sum().alias(f"total_sum_{name}") for name in summed]
            + [pl.col(f"avg_{name}").mean().alias(f"overall_avg_{name}") for name in averaged]
            + [pl.col(f"n_unique_{name}").max().alias(f"max_n_unique_{name}") for name in counted]
        )

        per_group = data.group_by(["case_id", "num_group1"]).agg(first_pass)
        return per_group.group_by("case_id").agg(second_pass)

    def preprocess_bureau_b(self, data):
        """Summarise credit bureau B rows per ``case_id`` in two passes.

        The output columns are ``avg_unique_pmts_date``,
        ``max_sum_pmts_dpdvalue`` and ``total_sum_pmts_overdue``.
        """
        per_group = data.group_by(["case_id", "num_group1"]).agg([
            pl.col("pmts_date_1107D").n_unique().alias("unique_pmts_date_1107D"),
            pl.col("pmts_dpdvalue_108P").sum().alias("sum_pmts_dpdvalue_108P"),
            pl.col("pmts_pmtsoverdue_635A").sum().alias("sum_pmts_pmtsoverdue_635A"),
        ])
        per_case = per_group.group_by("case_id").agg([
            pl.col("unique_pmts_date_1107D").mean().alias("avg_unique_pmts_date"),
            pl.col("sum_pmts_dpdvalue_108P").max().alias("max_sum_pmts_dpdvalue"),
            pl.col("sum_pmts_pmtsoverdue_635A").sum().alias("total_sum_pmts_overdue"),
        ])
        return per_case

    def merge_data(self, applprev_data, person_data, bureau_a_data, bureau_b_data):
        """Left-join person, bureau A and bureau B features onto applprev.

        All joins use ``case_id`` and ``how='left'``, applied in argument order.
        """
        merged_data = applprev_data
        for extra_features in (person_data, bureau_a_data, bureau_b_data):
            merged_data = merged_data.join(extra_features, on='case_id', how='left')
        return merged_data

    def execute_pipeline(self):
        """Run load, preprocess, merge and collect; return the collected frame."""
        self.load_data()
        feature_frames = (
            self.preprocess_applprev(self.test_applprev_2),
            self.preprocess_person(self.test_person_2),
            self.preprocess_bureau_a(self.credit_bureau_a_2),
            self.preprocess_bureau_b(self.credit_bureau_b_2),
        )
        return self.merge_data(*feature_frames).collect()

if __name__ == "__main__":
    # Depth-2 features for the test split.
    applprev_path = "Data/parquet_files/test/test_applprev_2.parquet"
    person_path = "Data/parquet_files/test/test_person_2.parquet"
    # credit_bureau_a_2 is split across shards numbered 0 through 11.
    credit_bureau_a_2_paths = [
        f"Data/parquet_files/test/test_credit_bureau_a_2_{shard}.parquet"
        for shard in range(12)
    ]
    credit_bureau_b_2_path = "Data/parquet_files/test/test_credit_bureau_b_2.parquet"
    pipeline = DataPipeline_Depth_2(
        applprev_path,
        person_path,
        credit_bureau_a_2_paths,
        credit_bureau_b_2_path,
    )
    Depth_2_test = pipeline.execute_pipeline()
    
# Combine the three feature depths per split. Depth_0 is the left side of both
# joins, so its rows drive the result.
Internal_Final = (
    Depth_0
    .join(Depth_1, on='case_id', how='left')
    .join(Depth_2, on='case_id', how='left')
)

Internal_Final_test = (
    Depth_0_test
    .join(Depth_1_test, on='case_id', how='left')
    .join(Depth_2_test, on='case_id', how='left')
)

df = Internal_Final.to_pandas()

# The label and the row identifier are not model features.
X = df.drop(columns=['target', 'case_id'])
y = df['target']

# Hold out 20% of the rows for validation; the seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
train_data = lgb.Dataset(data=X_train, label=y_train)
valid_data = lgb.Dataset(data=X_test, label=y_test)

# LightGBM configuration: gradient-boosted trees on a binary objective,
# evaluated with AUC on the validation set.
params = dict(
    boosting_type='gbdt',
    objective='binary',
    metric='auc',
    num_leaves=25,
    min_data_in_leaf=100,
    learning_rate=0.1,
    feature_fraction=0.8,
    bagging_fraction=0.7,
    bagging_freq=10,
    verbose=1,
    n_jobs=-1,
    reg_alpha=0.1,
    reg_lambda=10,
    random_state=42,
)

num_round = 100

# Stop once the validation metric has not improved for 10 consecutive rounds.
callbacks = [lgb.early_stopping(stopping_rounds=10, verbose=True)]

bst = lgb.train(
    params=params,
    train_set=train_data,
    num_boost_round=num_round,
    valid_sets=[valid_data],
    callbacks=callbacks,
)

def convert_strings(df: pd.DataFrame) -> pd.DataFrame:
    """Convert every text column of *df* to an ordered categorical, in place.

    A column is treated as text when its dtype name is ``object``, ``string``
    or ``str``. Its categories are the distinct values found in the column
    plus an ``"Unknown"`` label, which is appended only if the data does not
    already contain it.

    Args:
        df: Frame to convert. It is modified in place.

    Returns:
        The same ``df`` object, for call chaining.
    """
    for col in df.columns:
        # 'str' is the name newer pandas versions report for the default string dtype.
        if df[col].dtype.name not in ('object', 'string', 'str'):
            continue
        as_category = df[col].astype("string").astype('category')
        categories = as_category.cat.categories.to_list()
        # CategoricalDtype rejects duplicate categories, so only add the
        # placeholder when the column does not already contain that value.
        if "Unknown" not in categories:
            categories.append("Unknown")
        df[col] = as_category.astype(CategoricalDtype(categories=categories, ordered=True))
    return df

# Score the submission split with the trained booster and write the CSV.
X_submission = Internal_Final_test.to_pandas()
X_submission = convert_strings(X_submission)

categorical_cols = X_submission.select_dtypes(include=['category']).columns

# Give every categorical column the exact dtype it had at training time.
# Reusing the training dtype keeps the category order; rebuilding the dtype
# from a set would make that order arbitrary.
for col in categorical_cols:
    train_dtype = X_train[col].dtype if col in X_train.columns else None
    if not isinstance(train_dtype, CategoricalDtype):
        # No training-time categorical to align with (column was dropped or
        # is not categorical in X_train); leave the column untouched.
        continue
    values = X_submission[col].astype(object)
    if "Unknown" in train_dtype.categories:
        # Route values never seen in training to the placeholder the model knows.
        unseen = values.notna() & ~values.isin(train_dtype.categories)
        values = values.mask(unseen, "Unknown")
    # Anything still outside the training categories becomes NaN in this cast.
    X_submission[col] = values.astype(train_dtype)

# The booster reads columns by position, so pass exactly the training features
# in training order. X_submission itself still carries case_id, which was
# never a feature. With the layout aligned, the shape check stays enabled.
feature_cols = X_train.columns
y_submission_pred = bst.predict(X_submission[feature_cols], num_iteration=bst.best_iteration)

submission = pd.DataFrame({
    "case_id": X_submission["case_id"].to_numpy(),
    "score": y_submission_pred
}).set_index('case_id')
submission.to_csv("./submission.csv")