In [22]:
from functools import partial
import scipy.sparse as sparse
import multiprocessing as mp
import pandas as pd
import numpy as np
import tqdm
import copy
import gc
import os
import sys
sys.path.append('../../../code/utils')
sys.path.append('../../../code/pipeline')
sys.path.append('../../../code')
import data_utils as du
import perf_utils as pu
import data_jointer as dj
import config

In [23]:
clickhist_folder = os.path.join(config.DATA_DIR, "click_history/simple_cross/byUserFeatureName")
clickrate_folder = os.path.join(config.DATA_DIR, "clickrate_bs/simple_cross/byUserFeatureName")


def click_history_fold_dir(mode, num_folds):
    folder = "{}[{}_{}]".format(clickhist_folder, mode, num_folds)
    return folder


def click_rate_fold_dir(mode, num_folds):
    folder = "{}[{}_{}]".format(clickrate_folder, mode, num_folds)
    return folder


def click_rate_path(mode, num_folds, fold_index, ad_feat_name, user_feat_name):
    folder = click_rate_fold_dir(mode, num_folds)
    folder = os.path.join(folder, str(fold_index),  "[featureName='{}']".format(user_feat_name))
    clickrate_file = "[adFeatureName='{}'].csv".format(ad_feat_name)
    clickrate_filepath = os.path.join(folder, clickrate_file)
    return clickrate_filepath


def load_split_indices(mode, num_folds):
    fold_dir = click_history_fold_dir(mode, num_folds=n_splits)
    index_file = "indices.pkl"
    index_path = os.path.join(fold_dir, index_file)
    split_indices = du.load_pickle(index_path)
    return split_indices


def load_clickrate(mode, num_folds, fold_index, ad_feat_name, user_feat_name):
    in_path = click_rate_path(mode, num_folds, fold_index, ad_feat_name, user_feat_name)
    df_clickrate = pd.read_csv(in_path)
    return df_clickrate


def batch_load_clickrate(num_folds, ad_feat_name, user_feat_name):
    quick_load = partial(load_clickrate, mode="StratifiedKFold", num_folds=num_folds, 
                         ad_feat_name=ad_feat_name, user_feat_name=user_feat_name)
    df_clickrate = None
    
    for i in range(n_splits):
        df_new = quick_load(fold_index=i)
        df_new["fold"] = i
        if df_clickrate is None:
            df_clickrate = df_new
        else:
            df_clickrate = pd.concat([df_clickrate, df_new], ignore_index=True)
        del df_new
        gc.collect()
        
    df_clickrate["fold"] = df_clickrate["fold"].astype(int)
    return df_clickrate


def insert_valid_fold_index(df, fold_indices):
    df = df.copy()
    df["fold"] = -1
    for i, (train_index, valid_index) in enumerate(fold_indices):
        df.loc[valid_index, "fold"] = i
    return df

In [24]:
pairs = [('productType', 'kw1'),  # 'kw1' looks very overfitting prone, to be decide whether to keep it
         ('productType', 'kw2'),
         ('productType', 'kw3'),
         ('productType', 'topic1'),
         ('aid', 'topic2'),
         ('productType', 'topic2'),
         ('advertiserId', 'interest1'),
         ('aid', 'interest2'),
         ('creativeSize', 'interest2'), 
         ('campaignId', 'interest4'),  # whether to keep it? 
         ('aid', 'interest5'),  
         ('aid', 'ct'),
         ('aid', 'os')]

In [25]:
# define jointer
ad_jointer = dj.PandasPandasJointer("aid")
user_jointer = dj.PandasPandasJointer("uid")

# load DataFrame
df_train = du.load_raw_data("train")
df_ad = du.load_raw_data("ad")

# load train/valid split indices
mode = "StratifiedKFold"
n_splits = 5
split_indices = load_split_indices(mode, n_splits)

# insert fold info
df_train = insert_valid_fold_index(df_train, split_indices)

# join ad features
df_train = ad_jointer.join(df_train, df_ad)

