In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import polars as pl
from argparse import Namespace
from pathlib import Path
import json
import sys
sys.path.append('/kaggle/input/mycredit/')

from dataset.feature.preprocessor import Preprocessor
from dataset.feature.feature_loader import FeatureLoader
from dataset.const import TOPICS
from dataset.datainfo import RawInfo, RawReader
from dataset.feature.util import optimize_dataframe
import lightgbm as lgb


In [2]:
def read_json(path: 'str | os.PathLike[str]'):
    """Parse the JSON document stored at ``path`` and return the decoded object.

    Args:
        path: Location of the JSON file; anything ``open`` accepts, e.g. a
            plain string or a ``pathlib.Path``.

    Returns:
        Whatever ``json.load`` produces for the file content.
    """
    # JSON text is UTF-8 by specification, so do not depend on the platform's
    # default text encoding when decoding the file.
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)

# Load the trained model and the metadata stored next to it.
MODEL_NAME = 'small_feature'
MODEL_PATH = Path(f'/kaggle/input/mycredit/data/model/{MODEL_NAME}')
# The Booster is built straight from the model file, so no LGBMClassifier has to
# be instantiated first. str() keeps the call working on LightGBM versions whose
# `model_file` argument does not accept pathlib.Path.
# NOTE(review): the file is named '.pkl' but is handed to Booster(model_file=...)
# rather than unpickled -- confirm how it was saved.
model = lgb.Booster(model_file=str(MODEL_PATH / 'model.pkl'))
# Later cells read the keys 'features' and 'cat_indicis' from this dict.
artifacts = read_json(MODEL_PATH / 'artifacts.json')

In [3]:
import gc
import shutil
import polars as pl
import os

from dataset.datainfo import RawInfo, RawReader, DATA_PATH
from dataset.feature.feature import *
from dataset.feature.util import optimize_dataframe
from dataset.const import TOPICS, DEPTH_2_TO_1_QUERY, CB_A_PREPREP_QUERY