In [26]:
for ad_feat_name, user_feat_name in pairs:
    print("Start Processing '{}' x '{}'".format(ad_feat_name, user_feat_name))
    
    # load clickrates
    df_clickrate = batch_load_clickrate(n_splits, ad_feat_name, user_feat_name)

    # reset type to save memory
    df_clickrate = df_clickrate.astype({
        "bs_clickrate": np.float32,
        "impression": np.int32,
        "user_val": str
    })

    # init clickrates dict
    with pu.profiler("constructing clickrate dictionary"):
        ckr_dict = {(x["fold"], x["ad_val"], x["user_val"]): (x["bs_clickrate"], x["impression"]) 
                    for i, x in df_clickrate.iterrows()}

    # load user feature
    df_user = du.load_user_feature(user_feat_name)

    # join user feature
    train = df_train.copy()
    train = user_jointer.join(train, df_user)

    # split list feature
    with pu.profiler("splitting values"):
        train["val_list"] = train[user_feat_name].fillna("[nan]").apply(lambda x: x.split())

    # join clickrates
    with pu.profiler("joining click info"):
        ckrs = []
        for (fold, aval, uvals) in tqdm.tqdm(zip(train["fold"], train[ad_feat_name], train["val_list"]), total=df_train.shape[0]):
            row_ckrs = [ckr_dict[(fold, aval, uval)] for uval in uvals]
            ckrs.append(row_ckrs)

    # reset DataFrame
    with pu.profiler("reseting DataFrame"):
        train = train[["aid", "uid", "fold"]]
        train.loc[:, "click_list"] = ckrs
        gc.collect()

    # save intermediate result
    with pu.profiler("saving intermediate result"):
        agg_folder = os.path.join(config.DATA_DIR, "intermediate/clickrate_aggregate")
        agg_file = "train.[adFeatureName='{}'][userFeatureName='{}'].pkl".format(ad_feat_name, user_feat_name)
        agg_path = os.path.join(agg_folder, agg_file)
        os.makedirs(agg_folder, exist_ok=True)
        du.save_pickle(train, agg_path)
        del train
        gc.collect()

Start Processing 'productType' x 'kw1'


  exec(code_obj, self.user_global_ns, self.user_ns)


[15:11:06] Finish constructing clickrate dictionary. △M: +883.49MB. △T: 9.1 minutes.
[15:12:21] Finish splitting values. △M: +3.15GB. △T: 42.3 seconds.


100%|██████████| 8798814/8798814 [01:04<00:00, 135503.27it/s]


[15:13:27] Finish joining click info. △M: +1.01GB. △T: 1.1 minutes.
[15:14:00] Finish reseting DataFrame. △M: -3.11GB. △T: 32.6 seconds.
[15:14:27] Finish saving intermediate result. △M: -268.52MB. △T: 26.5 seconds.
Start Processing 'productType' x 'kw2'
[15:16:29] Finish constructing clickrate dictionary. △M: -447.63MB. △T: 1.8 minutes.
[15:17:27] Finish splitting values. △M: +2.96GB. △T: 34.2 seconds.


100%|██████████| 8798814/8798814 [00:35<00:00, 245543.73it/s]


[15:18:05] Finish joining click info. △M: -124.96MB. △T: 38.1 seconds.
[15:18:30] Finish reseting DataFrame. △M: -3.14GB. △T: 25.2 seconds.
[15:18:48] Finish saving intermediate result. △M: -201.39MB. △T: 17.6 seconds.
Start Processing 'productType' x 'kw3'
[15:19:19] Finish constructing clickrate dictionary. △M: +0B. △T: 23.1 seconds.
[15:19:41] Finish splitting values. △M: +352.0KB. △T: 11.4 seconds.


100%|██████████| 8798814/8798814 [00:13<00:00, 629373.63it/s]


[15:19:56] Finish joining click info. △M: -130.31MB. △T: 15.3 seconds.
[15:20:07] Finish reseting DataFrame. △M: -536.76MB. △T: 11.1 seconds.
[15:20:16] Finish saving intermediate result. △M: -201.26MB. △T: 8.5 seconds.
Start Processing 'productType' x 'topic1'
[15:20:40] Finish constructing clickrate dictionary. △M: +48.0KB. △T: 18.7 seconds.
[15:21:22] Finish splitting values. △M: +2.42GB. △T: 24.3 seconds.


100%|██████████| 8798814/8798814 [00:38<00:00, 228853.25it/s]


[15:22:01] Finish joining click info. △M: +18.26MB. △T: 39.2 seconds.
[15:22:28] Finish reseting DataFrame. △M: -2.94GB. △T: 26.7 seconds.
[15:22:49] Finish saving intermediate result. △M: -201.39MB. △T: 21.0 seconds.
Start Processing 'aid' x 'topic2'
[15:37:51] Finish constructing clickrate dictionary. △M: +1.7GB. △T: 14.7 minutes.
[15:38:43] Finish splitting values. △M: +2.35GB. △T: 32.0 seconds.


100%|██████████| 8798814/8798814 [00:59<00:00, 147825.42it/s]


[15:39:44] Finish joining click info. △M: +275.92MB. △T: 1.0 minutes.
[15:40:25] Finish reseting DataFrame. △M: -2.94GB. △T: 41.1 seconds.
[15:40:57] Finish saving intermediate result. △M: -201.39MB. △T: 32.1 seconds.
Start Processing 'productType' x 'topic2'
[15:41:38] Finish constructing clickrate dictionary. △M: -735.92MB. △T: 25.1 seconds.
[15:42:26] Finish splitting values. △M: +1.99GB. △T: 32.0 seconds.


100%|██████████| 8798814/8798814 [00:39<00:00, 221534.29it/s]


[15:43:09] Finish joining click info. △M: -459.51MB. △T: 42.6 seconds.
[15:43:38] Finish reseting DataFrame. △M: -2.52GB. △T: 28.9 seconds.
[15:43:58] Finish saving intermediate result. △M: -201.39MB. △T: 20.4 seconds.
Start Processing 'advertiserId' x 'interest1'
[15:44:13] Finish constructing clickrate dictionary. △M: -9.18MB. △T: 4.7 seconds.
[15:45:30] Finish splitting values. △M: +5.47GB. △T: 58.7 seconds.


100%|██████████| 8798814/8798814 [01:06<00:00, 132019.72it/s]


[15:46:38] Finish joining click info. △M: +855.84MB. △T: 1.1 minutes.
[15:47:28] Finish reseting DataFrame. △M: -5.89GB. △T: 49.4 seconds.
[15:47:59] Finish saving intermediate result. △M: -201.39MB. △T: 30.8 seconds.
Start Processing 'aid' x 'interest2'
[15:48:17] Finish constructing clickrate dictionary. △M: +0B. △T: 6.5 seconds.
[15:48:49] Finish splitting values. △M: +776.12MB. △T: 18.4 seconds.


100%|██████████| 8798814/8798814 [00:34<00:00, 257768.03it/s]


[15:49:25] Finish joining click info. △M: +163.63MB. △T: 35.7 seconds.
[15:49:48] Finish reseting DataFrame. △M: -1.7GB. △T: 23.2 seconds.
[15:50:08] Finish saving intermediate result. △M: +0B. △T: 19.7 seconds.
Start Processing 'creativeSize' x 'interest2'
[15:50:18] Finish constructing clickrate dictionary. △M: -1.0MB. △T: 0.6 seconds.
[15:50:50] Finish splitting values. △M: +800.1MB. △T: 18.2 seconds.


100%|██████████| 8798814/8798814 [00:25<00:00, 344376.07it/s]


[15:51:17] Finish joining click info. △M: -6.79MB. △T: 26.9 seconds.
[15:51:40] Finish reseting DataFrame. △M: -1.5GB. △T: 22.7 seconds.
[15:51:53] Finish saving intermediate result. △M: +0B. △T: 13.9 seconds.
Start Processing 'campaignId' x 'interest4'
[15:52:03] Finish constructing clickrate dictionary. △M: -512.0KB. △T: 0.7 seconds.
[15:52:23] Finish splitting values. △M: -1.0MB. △T: 10.9 seconds.


100%|██████████| 8798814/8798814 [00:16<00:00, 546936.93it/s]


[15:52:40] Finish joining click info. △M: -257.5MB. △T: 17.2 seconds.
[15:52:51] Finish reseting DataFrame. △M: -738.43MB. △T: 10.9 seconds.
[15:53:00] Finish saving intermediate result. △M: -256.0KB. △T: 9.1 seconds.
Start Processing 'aid' x 'interest5'
[15:53:18] Finish constructing clickrate dictionary. △M: +0B. △T: 11.6 seconds.
[15:54:52] Finish splitting values. △M: +6.44GB. △T: 1.2 minutes.


100%|██████████| 8798814/8798814 [01:30<00:00, 97504.92it/s] 