class Preprocessor:
    """Convert the raw tables of one dataset split into saved "prep" tables.

    Topics of depth 0/1 without an aggregation query are only passed through
    ``optimize_dataframe``. Depth-2 topics are aggregated with the SQL found in
    ``DEPTH_2_TO_1_QUERY``, joined onto their depth-1 table and saved as depth 1.
    """

    def __init__(self, type_: str, conf: dict = None):
        """
        Args:
            type_: Dataset split forwarded to every read/save call
                (the notebook uses 'train' and 'test').
            conf: Configuration object handed to ``RawInfo`` unchanged.
        """
        self.raw_info = RawInfo(conf)
        self.type_ = type_

    def preprocess(self):
        """Run the matching preprocessing step for every entry of ``TOPICS``.

        Raises:
            ValueError: If a depth-2 topic has no entry in ``DEPTH_2_TO_1_QUERY``.
        """
        for topic in TOPICS:
            gc.collect()
            has_query = topic.name in DEPTH_2_TO_1_QUERY
            if topic.depth <= 1:
                if has_query:
                    # The depth-1 table of such a topic is written by the
                    # depth-2 branch below, so nothing is done for it here.
                    continue
                print(f'[+] Memory optimization {topic.name}')
                self._memory_opt(topic.name, depth=topic.depth)
            elif topic.depth == 2:
                if not has_query:
                    raise ValueError(f'No query for {topic.name} in DEPTH_2_TO_1_QUERY but it is depth=2 topic')
                print(f'[+] Preprocessing {topic.name}, depth={topic.depth}')
                query = DEPTH_2_TO_1_QUERY[topic.name]
                if topic.name == 'credit_bureau_a':
                    # Handled chunk by chunk via temporary parquet files.
                    self._preprocess_cb_a(topic.name, query)
                else:
                    self._preprocess_each(topic.name, query)

    def _memory_opt(self, topic: str, depth: int):
        """Read one raw table, pass it through ``optimize_dataframe`` and save it as prep."""
        data = self.raw_info.read_raw(topic, depth=depth, reader=RawReader('polars'), type_=self.type_)
        data = optimize_dataframe(data)
        self.raw_info.save_as_prep(data, topic, depth=depth, type_=self.type_)

    def _join_depth2_0(self, depth1, depth2):
        """Left-join the ``num_group2 == 0`` rows of ``depth2`` onto ``depth1``.

        The join keys are ``case_id`` and ``num_group1``; ``num_group2`` is
        dropped from the depth-2 rows before joining.
        """
        depth2 = depth2.filter(pl.col('num_group2') == 0).drop('num_group2')
        depth1 = depth1.join(depth2, on=['case_id', 'num_group1'], how='left')
        return depth1

    def _preprocess_each(self, topic: str, query: str):
        """Aggregate a depth-2 table with ``query`` and save the enriched depth-1 table.

        Args:
            topic: Topic name to read at depth 1 and depth 2.
            query: SQL executed against the depth-2 frame, registered as ``data``.
        """
        depth2 = self.raw_info.read_raw(topic, depth=2, reader=RawReader('polars'), type_=self.type_)
        depth1 = self.raw_info.read_raw(topic, depth=1, reader=RawReader('polars'), type_=self.type_)
        temp = pl.SQLContext(data=depth2).execute(query, eager=True)
        depth1 = depth1.join(temp, on=['case_id', 'num_group1'], how='left')
        depth1 = self._join_depth2_0(depth1, depth2)

        depth1 = optimize_dataframe(depth1)
        self.raw_info.save_as_prep(depth1, topic, depth=1, type_=self.type_)

    def _join_temp_parquets(self, depth1, directory):
        """Left-join all ``.parquet`` chunks stored in ``directory`` onto ``depth1``."""
        # Sorted only to make the concatenation order deterministic.
        files = sorted(f for f in os.listdir(directory) if f.endswith('.parquet'))
        chunks = [pl.read_parquet(directory / file) for file in files]
        joined = depth1.join(
            pl.concat(chunks, how='vertical_relaxed'),
            on=['case_id', 'num_group1'],
            how='left',
        )
        del chunks
        gc.collect()
        return joined

    def _preprocess_cb_a(self, topic: str, query: str):
        """Chunked variant of ``_preprocess_each`` that spills intermediates to disk.

        Every chunk yielded by ``read_raw_iter`` is written twice as a temporary
        parquet file: its ``num_group2 == 0`` rows, and its aggregation
        (``CB_A_PREPREP_QUERY`` followed by ``query``). Both sets of files are
        then joined onto the depth-1 table, which is saved as the prep table.

        The temporary directories are emptied before use, so chunks left behind
        by an aborted run cannot be concatenated into this one, and they are
        removed again even if processing fails.

        Args:
            topic: Topic name to read at depth 1 and depth 2.
            query: Aggregation SQL applied after ``CB_A_PREPREP_QUERY``.
        """
        temp_path = DATA_PATH / 'parquet_preps' / self.type_
        agg_dir = temp_path / 'agg'
        depth2_0_dir = temp_path / 'depth2_0'
        os.makedirs(temp_path, exist_ok=True)
        for directory in (agg_dir, depth2_0_dir):
            shutil.rmtree(directory, ignore_errors=True)
            os.makedirs(directory, exist_ok=True)

        try:
            chunk_iter = self.raw_info.read_raw_iter(topic, depth=2, reader=RawReader('polars'), type_=self.type_)
            for i, depth2 in enumerate(chunk_iter):
                depth2 = optimize_dataframe(depth2)

                depth2_0 = depth2.filter(pl.col('num_group2') == 0).drop('num_group2')
                depth2_0.write_parquet(depth2_0_dir / f"{self.type_}_{topic}_1_temp_{i}.parquet")
                del depth2_0

                depth2 = pl.SQLContext(data=depth2).execute(
                    CB_A_PREPREP_QUERY,
                    eager=True,
                )
                depth2 = optimize_dataframe(depth2)
                depth2 = pl.SQLContext(data=depth2).execute(query, eager=True)
                depth2 = optimize_dataframe(depth2)
                depth2.write_parquet(agg_dir / f"{self.type_}_{topic}_1_temp_{i}.parquet")
                del depth2
                gc.collect()

            depth1 = self.raw_info.read_raw(topic, depth=1, reader=RawReader('polars'), type_=self.type_)
            depth1 = optimize_dataframe(depth1)

            depth1 = self._join_temp_parquets(depth1, agg_dir)
            depth1 = self._join_temp_parquets(depth1, depth2_0_dir)

            self.raw_info.save_as_prep(depth1, topic, depth=1, type_=self.type_)
        finally:
            # remove temp files, also when a step above raised
            shutil.rmtree(agg_dir, ignore_errors=True)
            shutil.rmtree(depth2_0_dir, ignore_errors=True)