[15:56:23] Finish joining click info. △M: +727.94MB. △T: 1.5 minutes.
[15:57:25] Finish reseting DataFrame. △M: -6.96GB. △T: 1.0 minutes.
[15:58:10] Finish saving intermediate result. △M: -201.39MB. △T: 45.0 seconds.
Start Processing 'aid' x 'ct'
[15:58:25] Finish constructing clickrate dictionary. △M: +0B. △T: 0.4 seconds.
[15:58:49] Finish splitting values. △M: -1.0MB. △T: 14.5 seconds.


100%|██████████| 8798814/8798814 [00:16<00:00, 544997.39it/s]


[15:59:07] Finish joining click info. △M: -300.63MB. △T: 17.8 seconds.
[15:59:19] Finish reseting DataFrame. △M: -738.43MB. △T: 12.0 seconds.
[15:59:28] Finish saving intermediate result. △M: +0B. △T: 9.0 seconds.
Start Processing 'aid' x 'os'
[15:59:34] Finish constructing clickrate dictionary. △M: -6.25MB. △T: 0.2 seconds.
[15:59:52] Finish splitting values. △M: -26.75MB. △T: 9.0 seconds.


100%|██████████| 8798814/8798814 [00:13<00:00, 631965.90it/s]


[16:00:06] Finish joining click info. △M: -2.25MB. △T: 14.8 seconds.
[16:00:16] Finish reseting DataFrame. △M: -738.43MB. △T: 9.7 seconds.
[16:00:24] Finish saving intermediate result. △M: +0B. △T: 8.0 seconds.


In [27]:
# load DataFrame
df_test = du.load_raw_data("test2")

# join ad features
df_test = ad_jointer.join(df_test, df_ad)

for ad_feat_name, user_feat_name in pairs:
    print("Start Processing '{}' x '{}'".format(ad_feat_name, user_feat_name))
    
    # load clickrates
    df_clickrate = batch_load_clickrate(n_splits, ad_feat_name, user_feat_name)

    # reset type to save memory
    df_clickrate = df_clickrate.astype({
        "bs_clickrate": np.float32,
        "impression": np.float32,
        "user_val": str
    })

    # init clickrates dict
    with pu.profiler("constructing clickrate dictionary"):
        raw_ckr_dict = {}
        raw_imp_dict = {}
        
        for i, x in df_clickrate.iterrows():
            tup = (x["ad_val"], x["user_val"])
            if tup not in raw_ckr_dict:
                raw_ckr_dict[tup] = x["bs_clickrate"] / n_splits
                raw_imp_dict[tup] = x["impression"] / n_splits
            else:
                raw_ckr_dict[tup] += x["bs_clickrate"] / n_splits
                raw_imp_dict[tup] += x["impression"] / n_splits
                
        ckr_dict = {tup: (raw_ckr_dict[tup], raw_imp_dict[tup]) for tup in raw_ckr_dict.keys()}
        assert len(ckr_dict) == df_clickrate.shape[0] / n_splits

    # load user feature
    df_user = du.load_user_feature(user_feat_name)

    # join user feature
    test = df_test.copy()
    test = user_jointer.join(test, df_user)

    # split list feature
    with pu.profiler("splitting values"):
        test["val_list"] = test[user_feat_name].fillna("[nan]").apply(lambda x: x.split())

    # join clickrates
    with pu.profiler("joining click info"):
        ckrs = []
        for (aval, uvals) in tqdm.tqdm(zip(test[ad_feat_name], test["val_list"]), total=df_test.shape[0], disable=True):
            row_ckrs = [ckr_dict[(aval, uval)] for uval in uvals]
            ckrs.append(row_ckrs)

    # reset DataFrame
    with pu.profiler("reseting DataFrame"):
        test = test[["aid", "uid"]]
        test.loc[:, "click_list"] = ckrs
        gc.collect()

    # save intermediate result
    with pu.profiler("saving intermediate result"):
        agg_folder = os.path.join(config.DATA_DIR, "intermediate/clickrate_aggregate")
        agg_file = "test2.[adFeatureName='{}'][userFeatureName='{}'].pkl".format(ad_feat_name, user_feat_name)
        agg_path = os.path.join(agg_folder, agg_file)
        os.makedirs(agg_folder, exist_ok=True)
        du.save_pickle(test, agg_path)
        del test
        gc.collect()

Start Processing 'productType' x 'kw1'


  exec(code_obj, self.user_global_ns, self.user_ns)


[16:08:05] Finish constructing clickrate dictionary. △M: +220.0KB. △T: 7.5 minutes.
[16:08:22] Finish splitting values. △M: +534.46MB. △T: 5.1 seconds.
[16:08:30] Finish joining click info. △M: +0B. △T: 8.4 seconds.
[16:08:38] Finish reseting DataFrame. △M: -534.25MB. △T: 7.8 seconds.
[16:08:44] Finish saving intermediate result. △M: +0B. △T: 5.4 seconds.
Start Processing 'productType' x 'kw2'
[16:10:17] Finish constructing clickrate dictionary. △M: -8.5MB. △T: 1.5 minutes.
[16:10:34] Finish splitting values. △M: +189.57MB. △T: 6.0 seconds.
[16:10:43] Finish joining click info. △M: +0B. △T: 8.7 seconds.
[16:10:49] Finish reseting DataFrame. △M: -197.75MB. △T: 6.3 seconds.
[16:10:54] Finish saving intermediate result. △M: +0B. △T: 4.3 seconds.
Start Processing 'productType' x 'kw3'
[16:11:18] Finish constructing clickrate dictionary. △M: -19.25MB. △T: 20.4 seconds.
[16:11:26] Finish splitting values. △M: -1.25MB. △T: 3.7 seconds.
[16:11:30] Finish joining click info. △M: -3.0MB. △T: 3.8

In [28]:
# =======================
# quantile of click rates
# =======================
# 5 features
def get_ckr_quantiles(lst):
    ckrs = [tup[0] for tup in lst]
    return {q: np.percentile(ckrs, q) for q in [100, 75, 50, 25, 0]}


def batch_get_ckr_quantiles(series):
    quantiles = []
    for i, x in series.iteritems():
        quantiles.append(get_ckr_quantiles(x))
    # print("[{}] batch done.".format(pu.get_time_str()))
    return quantiles


# ===========================
# mean and std of click rates
# ===========================
# 2 features
def get_ckr_stats(lst):
    ckrs = [tup[0] for tup in lst]
    return {"mean": np.mean(ckrs), "std": np.std(ckrs)}


def batch_get_ckr_stats(series):
    stats = []
    for i, x in series.iteritems():
        stats.append(get_ckr_stats(x))
    # print("[{}] batch done.".format(pu.get_time_str()))
    return stats


# ===========================
# mean and std of impressions
# ===========================
# 2 features
def get_imp_stats(lst):
    imps = [tup[1] for tup in lst]
    return {"mean": np.mean(imps), "std": np.std(imps)}


def batch_get_imp_stats(series):
    stats = []
    for i, x in series.iteritems():
        stats.append(get_imp_stats(x))
    # print("[{}] batch done.".format(pu.get_time_str()))
    return stats


# ====================================================
# impressions corresponding to min and max click rates
# ====================================================
# 2 features
def get_corr_stats(lst):
    tups = sorted(lst, key=lambda x: x[0])
    return {"min": tups[0][1], "max": tups[-1][1]}


def batch_get_corr_stats(series):
    stats = []
    for i, x in series.iteritems():
        stats.append(get_corr_stats(x))
    # print("[{}] batch done.".format(pu.get_time_str()))
    return stats


# ===============================
# weighted average of click rates
# ===============================
def get_ckr_weighted_avg(lst):
    ckrs = [tup[0] for tup in lst]
    logimps = [np.log1p(tup[1]) for tup in lst]
    try:
        weighted_avg = np.average(ckrs, weights=logimps)
    except ZeroDivisionError:
        weighted_avg = np.mean(ckrs)
    return {"avg": weighted_avg}


def batch_get_ckr_weighted_avg(series):
    stats = []
    for i, x in series.iteritems():
        stats.append(get_ckr_weighted_avg(x))
    # print("[{}] batch done.".format(pu.get_time_str()))
    return stats