In [4]:
%%time
# Dataset split to process; the following cells branch on this value.
type_ = 'train'
# Configuration object that Preprocessor forwards to RawInfo.
conf = Namespace(
    data_path='/kaggle/input/home-credit-credit-risk-model-stability',
    raw_format='parquet',
)
# Build the prep tables for the chosen split (the recorded run below took
# roughly 16 minutes of wall time).
prep = Preprocessor(type_, conf=conf)
prep.preprocess()

[+] Preprocessing applprev, depth=2
[+] Preprocessing credit_bureau_a, depth=2
[+] Preprocessing credit_bureau_b, depth=2
[+] Preprocessing person, depth=2
[+] Memory optimization debitcard
[+] Memory optimization deposit
[+] Memory optimization other
[+] Memory optimization tax_registry_a
[+] Memory optimization tax_registry_b
[+] Memory optimization tax_registry_c
[+] Memory optimization static
[+] Memory optimization static_cb
CPU times: user 12min 30s, sys: 3min 37s, total: 16min 7s
Wall time: 16min 22s


In [5]:
if type_ == 'test':
    # Rewrite the prepared tax_registry_c table in place with 'pmtamount_36A'
    # cast to Int16.
    # NOTE(review): presumably this aligns the test dtype with the one seen in
    # training -- confirm, and confirm the values fit into Int16.
    tax_c_file = f'/kaggle/working/data/home-credit-credit-risk-model-stability/parquet_preps/{type_}/{type_}_tax_registry_c_1.parquet'
    (
        pl.read_parquet(tax_c_file)
        .with_columns(pl.col('pmtamount_36A').cast(pl.Int16))
        .write_parquet(tax_c_file)
    )
    gc.collect()

In [6]:
import gc
import json
import os
import time
import polars as pl
from tqdm import tqdm
from dataset.feature.feature import *
from dataset.feature.feature_definer import FEATURE_DEF_PATH
from dataset.feature.feature import *
from dataset.feature.util import optimize_dataframe

from dataset.datainfo import RawInfo, RawReader, DATA_PATH
from dataset.const import TOPICS, Topic, KEY_COL, DATE_COL, TARGET_COL

from pathlib import Path
FEATURE_DEF_PATH = Path('/kaggle/input/mycredit/data/feature_definition_new')