In [None]:
def process_pair_train(ad_feat_name, user_feat_name, batch_size=100000, n_procs=4):
    print("Start processing '{}' x '{}'".format(ad_feat_name, user_feat_name))
    
    # ========================
    # load intermediate result
    # ========================
    agg_folder = os.path.join(config.DATA_DIR, "intermediate/clickrate_aggregate")
    agg_file = "train.[adFeatureName='{}'][userFeatureName='{}'].pkl".format(ad_feat_name, user_feat_name)
    agg_path = os.path.join(agg_folder, agg_file)
    df_agg = du.load_pickle(agg_path)

    # =============
    # split batches
    # =============
    with pu.profiler("splitting batches"):
        indices = [(offset, offset + batch_size) for offset in range(0, df_agg.shape[0], batch_size)]
        batches = [df_agg.iloc[tup[0]:tup[1]]["click_list"] for tup in indices]
    print("{} batches, each with {} rows".format(len(batches), batch_size))

    # ====================
    # click rate quantiles
    # ====================
    # process batches 
    with pu.profiler("extracting quantile features"):
        pool = mp.Pool(processes=n_procs)
        results = [pool.apply_async(batch_get_ckr_quantiles, (batch, )) for batch in batches]
        pool.close()
        pool.join()
        result_list = [result.get() for result in results]

        # merge batch results
        final_list = []
        for result in result_list:
            final_list += result
        df_ckr_quantile = pd.DataFrame.from_dict(final_list)

        # reset DataFrame
        colname_dict = {q: "bsClickrate@{}_x_{}_q{}".format(ad_feat_name, user_feat_name, q) for q in [100, 75, 50, 25, 0]}
        df_ckr_quantile = df_ckr_quantile.rename(columns=colname_dict)
        df_ckr_quantile = df_ckr_quantile.astype({colname: np.float32 for colname in colname_dict.values()})
        gc.collect()

    # =======================
    # click rate mean and std
    # =======================
    with pu.profiler("extracting statistical features"):
        pool = mp.Pool(processes=8)
        results = [pool.apply_async(batch_get_ckr_stats, (batch, )) for batch in batches]
        pool.close()
        pool.join()
        result_list = [result.get() for result in results]

        final_list = []
        for result in result_list:
            final_list += result
        df_ckr_stats = pd.DataFrame.from_dict(final_list)

        colname_dict = {s: "bsClickrate@{}_x_{}_{}".format(ad_feat_name, user_feat_name, s) for s in ["mean", "std"]}
        df_ckr_stats = df_ckr_stats.rename(columns=colname_dict)
        df_ckr_stats = df_ckr_stats.astype({colname: np.float32 for colname in colname_dict.values()})
        gc.collect()

    # =======================
    # impression mean and std
    # =======================
    with pu.profiler("extracting statistical features"):
        pool = mp.Pool(processes=8)
        results = [pool.apply_async(batch_get_imp_stats, (batch, )) for batch in batches]
        pool.close()
        pool.join()
        result_list = [result.get() for result in results]

        final_list = []
        for result in result_list:
            final_list += result
        df_imp_stats = pd.DataFrame.from_dict(final_list)

        colname_dict = {s: "impression@{}_x_{}_{}".format(ad_feat_name, user_feat_name, s) for s in ["mean", "std"]}
        df_imp_stats = df_imp_stats.rename(columns=colname_dict)
        df_imp_stats = df_imp_stats.astype({colname: np.float32 for colname in colname_dict.values()})
        gc.collect()

    # =======================================
    # impressions for min and max click rates
    # =======================================
    with pu.profiler("extracting correlated statistical features"):
        pool = mp.Pool(processes=8)
        results = [pool.apply_async(batch_get_corr_stats, (batch, )) for batch in batches]
        pool.close()
        pool.join()
        result_list = [result.get() for result in results]

        final_list = []
        for result in result_list:
            final_list += result
        df_corr_stats = pd.DataFrame.from_dict(final_list)

        colname_dict = {s: "{}_bsClickrate_impression@{}_x_{}".format(s, ad_feat_name, user_feat_name) for s in ["min", "max"]}
        df_corr_stats = df_corr_stats.rename(columns=colname_dict)
        df_corr_stats = df_corr_stats.astype({colname: np.float32 for colname in colname_dict.values()})
        gc.collect()

    # ===========================
    # click rate wegihted average
    # ===========================
    with pu.profiler("extracting weighted average features"):
        pool = mp.Pool(processes=8)
        results = [pool.apply_async(batch_get_ckr_weighted_avg, (batch, )) for batch in batches]
        pool.close()
        pool.join()
        result_list = [result.get() for result in results]

        final_list = []
        for result in result_list:
            final_list += result
        df_ckr_wavg = pd.DataFrame.from_dict(final_list)

        colname_dict = {"avg": "bsClickrate_weighted_avg@{}_x_{}".format(ad_feat_name, user_feat_name)}
        df_ckr_wavg = df_ckr_wavg.rename(columns=colname_dict)
        df_ckr_wavg = df_ckr_wavg.astype({colname: np.float32 for colname in colname_dict.values()})
        gc.collect()

    # ===================
    # merge all DataFrame
    # ===================
    with pu.profiler("merging all features"):
        df_stack = pd.concat([df_ckr_quantile, df_ckr_wavg, df_ckr_stats, df_imp_stats, df_corr_stats], axis=1)
        assert df_stack.isnull().sum().sum() == 0
        assert df_stack.shape[0] == df_train.shape[0]
        assert df_stack.shape[1] == 12
        del df_ckr_quantile
        del df_ckr_wavg
        del df_ckr_stats
        del df_imp_stats
        del df_corr_stats
        gc.collect()

    with pu.profiler("saving features"):
        stack_folder = os.path.join(config.DATA_DIR, "stacking/clickrate")
        stack_file = "train.[adFeatureName='{}'][userFeatureName='{}'].pkl".format(ad_feat_name, user_feat_name)
        stack_path = os.path.join(stack_folder, stack_file)
        os.makedirs(stack_folder, exist_ok=True)
        du.save_pickle(df_stack, stack_path)
        del df_stack
        del result_list
        del final_list
        gc.collect()