class FeatureLoader:
    """Load one topic's prep table and evaluate SQL feature definitions on it."""

    def __init__(self, topic: Topic, type: str, conf: dict = None):
        """
        Args:
            topic: Topic whose ``name`` and ``depth`` select the table to read.
            type: Dataset split; for 'train' the target column is carried along.
            conf: Configuration object handed to ``RawInfo`` unchanged.
        """
        self.topic = topic
        self.type = type
        self.data = self._load_data(type_=type, stage='prep', rawinfo=RawInfo(conf))

    def _load_data(
        self,
        rawinfo,
        type_='train',
        stage='prep',
        reader=RawReader('polars'),
    ) -> pl.DataFrame:
        """Read the topic table and inner-join the key/date (and target) columns of ``base``.

        Args:
            rawinfo: Object providing ``read_raw``.
            type_: Dataset split; 'train' additionally selects ``TARGET_COL``.
            stage: Stage passed to ``read_raw`` for the topic table.
            reader: Reader passed to both ``read_raw`` calls.

        Returns:
            The topic table joined with the selected ``base`` columns on ``KEY_COL``.
        """
        base_columns = [*KEY_COL, *DATE_COL]
        if type_ == 'train':
            base_columns += TARGET_COL
        data = rawinfo.read_raw(
            self.topic.name,
            depth=self.topic.depth,
            reader=reader,
            type_=type_,
            stage=stage,
        )
        base = rawinfo.read_raw('base', reader=reader, type_=type_)
        # Cast the key so that it matches the key dtype of the prep table
        # (assumed to be Int32 after optimization -- TODO confirm).
        base = base.with_columns(pl.col(KEY_COL).cast(pl.Int32))
        return data.join(base.select(base_columns), on=KEY_COL, how='inner')

    def load_features(self, feature_names: List[str] = None) -> List[Feature]:
        """Load the feature definitions of this topic from its JSON file.

        Args:
            feature_names: Names to keep. ``None`` returns every definition;
                names that are not defined for this topic are ignored.

        Returns:
            ``Feature`` objects in the order of the definition file.

        Raises:
            FileNotFoundError: If the topic has no definition file.
        """
        definition_file = FEATURE_DEF_PATH / f'{self.topic.name}.json'
        if not os.path.exists(definition_file):
            raise FileNotFoundError(
                f'Feature definition for {self.topic.name} not found.'
            )

        with open(definition_file) as f:
            features = json.load(f)

        if feature_names is None:
            return [Feature.from_dict(feature) for feature in features.values()]
        # A set gives constant-time membership tests; the caller passes the
        # model's full feature list, which would otherwise be scanned once per
        # definition.
        wanted = set(feature_names)
        return [
            Feature.from_dict(definition)
            for name, definition in features.items()
            if name in wanted
        ]

    def load_feature_data(self, features, verbose=False) -> pl.DataFrame:
        """Evaluate ``features`` with one SQL query grouped by case.

        Args:
            features: Feature objects providing ``query``, ``agg.data_type``
                and ``name``. Their text is interpolated into the SQL, so they
                must come from trusted definition files.
            verbose: Print every generated select expression.

        Returns:
            One row per ``case_id`` (and ``target`` for the train split) with
            one column per feature, passed through ``optimize_dataframe``.
        """
        query = [
            f'cast({feat.query} as {feat.agg.data_type}) as {feat.name}'
            for feat in features
        ]
        if verbose:
            for q in query:
                print(f'[*] Query: {q}')
        group_cols = ['frame.case_id']
        if self.type == 'train':
            group_cols.append('frame.target')
        # Joining one list keeps the statement valid when `features` is empty;
        # a fixed ", " before the feature list would leave a dangling comma.
        select_list = ', '.join([*group_cols, *query])
        group_list = ', '.join(group_cols)
        temp = pl.SQLContext(frame=self.data).execute(
            f"""SELECT {select_list}
            from frame
            group by {group_list}
            """,
            eager=True,
        )
        temp = optimize_dataframe(temp)
        return temp

    def load_feature_data_batch(self, features, batch_size, verbose=False, skip=0):
        """Yield feature data for consecutive slices of ``features``.

        Args:
            features: Feature objects to evaluate.
            batch_size: Number of features evaluated per query.
            verbose: Forwarded to ``load_feature_data``.
            skip: Number of leading batches that are not computed; ``None`` is
                yielded in their place, so consumers must handle ``None``
                whenever ``skip`` is non-zero.

        Yields:
            The frame returned by ``load_feature_data`` for each batch, or
            ``None`` for skipped batches. The elapsed time is printed once the
            generator is exhausted.
        """
        start_time = time.time()
        for i, index in enumerate(tqdm(range(0, len(features), batch_size))):
            if i < skip:
                yield None
            else:
                yield self.load_feature_data(
                    features[index : index + batch_size], verbose=verbose
                )
        print(f'[*] Elapsed time: {time.time() - start_time:.4f} sec')


In [None]:
%%time
raw_info = RawInfo(conf)
base = raw_info.read_raw('base', reader=RawReader('polars'), type_=type_)
# Start from one key column plus the decision date; everything else is joined on.
base = base.select([pl.col('case_id').cast(pl.Int32), 'date_decision'])