In [None]:
for ad_feat_name, user_feat_name in pairs:
    process_pair_train(ad_feat_name, user_feat_name, n_procs=8)

Start processing 'productType' x 'kw1'
[16:32:46] Finish splitting batches. △M: +16.0KB. △T: 0.0 seconds.
88 batches, each with 100000 rows
[16:49:26] Finish extracting quantile features. △M: +2.96GB. △T: 16.7 minutes.
[16:51:49] Finish extracting statistical features. △M: +2.66GB. △T: 2.4 minutes.
[16:54:16] Finish extracting statistical features. △M: +1.28GB. △T: 2.5 minutes.
[16:55:05] Finish extracting correlated statistical features. △M: +296.85MB. △T: 49.3 seconds.
[16:56:43] Finish extracting weighted average features. △M: -296.1MB. △T: 1.6 minutes.
[16:56:48] Finish merging all features. △M: +402.56MB. △T: 4.5 seconds.
[16:56:51] Finish saving features. △M: -402.78MB. △T: 2.9 seconds.
Start processing 'productType' x 'kw2'
[16:57:29] Finish splitting batches. △M: +0B. △T: 0.0 seconds.
88 batches, each with 100000 rows
[17:14:23] Finish extracting quantile features. △M: +779.42MB. △T: 16.9 minutes.
[17:16:44] Finish extracting statistical features. △M: +744.63MB. △T: 2.4 minutes

In [None]:
def process_pair_test(ad_feat_name, user_feat_name, batch_size=100000, n_procs=4):
    print("Start processing '{}' x '{}'".format(ad_feat_name, user_feat_name))
    
    # ========================
    # load intermediate result
    # ========================
    agg_folder = os.path.join(config.DATA_DIR, "intermediate/clickrate_aggregate")
    agg_file = "test2.[adFeatureName='{}'][userFeatureName='{}'].pkl".format(ad_feat_name, user_feat_name)
    agg_path = os.path.join(agg_folder, agg_file)
    df_agg = du.load_pickle(agg_path)

    # =============
    # split batches
    # =============
    with pu.profiler("splitting batches"):
        indices = [(offset, offset + batch_size) for offset in range(0, df_agg.shape[0], batch_size)]
        batches = [df_agg.iloc[tup[0]:tup[1]]["click_list"] for tup in indices]
    print("{} batches, each with {} rows".format(len(batches), batch_size))

    # ====================
    # click rate quantiles
    # ====================
    # process batches 
    with pu.profiler("extracting quantile features"):
        pool = mp.Pool(processes=n_procs)
        results = [pool.apply_async(batch_get_ckr_quantiles, (batch, )) for batch in batches]
        pool.close()
        pool.join()
        result_list = [result.get() for result in results]

        # merge batch results
        final_list = []
        for result in result_list:
            final_list += result
        df_ckr_quantile = pd.DataFrame.from_dict(final_list)

        # reset DataFrame
        colname_dict = {q: "bsClickrate@{}_x_{}_q{}".format(ad_feat_name, user_feat_name, q) for q in [100, 75, 50, 25, 0]}
        df_ckr_quantile = df_ckr_quantile.rename(columns=colname_dict)
        df_ckr_quantile = df_ckr_quantile.astype({colname: np.float32 for colname in colname_dict.values()})
        gc.collect()

    # =======================
    # click rate mean and std
    # =======================
    with pu.profiler("extracting statistical features"):
        pool = mp.Pool(processes=8)
        results = [pool.apply_async(batch_get_ckr_stats, (batch, )) for batch in batches]
        pool.close()
        pool.join()
        result_list = [result.get() for result in results]

        final_list = []
        for result in result_list:
            final_list += result
        df_ckr_stats = pd.DataFrame.from_dict(final_list)

        colname_dict = {s: "bsClickrate@{}_x_{}_{}".format(ad_feat_name, user_feat_name, s) for s in ["mean", "std"]}
        df_ckr_stats = df_ckr_stats.rename(columns=colname_dict)
        df_ckr_stats = df_ckr_stats.astype({colname: np.float32 for colname in colname_dict.values()})
        gc.collect()

    # =======================
    # impression mean and std
    # =======================
    with pu.profiler("extracting statistical features"):
        pool = mp.Pool(processes=8)
        results = [pool.apply_async(batch_get_imp_stats, (batch, )) for batch in batches]
        pool.close()
        pool.join()
        result_list = [result.get() for result in results]

        final_list = []
        for result in result_list:
            final_list += result
        df_imp_stats = pd.DataFrame.from_dict(final_list)

        colname_dict = {s: "impression@{}_x_{}_{}".format(ad_feat_name, user_feat_name, s) for s in ["mean", "std"]}
        df_imp_stats = df_imp_stats.rename(columns=colname_dict)
        df_imp_stats = df_imp_stats.astype({colname: np.float32 for colname in colname_dict.values()})
        gc.collect()

    # =======================================
    # impressions for min and max click rates
    # =======================================
    with pu.profiler("extracting correlated statistical features"):
        pool = mp.Pool(processes=8)
        results = [pool.apply_async(batch_get_corr_stats, (batch, )) for batch in batches]
        pool.close()
        pool.join()
        result_list = [result.get() for result in results]

        final_list = []
        for result in result_list:
            final_list += result
        df_corr_stats = pd.DataFrame.from_dict(final_list)

        colname_dict = {s: "{}_bsClickrate_impression@{}_x_{}".format(s, ad_feat_name, user_feat_name) for s in ["min", "max"]}
        df_corr_stats = df_corr_stats.rename(columns=colname_dict)
        df_corr_stats = df_corr_stats.astype({colname: np.float32 for colname in colname_dict.values()})
        gc.collect()

    # ===========================
    # click rate wegihted average
    # ===========================
    with pu.profiler("extracting weighted average features"):
        pool = mp.Pool(processes=8)
        results = [pool.apply_async(batch_get_ckr_weighted_avg, (batch, )) for batch in batches]
        pool.close()
        pool.join()
        result_list = [result.get() for result in results]

        final_list = []
        for result in result_list:
            final_list += result
        df_ckr_wavg = pd.DataFrame.from_dict(final_list)

        colname_dict = {"avg": "bsClickrate_weighted_avg@{}_x_{}".format(ad_feat_name, user_feat_name)}
        df_ckr_wavg = df_ckr_wavg.rename(columns=colname_dict)
        df_ckr_wavg = df_ckr_wavg.astype({colname: np.float32 for colname in colname_dict.values()})
        gc.collect()

    # ===================
    # merge all DataFrame
    # ===================
    with pu.profiler("merging all features"):
        df_stack = pd.concat([df_ckr_quantile, df_ckr_wavg, df_ckr_stats, df_imp_stats, df_corr_stats], axis=1)
        assert df_stack.isnull().sum().sum() == 0
        assert df_stack.shape[0] == df_test.shape[0]
        assert df_stack.shape[1] == 12
        del df_ckr_quantile
        del df_ckr_wavg
        del df_ckr_stats
        del df_imp_stats
        del df_corr_stats
        gc.collect()

    with pu.profiler("saving features"):
        stack_folder = os.path.join(config.DATA_DIR, "stacking/clickrate")
        stack_file = "test2.[adFeatureName='{}'][userFeatureName='{}'].pkl".format(ad_feat_name, user_feat_name)
        stack_path = os.path.join(stack_folder, stack_file)
        os.makedirs(stack_folder, exist_ok=True)
        du.save_pickle(df_stack, stack_path)
        del df_stack
        del result_list
        del final_list
        gc.collect()

In [None]:
for ad_feat_name, user_feat_name in pairs:
    process_pair_test(ad_feat_name, user_feat_name, n_procs=8)