# Depth-0 tables are joined whole, after memory optimization.
depth0_topics = [topic for topic in TOPICS if topic.depth == 0]
for topic in depth0_topics:
    print(f'[*] Processing {topic.name}...')
    data = raw_info.read_raw(topic.name, reader=RawReader('polars'), type_=type_)
    data = optimize_dataframe(data)
    base = base.join(data, on='case_id', how='left')
    del data

# Number of features evaluated per SQL query in load_feature_data_batch.
FEATURE_BATCH_SIZE = 33
# Columns containing this keyword get the topic name appended before joining.
# NOTE(review): presumably because the same name can occur in several topics -- confirm.
dup_keyword = '_if_1_eq_1_then_num_group1_'
# Only the features the model was trained on are computed.
selected = artifacts['features']

depth1_topics = [topic for topic in TOPICS if topic.depth == 1]
for topic in depth1_topics:
    print(f'[*] Processing {topic.name}...')
    fl = FeatureLoader(topic, type=type_, conf=conf)
    features = fl.load_features(selected)
    for data in fl.load_feature_data_batch(features, FEATURE_BATCH_SIZE):
        dupable_col = [c for c in data.columns if dup_keyword in c]
        print(dupable_col)
        data = data.rename({col: f'{col}_{topic.name}' for col in dupable_col})
        print('shape:', data.shape)
        data = optimize_dataframe(data)
        # Every train batch carries 'target'. Keep the copy from the first batch
        # and drop later ones before the join, rather than dropping
        # 'target_right' afterwards: that column is absent on the first batch
        # and on the test split, and recent polars releases raise on dropping a
        # missing column.
        if 'target' in data.columns and 'target' in base.columns:
            data = data.drop('target')
        base = base.join(data, on='case_id', how='left')
        del data
    del fl
    gc.collect()

[*] Processing static...
[*] Processing static_cb...
[*] Processing applprev...


  0%|          | 0/2 [00:00<?, ?it/s]

[]
shape: (1221522, 35)


 50%|█████     | 1/2 [00:10<00:10, 10.12s/it]

[]
shape: (1221522, 26)


100%|██████████| 2/2 [00:21<00:00, 10.93s/it]


[*] Elapsed time: 21.8682 sec
[*] Processing credit_bureau_a...


In [None]:
# Date-like columns: names ending in 'D', or starting with 'max__if' and ending in 'd__'.
date_cols = [
    name
    for name in base.columns
    if name.endswith('D') or (name.startswith('max__if') and name.endswith('d__'))
]
# Replace every date column by (date_decision - date); null differences become 0.
# Dividing by 86400000 (milliseconds per day) assumes the Date difference casts
# to a millisecond count -- TODO confirm for the polars version in use.
decision_date = pl.col('date_decision').cast(pl.Date)
base = base.with_columns(
    [
        ((decision_date - pl.col(name).cast(pl.Date)).fill_null(0).cast(pl.Int64) / 86400000).alias(name)
        for name in date_cols
    ]
)

In [None]:
# Keep the ids for the submission before the frame is reduced to model inputs.
submission_df = base.select('case_id').to_pandas()
# NOTE: `base` is rebound from a polars frame to a pandas frame here, so this
# cell cannot be re-run without first re-running the cells that build `base`.
base = base.drop(['case_id']).to_pandas()
# Select and order the columns as listed in the model artifacts.
base = base[artifacts['features']]
# Columns whose position is listed under 'cat_indicis' become pandas categoricals.
cat_positions = set(artifacts['cat_indicis'])
categorical_cols = [col for pos, col in enumerate(base.columns) if pos in cat_positions]
base = base.astype({col: 'category' for col in categorical_cols})
# Any column that is still of object dtype is cast to float.
object_cols = base.dtypes[base.dtypes == 'O'].index
base = base.astype({col: 'float' for col in object_cols})
submission_df["score"] = model.predict(base)
submission_df.to_csv("/kaggle/working/submission.csv", index=False)

In [None]:
# Display the submission frame (case_id and score) as a final visual check.
submission_df

EOD