In [1]:
!pip install torch==1.10.0

Collecting torch==1.10.0
  Using cached torch-1.10.0-cp37-cp37m-manylinux1_x86_64.whl (881.9 MB)
Installing collected packages: torch
  Attempting uninstall: torch
    Found existing installation: torch 1.8.0a0+37c1f4a
    Uninstalling torch-1.8.0a0+37c1f4a:
      Successfully uninstalled torch-1.8.0a0+37c1f4a
Successfully installed torch-1.10.0


In [2]:
import os

DIR = 'input'

TRAIN_TEST_PATH = os.path.join(DIR, 'train_test.parquet') 
RECS_TEST_PATH = os.path.join(DIR, 'recs_test.parquet')  
TEST_IDS_PATH = os.path.join(DIR, 'test_ids.csv') 

CLUSTERS_PATH = os.path.join(DIR, 'clusters.parquet')  
USER_DECODER_PATH = os.path.join(DIR, 'user_decoder.pkl') 
RANKER_MODEL_PATH = os.path.join(DIR, 'ranker_model.pkl')
RANKER_MODEL1_PATH = os.path.join(DIR, 'ranker_model1.pkl')
RANKER_MODEL2_PATH = os.path.join(DIR, 'ranker_model2.pkl')

MF_MODEL_PATH = os.path.join(DIR, 'mf_model.pkl')
NN_MODEL_PATH = os.path.join(DIR, 'nn_model.pkl')
RECS_NN_TEST_PATH = os.path.join(DIR, 'recs_nn_test.parquet')

TOPK_TEST_PATH = os.path.join(DIR, 'topk_test.parquet')
TOPK_TEST1_PATH = os.path.join(DIR, 'topk_test1.parquet')
TOPK_TEST2_PATH = os.path.join(DIR, 'topk_test2.parquet')

NUM_CLUSTERS = 8000
NUM_USERS = 1595239
NUM_RETAILERS = 118
NUM_CITIES = 148

SUBMIT_PATH = os.path.join(DIR, 'submission.csv')  # 'output/submission.csv'

In [3]:
TOP_K_CLUSTERS = [ 
    937, 6849, 4873, 7052,  789, 4938, 5998, 5124, 4083,  345,  721,
    4018, 6995, 3334, 4327, 7401, 3684,  292, 7454, 5452, 1023, 6674,
    3366, 4236, 6983, 4647, 2214, 2895, 3205, 4031, 2578,   42, 7855,
    931, 3107, 2000, 7532, 6761, 1131, 3717, 2351, 2728, 4929, 3027,
    612,   21, 1902,  807, 4001, 3771, 1705,  602, 1020, 6428, 6699,
    6271,  554, 4308, 7589, 7002, 1997,  696,  595, 6675, 1751,  923,
    6711,  999, 1666, 1263,  919, 7602, 2285, 4543, 6051, 4540, 4828,
    3543, 6928, 1886, 6029, 5320, 2924, 7449, 4906, 7757, 1077, 5378,
    6189, 1747, 7691, 2595,  811,  103, 7043, 1339, 1574, 2570, 1249,
    735, 3173, 4739, 2152, 2226, 6021, 7739, 7777, 5187, 5299, 2604,
    6569, 5893,  466, 3483, 3640, 3870, 1442, 7114, 1338, 7747, 1867,
    2702, 3046, 1182, 1409, 4663, 4932, 1570, 6053, 6071, 3733,  712,
    3549, 6668, 1006, 4358, 4285, 3668,  885, 4129, 3293,  407, 4392,
    3555, 5812,  129,  163, 3018, 7752, 6998, 5949, 1266, 6656, 2786,
    2199, 2644, 4201, 3514, 6147, 4426, 7495, 5096, 5653,  341, 1826,
    5380,  587, 4062, 6069, 2881, 1377, 6548, 2685, 2629, 7028, 6831,
    7181, 3251, 3948, 1357, 4438, 1138, 7528, 6149, 7514, 4835, 3938,
    1932, 3358, 2503,   11, 1623, 4028, 1890, 6696,  354,  960, 1765,
    3699, 7636,
]

In [4]:
from typing import Iterable, List

import pickle
import torch
import numpy as np
import pandas as pd
from torch.utils.data import Dataset
from sklearn.preprocessing import LabelEncoder
import scipy.sparse as sp

torch.manual_seed(0)
torch.cuda.manual_seed(0)
torch.cuda.manual_seed_all(0)
np.random.seed(0)

class MaxFactorDataset(Dataset):

    def __init__(
            self,
            users: Iterable[int],
            items: Iterable[int],
            device: str = 'cpu',
    ):
        self.device = device
        self.users = torch.LongTensor(users)
        self.items = torch.LongTensor(items)
        self.num_interactions = len(users)
        self.num_items = int(max(items) + 1)
        
        self.index = None
        self.batch_size = None
        self.neg_sample = None
        self.num_batches = None
        self.targets = None

    def init_params(self, batch_size: int, neg_sample: int):

        self.batch_size = batch_size
        self.neg_sample = neg_sample
        self.num_batches = int((self.num_interactions - 1) / batch_size + 1)
        self.targets = torch.zeros(self.batch_size, dtype=torch.long)

    def __getitem__(self, batch_num):

        i = batch_num * self.batch_size
        size = min(self.num_interactions - i, self.batch_size)

        index = self.index[i: i + size].to(self.device)
        items_pos = self.items[index].to(self.device)
        users = self.users[index].to(self.device)

        items_pos = items_pos.reshape(-1, 1)
        items_neg = torch.randint(high=self.num_items, size=(size, self.neg_sample), device=self.device)
        targets = self.targets[:size].to(self.device)

        return (
            users,
            items_pos,
            items_neg,
            targets,
        )

    def __iter__(self):
        self.index = torch.randperm(self.num_interactions)
        for i in range(self.num_batches):
            yield self[i]

    def __len__(self):
        return self.num_batches


class MaxFactorModel(torch.nn.Module):

    def __init__(
            self,
            num_users: int,
            num_items: int,
            dim: int,
            learning_rate: float,
            device: str = 'cpu',
    ):
        super().__init__()

        self.negative_sampling_batch_size = None
        self.hard_neg_sample = None
        self.device = device
        
        self.item_embeddings = torch.nn.Embedding(num_items, dim).to(self.device)
        self.user_embeddings = torch.nn.Embedding(num_users, dim).to(self.device)
        torch.nn.init.xavier_uniform_(self.item_embeddings.weight)
        torch.nn.init.xavier_uniform_(self.user_embeddings.weight)
        self.optimizer = torch.optim.Adagrad(self.parameters(), lr=learning_rate)

    def get_hard_negatives(self, users, items_neg):

        hard_negatives = []
        with torch.no_grad():
            for i in range(0, len(users), self.negative_sampling_batch_size):
                neg = self(
                    users[i: i + self.negative_sampling_batch_size],
                    items_neg[i: i + self.negative_sampling_batch_size],
                )
                topk = torch.topk(neg, self.hard_neg_sample)[1]
                hard_negatives.append(items_neg[i: i + self.negative_sampling_batch_size].gather(1, topk))
        items_neg = torch.cat(hard_negatives, dim=0)
        return items_neg

    def _fit(
            self,
            dataset: MaxFactorDataset,
            epochs: int,
            learning_rate: float,
             penalty_alpha: float,
    ):

        loss_function = torch.nn.CrossEntropyLoss()
        for epoch in range(epochs):
            
            for users, items_pos, items_neg, targets in dataset:
                self.optimizer.zero_grad()
                if self.hard_neg_sample:
                    items_neg = self.get_hard_negatives(users, items_neg)
                items = torch.cat([items_pos, items_neg], dim=1)
                penalty = (((self.item_embeddings.weight ** 2).sum(1) - 1) ** 2).mean()
                score = self(users, items) 
                loss = loss_function(score, targets) + penalty * penalty_alpha
                loss.backward()
                self.optimizer.step()

    def fit(
            self,
            dataset: MaxFactorDataset,
            epochs: int,
            batch_size: int,
            neg_sample: int,
            negative_sampling_batch_size: int = None,
            hard_neg_sample: int = None,
            learning_rate: float = 0.015,
            penalty_alpha: float = 0.003,
    ):
        dataset.init_params(batch_size, neg_sample)
        self.negative_sampling_batch_size = negative_sampling_batch_size
        self.hard_neg_sample = hard_neg_sample

        self._fit(dataset, epochs, learning_rate, penalty_alpha)

    def forward(self, users: torch.LongTensor, items: torch.LongTensor) -> torch.FloatTensor:

        user_embeddings = self.user_embeddings(users).unsqueeze(2)
        item_embeddings = self.item_embeddings(items)
        score = torch.bmm(item_embeddings, user_embeddings).squeeze(2)

        return score

    def predict(self, users: torch.LongTensor, items: torch.LongTensor) -> torch.FloatTensor:

        user_embeddings = self.user_embeddings(users)
        item_embeddings = self.item_embeddings(items).t()
        score = torch.mm(user_embeddings, item_embeddings)

        return score

    def _create_recommendations(
            self,
            target_users: Iterable[int],
            target_items: Iterable[int],
            num_recommendations: int,
    ):
        target_users = torch.LongTensor(target_users).to(self.device)
        target_items = torch.LongTensor(target_items).to(self.device)

        topk = min(num_recommendations, target_items.shape[0])

        with torch.no_grad():
            res = self.predict(target_users, target_items)
            recom = torch.topk(res, topk)
            items = target_items[recom[1]].flatten()
            scores = recom[0].flatten()
            users = target_users.reshape(-1, 1).repeat(1, topk).flatten()

        users = users.cpu().detach().numpy()
        items = items.cpu().detach().numpy()
        scores = scores.cpu().detach().numpy()

        return users, items, scores

    def create_recommendations(
            self,
            target_users: Iterable[int],
            target_items: Iterable[int],
            num_recommendations: int,
    ) -> (np.array, np.array, np.array):
        
        num_batch_users = int(200 ** 3 / 4 / len(target_items))

        all_users = []
        all_items = []
        all_scores = []

        for i in range(0, len(target_users), num_batch_users):
            users, items, scores = self._create_recommendations(
                target_users[i:i + num_batch_users],
                target_items,
                num_recommendations,
            )

            all_users.append(users)
            all_items.append(items)
            all_scores.append(scores)

        all_users = np.hstack(all_users)
        all_items = np.hstack(all_items)
        all_scores = np.hstack(all_scores)

        return all_users, all_items, all_scores
    
    
class MaxFactorRecommender:

    def __init__(self, config):
        self.config = config

        self.cnt = {}
        self.user_encoder = LabelEncoder()
        self.item_encoder = LabelEncoder()
        self.recs = None
        self.train_set = None
        self.dataset = None
        self.model = None
        self.already_seen = None

    def init_model(self):

        self.dataset = MaxFactorDataset(
            users=self.train_set['user'].values,
            items=self.train_set['item'].values,
            device=self.config['device'],
        )

        self.model = MaxFactorModel(
            num_users=self.cnt['users'],
            num_items=self.cnt['items'],
            dim=self.config['dim'],
            learning_rate=self.config['fit_params']['learning_rate'],
            device=self.config['device'],
        )

    def encode_ids(self):
        self.train_set['user'] = self.user_encoder.fit_transform(self.train_set['user_id'])
        self.train_set['item'] = self.item_encoder.fit_transform(self.train_set['cluster_id'])
        self.cnt['items'] = self.train_set.item.max() + 1
        self.cnt['users'] = self.train_set.user.max() + 1
        self.already_seen = self.get_user_item_id(
            user_col=self.train_set['user'],
            item_col=self.train_set['item'],
        ).drop_duplicates().values

    def decode_ids(self):
        self.recs['user_id'] = self.user_encoder.classes_[self.recs.user]
        self.recs['cluster_id'] = self.item_encoder.classes_[self.recs.item]

    def fit(self):
        self.model.fit(
            dataset=self.dataset,
            **self.config['fit_params'],
        )

    def torch_recommend(self, users):
        all_items = []
        all_users = []
        all_scores = []
        target_users = self.user_encoder.transform(users)
        target_items = np.arange(self.cnt['items'])
        users, items, scores = self.model.create_recommendations(
            target_users,
            target_items,
            self.config['num_recommendations'],
        )
        all_items.append(items.astype(np.uint16))
        all_users.append(users.astype(np.int32))
        all_scores.append(scores)

        all_items = np.hstack(all_items)
        all_users = np.hstack(all_users)
        all_scores = np.hstack(all_scores)

        self.recs = pd.DataFrame()
        self.recs['user'] = all_users
        self.recs['item'] = all_items
        self.recs['score'] = all_scores
       
    @staticmethod
    def get_user_item_id(user_col: pd.Series, item_col: pd.Series) -> pd.Series:
        return item_col.astype(np.int64) * (10 ** 8) + user_col
    
    @staticmethod
    def apply_rank(col, df):
        if len(df) == 0:
            return []
        _, index, num_ranges = np.unique(df[col], return_counts=True, return_index=True)
        num_ranges = num_ranges[index.argsort()]
        arange = np.arange(num_ranges.max(), dtype=int)
        ranks = np.hstack([arange[:i] for i in num_ranges])
        return ranks
        
    def filter_seen_recs(self):
        # self.recs['ui'] = self.get_user_item_id(
        #     user_col=self.recs['user'],
        #     item_col=self.recs['item'],
        # )
        # seen = self.recs.ui.isin(self.already_seen)
        # self.recs = self.recs[~seen]
        self.recs['rnk'] = self.apply_rank('user', self.recs)
        
    def create_recommendations(
            self,
            train_set: pd.DataFrame,
            users: Iterable[int],
    ) -> pd.DataFrame:
        """"
        :return
        pd.DataFrame({
            id: [1, 2, 3],
            cluster_id: [4, 5, 6],
            score: [0.1, 0.3, -0.2],
            rnk: [0, 1, 2],
        })
        """
        self.train_set = train_set
        self.encode_ids()
        self.init_model()
        self.fit()
        self.torch_recommend(users)
        self.filter_seen_recs()
        self.decode_ids()
        return self.recs[['user_id', 'cluster_id', 'score']]

In [5]:
def create_recs_mf():
    test_ids = pd.read_csv(TEST_IDS_PATH)
    user_decoder = pickle.load(open(USER_DECODER_PATH, 'rb'))
    user_ecnoder = dict(zip(user_decoder, np.arange(len(user_decoder))))
    test_ids['user_id'] = test_ids['id'].map(user_ecnoder)

    recommender = pickle.load(open(MF_MODEL_PATH, 'rb'))

    recommender.torch_recommend(test_ids['user_id'])
    recommender.filter_seen_recs()
    recommender.decode_ids()
    recs_test = recommender.recs[['user_id', 'cluster_id', 'score']]

    recs_test.to_parquet(RECS_TEST_PATH)

In [6]:
!pip install memory_profiler

Collecting memory_profiler
  Using cached memory_profiler-0.60.0-py3-none-any.whl
Installing collected packages: memory-profiler
Successfully installed memory-profiler-0.60.0


In [7]:
%load_ext memory_profiler

In [8]:
%%time
%%memit
create_recs_mf()



peak memory: 8220.55 MiB, increment: 7941.73 MiB
CPU times: user 1min 14s, sys: 8.24 s, total: 1min 22s
Wall time: 44.8 s


In [9]:
class Model(torch.nn.Module):
    
    def __init__(self, device='cpu'):
        super().__init__()
        self.device = device
        dim = 2 * NUM_CLUSTERS + NUM_RETAILERS + NUM_CITIES
        self.linear = torch.nn.Linear(dim, 10000).to(self.device)
        self.linear2 = torch.nn.Linear(10000, NUM_CLUSTERS).to(self.device)
        torch.nn.init.xavier_uniform_(self.linear.weight)
        torch.nn.init.xavier_uniform_(self.linear2.weight)
        self.sigmoid = torch.nn.Sigmoid()
        self.relu = torch.nn.ReLU()
    
    def forward(self, x):
        return self.sigmoid(self.linear2(self.relu(self.linear(x))))

In [10]:
class Dataset:
    
    def __init__(self, x, y, users, batch_size, device='cuda'):
        
        self.batch_size = batch_size
        self.device = device
        self.x = x
        self.y = y
        self.users = users
        self.num_users = len(users)
        self.num_batches = int((self.num_users - 1) / batch_size + 1)
        
    def __getitem__(self, batch_num):
        
        i = batch_num * self.batch_size
        size = min(self.num_users - i, self.batch_size)
        users = self.users[i: i + size]
        if self.y is not None:
            return (torch.FloatTensor(self.x[users].todense()).to(self.device), 
                  torch.FloatTensor(self.y[users].todense()).to(self.device))
        else:
            return torch.FloatTensor(self.x[users].todense()).to(self.device), None
            
    def __iter__(self):
        np.random.shuffle(self.users)
        for i in range(self.num_batches):
            yield self[i]

    def __len__(self):
        return self.num_batches

In [11]:
def create_sparse_matrix(short_train, col, num_classes, use_ones=False):
    df = short_train[['user_id', col]].drop_duplicates()
    df[f'user_{col}'] = df['user_id'].astype(np.int64) * 10000 + df[col]
    df['user_col_count'] = df[f'user_{col}'].map(short_train[f'user_{col}'].value_counts()) 
    df['user_count'] = df['user_id'].map(short_train['user_id'].value_counts()) 
    df['user_col_share'] = df['user_col_count'] / df['user_count']
    if use_ones:
        return sp.csr_matrix((np.ones(len(df)), (df['user_id'], df[col])), shape=(NUM_USERS, num_classes))
    return sp.csr_matrix((df['user_col_share'], (df['user_id'], df[col])), shape=(NUM_USERS, num_classes))

def create_x_y(train_val, val=None):
    short_train = train_val[~train_val[['order_id', 'cluster_id']].duplicated()]
    short_train['user_retailer_id'] = short_train['user_id'].astype(np.int64) * 10000 + short_train['retailer_id']
    short_train['user_city_id'] = short_train['user_id'].astype(np.int64) * 10000 + short_train['city_id']
    short_train['user_cluster_id'] = short_train['user_id'].astype(np.int64) * 10000 + short_train['cluster_id']

    x1 = create_sparse_matrix(short_train, 'retailer_id', NUM_RETAILERS)
    x2 = create_sparse_matrix(short_train, 'city_id', NUM_CITIES)
    x3 = create_sparse_matrix(short_train, 'cluster_id', NUM_CLUSTERS)
    x4 = create_sparse_matrix(short_train, 'cluster_id', NUM_CLUSTERS, True)

    x = sp.hstack([x1, x2, x3, x4], format='csr')
    if val is not None:
        y = sp.csr_matrix((np.ones(len(val)), [val['user_id'], val['cluster_id']]), shape=(NUM_USERS, NUM_CLUSTERS))
        return x, y
    else:
        return x, None


In [12]:
def get_rec(model, dataset, topk=160):
    items = []
    scores = []
    losses = []
    with torch.no_grad():
        for x, y in dataset:
            score = model(x)
            recom = torch.topk(score, topk)
            items.append(recom[1].flatten().cpu().detach().numpy().astype(np.int16))
            scores.append(recom[0].flatten().cpu().detach().numpy())

    users = dataset.users.reshape(-1, 1).repeat(topk, 1).flatten()
    items = np.hstack(items)
    scores = np.hstack(scores)

    recs = pd.DataFrame()
    recs['user_id'] = users
    recs['cluster_id'] = items
    recs['scores'] = scores
    return recs

In [13]:
def create_recs_nn():
    config = {
        'batch_size': 3000,
        'device': 'cpu',
    }
    model = pickle.load(open(NN_MODEL_PATH, 'rb'))
    train_test =  pd.read_parquet(TRAIN_TEST_PATH)
    test_ids = pd.read_csv(TEST_IDS_PATH)
    user_decoder = pickle.load(open(USER_DECODER_PATH, 'rb'))
    user_ecnoder = dict(zip(user_decoder, np.arange(len(user_decoder))))
    test_ids['user_id'] = test_ids['id'].map(user_ecnoder)
    
    x, y = create_x_y(train_test)
    dataset = Dataset(x, y, np.array(test_ids['user_id']).astype(np.int32), 
                      config['batch_size'], config['device'])
    recs = get_rec(model, dataset)
    recs.to_parquet(RECS_NN_TEST_PATH)

In [None]:
%%time
%%memit
create_recs_nn()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  del sys.path[0]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  from ipykernel import kernelapp as app


In [None]:
import lightgbm
import pandas as pd
import numpy as np
import pickle

from collections import defaultdict

def most_common(array):
    elements, counts = np.unique(array, return_counts=True)
    return elements[np.argpartition(counts, kth=-1)[-1]]

def apply_rank(col, df):
    if len(df) == 0:
        return []
    _, index, num_ranges = np.unique(df[col], return_counts=True, return_index=True)
    num_ranges = num_ranges[index.argsort()]
    arange = np.arange(num_ranges.max(), dtype=int)
    ranks = np.hstack([arange[:i] for i in num_ranges])
    return ranks

def get_mean_diff_dt(array):
    if len(array) == 1:
        return -1
    np_array = np.array(array)
    np_array[1:] - np_array[:-1]
    return (np_array[1:] - np_array[:-1]).mean()
    
def create_features_simple(table, train, users, clusters):
    table['count_item_id'] = (table.cluster_id.map(train['cluster_id'].value_counts()).fillna(0) / len(train)).astype(np.float32)

    table['num_orders'] = table['user_id'].map(
        train[['order_id', 'user_id']].drop_duplicates()['user_id'].value_counts()
    ).astype(np.int16)

    table['num_order_with_target_item'] = table['ui'].map(
        train[['order_id', 'ui']].drop_duplicates()['ui'].value_counts()
    ).fillna(0).astype(np.int16)
    
    last_order_ui = train[train.dt == \
          train['user_id'].map(
                train[['user_id', 'dt']].drop_duplicates().groupby('user_id').max()['dt']
    )].ui.unique()

    table['was_in_last_order'] = table['ui'].isin(last_order_ui).astype(np.int8)
    del last_order_ui

    prod_quantity = train.groupby('ui')['product_quantity'].sum()
    table['prod_quantity'] = table['ui'].map(prod_quantity).fillna(0).astype(np.int16)
    del prod_quantity
    
    prev_order_ui = train['dt'].max() - train.groupby('ui')['dt'].max()
    table['prev_order_ui'] = table['ui'].map(prev_order_ui).fillna(-1).astype(np.float32)
    del prev_order_ui
    
    mask = ~train[['user_id', 'order_id']].duplicated()
    
    table['user_retailer_most_common'] = table['user_id'].map(
        train[mask].groupby('user_id').retailer_id.apply(most_common)
    ).astype(np.int8)
    
    user_city_most_common = table['user_id'].map(
        train[mask].groupby('user_id').city_id.apply(most_common)
    ).astype(np.int16)
    
    del mask
    
    item_city_vc = (train['cluster_id'] * 100 + train['city_id']).value_counts()
    item_user_city = table['cluster_id'] * 100 + user_city_most_common
    table['user_item_city_vc'] = item_user_city.map(item_city_vc).fillna(0).astype(np.float32)
    del item_city_vc
    del item_user_city
    
    for col in ['cluster_size', 'd_mean', 'd_median']:
        table['cluster_' + col] = table['cluster_id'].map(
            clusters.set_index('cluster_id')[col]
        )
        table['cluster_' + col] = table['cluster_id'].map(
            clusters.set_index('cluster_id')[col]
        )
        
    short_train = train[train.user_id.isin(users)]
    
    table['product_quantity_sum'] = table.user_id.map(
          short_train.groupby('user_id').product_quantity.sum()
    )
    table['user_retailer_num'] = table.user_id.map(
        short_train.groupby('user_id').retailer_id.nunique()
    ).astype(np.int8)
    table['user_city_num'] = table.user_id.map(
        short_train.groupby('user_id').city_id.nunique()
    ).astype(np.int8)
    table['user_product_price_mean'] = table.user_id.map(
        short_train.groupby('user_id').product_price.mean()
    )
    table['user_product_discount_mean'] = table.user_id.map(
        (short_train.product_discount != 0).groupby(short_train.user_id).mean()
    ).astype(np.float16)
    table['user_num_clusters'] = table['user_id'].map(
        short_train[['cluster_id', 'user_id']].drop_duplicates()['user_id'].value_counts()
    ).astype(np.int16)
    table['last_user_city_id'] = table['user_id'].map(
        short_train.groupby('user_id').city_id.last()
    )
    table['last_user_retailer_id'] = table['user_id'].map(
        short_train.groupby('user_id').retailer_id.last()
    )
    
    table['user_most_common_cluster_id'] = table['user_id'].map(
        short_train.groupby('user_id').cluster_id.apply(most_common)
    )
    del short_train
    
    mask = ~train[['user_id', 'order_id', 'cluster_id']].duplicated()

    table['cluster_quantity_mean'] = table['cluster_id'].map(
        train.groupby('cluster_id').product_quantity.mean().astype(np.float16)
    )

    table['cluster_city_count'] = table['cluster_id'].map(
        train[mask].groupby('cluster_id').city_id.nunique()
    ).astype(np.float16)
    
    table['cluster_num_stores'] = table['cluster_id'].map(
        train[mask].groupby('cluster_id').store_id.nunique()
    ).astype(np.float16)
    del mask

    table['cluster_product_price_mean'] = table['cluster_id'].map(
        train.groupby('cluster_id').product_price.mean()
    ).astype(np.float16)

    table['cluster_mean_discount'] = table['cluster_id'].map(
        (train.product_discount == 0).groupby(train.cluster_id).mean().astype(np.float16)
    )

    table['num_users_bought_cluster'] = table['cluster_id'].map(
        train.groupby('cluster_id').user_id.nunique()
    ).fillna(0).astype(np.float16)

    table['num_orders_cluster'] = table['cluster_id'].map(
        train.groupby('cluster_id').order_id.nunique()
    ).fillna(0).astype(np.float16)
    
    mask = ~train[['order_id', 'cluster_id']].duplicated()
    short_train = train[mask]

    city_retailer = short_train.city_id.astype(np.int16) * 100 + short_train.retailer_id
    city_retailer_cluster = city_retailer.astype(np.int64) * 10000 + short_train.cluster_id

    city_retailer_user = user_city_most_common.astype(np.int16) * 100 + \
        table['user_retailer_most_common']
    city_retailer_cluster_user = city_retailer_user.astype(np.int64)*10000 + table.cluster_id

    table['f1'] = city_retailer_user.map(
        city_retailer.value_counts()
    ).fillna(0).astype(np.float32)

    table['f2'] = city_retailer_cluster_user.map(
        city_retailer_cluster.value_counts()
    ).fillna(0).astype(np.float32)

    table['f3'] = table['f2'] \
        / table['f1'] 
    
    
    del city_retailer_user
    del city_retailer_cluster_user

    city_retailer_user = table['last_user_city_id'].astype(np.int16) * 100 + \
        table['last_user_retailer_id']
    city_retailer_cluster_user = city_retailer_user.astype(np.int64)*10000 + table.cluster_id

    f4 = city_retailer_user.map(
        city_retailer.value_counts()
    ).fillna(0).astype(np.float32)

    table['f5'] = city_retailer_cluster_user.map(
        city_retailer_cluster.value_counts()
    ).fillna(0).astype(np.float32)

    table['f6'] = table['f5'] \
        / f4 
    del f4
    
    del city_retailer
    del city_retailer_user
    del city_retailer_cluster_user
    del city_retailer_cluster

    ui_vc = train.ui.value_counts()
    rnk_vc = train[['user_id', 'ui', 'cluster_id']].drop_duplicates()
    rnk_vc['vc'] = rnk_vc.ui.map(ui_vc)
    rnk_vc = rnk_vc.sort_values(['user_id', 'vc'], ascending=False)
    rnk_vc['rnk_user_id_ui'] = apply_rank('user_id', rnk_vc)
    table['rnk_user_id_ui'] = table.ui.map(rnk_vc.set_index('ui')['rnk_user_id_ui']
                                          ).fillna(10000).astype(np.int16)
    del ui_vc

    rnk_vc = rnk_vc.sort_values(['cluster_id', 'vc'], ascending=False)
    rnk_vc['rnk_cluster_id_ui'] = apply_rank('cluster_id', rnk_vc)
    table['rnk_cluster_id_ui'] = table.ui.map(rnk_vc.set_index('ui')['rnk_cluster_id_ui']
                                          ).fillna(10000).astype(np.int16)
    del rnk_vc

    rnk_vc = train['cluster_id'].value_counts().to_frame()
    rnk_vc['rnk_cluster_id'] = np.arange(len(rnk_vc))
    table['rnk_cluster_id'] = table.cluster_id.map(rnk_vc['rnk_cluster_id']
                                                  ).fillna(10000).astype(np.int16)
    del rnk_vc

    cluster_city_vc = (train['city_id'].astype(np.int32) * 10000 + train['cluster_id']
                      ).value_counts()
    rnk_vc = train[['city_id', 'cluster_id']].drop_duplicates()
    rnk_vc['cluster_city'] = rnk_vc['city_id'].astype(np.int32) * 10000 + rnk_vc['cluster_id']
    rnk_vc['vc'] = rnk_vc['cluster_city'].map(cluster_city_vc)
    rnk_vc = rnk_vc.sort_values(['city_id', 'vc'], ascending=False)
    rnk_vc['rnk_cluster_city'] = apply_rank('city_id', rnk_vc)
    user_city_cluster = table['last_user_city_id'].astype(np.int32) * 10000 \
        + table['cluster_id']
    table['rnk_cluster_city'] = user_city_cluster.map(
        rnk_vc.set_index('cluster_city')['rnk_cluster_city']
    ).fillna(10000).astype(np.int16)
    del cluster_city_vc
    del rnk_vc
    del user_city_cluster

    cluster_retailer_vc = (train['retailer_id'].astype(np.int32) * 10000 + train['cluster_id']
                      ).value_counts()
    rnk_vc = train[['retailer_id', 'cluster_id']].drop_duplicates()
    rnk_vc['cluster_retailer'] = rnk_vc['retailer_id'].astype(np.int32) * 10000 + rnk_vc['cluster_id']
    rnk_vc['vc'] = rnk_vc['cluster_retailer'].map(cluster_retailer_vc)
    rnk_vc = rnk_vc.sort_values(['retailer_id', 'vc'], ascending=False)
    rnk_vc['rnk_cluster_retailer'] = apply_rank('retailer_id', rnk_vc)
    user_retailer_cluster = table['last_user_retailer_id'].astype(np.int32) * 10000 \
        + table['cluster_id']
    table['rnk_cluster_retailer'] = user_retailer_cluster.map(
        rnk_vc.set_index('cluster_retailer')['rnk_cluster_retailer']
    ).fillna(10000).astype(np.int16)
    user_retailer_cluster = table['user_retailer_most_common'].astype(np.int32) * 10000 \
        + table['cluster_id']
    table['rnk_cluster_retailer2'] = user_retailer_cluster.map(
        rnk_vc.set_index('cluster_retailer')['rnk_cluster_retailer']
    ).fillna(10000).astype(np.int16)
    del cluster_retailer_vc
    del rnk_vc
    del user_retailer_cluster

    cluster_retailer_city_vc = (train['city_id'].astype(np.int64) * 10000000 + \
        train['retailer_id'].astype(np.int64) * 10000 + \
        train['cluster_id']).value_counts()
    rnk_vc = train[['retailer_id', 'cluster_id', 'city_id']].drop_duplicates()
    rnk_vc['cluster_retailer_city'] = (rnk_vc['city_id'].astype(np.int64) * 10000000 + \
        rnk_vc['retailer_id'].astype(np.int64) * 10000 + \
        rnk_vc['cluster_id'])
    rnk_vc['vc'] = rnk_vc['cluster_retailer_city'].map(cluster_retailer_city_vc)
    rnk_vc['retailer_city'] = (rnk_vc['city_id'].astype(np.int64) * 1000 + \
        rnk_vc['retailer_id'].astype(np.int64))
    rnk_vc = rnk_vc.sort_values(['retailer_city', 'vc'], ascending=False)
    rnk_vc['rnk_cluser_city_retailer'] = apply_rank('retailer_city', rnk_vc)
    user_retailer_city_cluster = (table['last_user_city_id'].astype(np.int64) * 10000000 + \
        table['last_user_retailer_id'].astype(np.int64) * 10000 + \
        table['cluster_id'])
    table['rnk_cluster_retailer_city'] = user_retailer_city_cluster.map(
        rnk_vc.set_index('cluster_retailer_city')['rnk_cluser_city_retailer']
    ).fillna(10000).astype(np.int16)
    user_retailer_city_cluster = (table['last_user_city_id'].astype(np.int64) * 10000000 + \
        table['user_retailer_most_common'].astype(np.int64) * 10000 + \
        table['cluster_id'])
    table['rnk_cluster_retailer_city2'] = user_retailer_city_cluster.map(
        rnk_vc.set_index('cluster_retailer_city')['rnk_cluser_city_retailer']
    ).fillna(10000).astype(np.int16)
    del cluster_retailer_city_vc
    del rnk_vc
    del user_retailer_city_cluster

    
    return table

def create_table(train, recs_nn, recs_mf, users):
    
    recs_nn['rnk'] = apply_rank('user_id', recs_nn)
    recs_mf['rnk'] = apply_rank('user_id', recs_mf)

    mask1 = recs_nn['user_id'].isin(users)
    mask2 = ~recs_mf.ui.isin(recs_nn.ui) & recs_mf['user_id'].isin(users)
    mask3 = ~(train.ui.isin(recs_nn.ui) | train.ui.isin(recs_mf.ui) \
              | train.ui.duplicated())  & train['user_id'].isin(users)
    
    table = pd.concat([
        recs_nn[['user_id', 'cluster_id']][mask1], 
        recs_mf[['user_id', 'cluster_id']][mask2], 
        train[['user_id', 'cluster_id']][mask3]
    ])
    table.reset_index(drop=True, inplace=True)
    del mask1
    del mask2
    del mask3
    table['ui'] = table['user_id'].astype(np.int64) * 10000 + table['cluster_id']
    
    table['rnk'] = table['ui'].map(
        recs_nn.set_index('ui')['rnk']
    ).fillna(10000).astype(np.int16)
    
    table['score'] = table['ui'].map(
        recs_nn.set_index('ui')['scores']
    ).fillna(-100).astype(np.float32)
    
    recs_nn = recs_nn[~recs_nn.ui.isin(train.ui)]
    recs_nn['rnk2'] = apply_rank('user_id', recs_nn)
    table['rnk2'] = table['ui'].map(
        recs_nn.set_index('ui')['rnk']
    ).fillna(10000).astype(np.int16)
    
    table['rnk3'] = table['ui'].map(
        recs_mf.set_index('ui')['rnk']
    ).fillna(10000).astype(np.int16)
    
    table['score2'] = table['ui'].map(
        recs_mf.set_index('ui')['score']
    ).fillna(-100).astype(np.float32)
    
    recs_mf = recs_mf[~recs_mf.ui.isin(train.ui)]
    recs_mf['rnk2'] = apply_rank('user_id', recs_mf)
    table['rnk4'] = table['ui'].map(
        recs_mf.set_index('ui')['rnk2']
    ).fillna(10000).astype(np.int16)
    
    return table
    

In [None]:
def get_recs(pred, users, items, already_bought, weights, num_recs=20):
    fix_pred = pred * (1.37 - already_bought) * (weights ** 1.5)
    indexes = (-fix_pred).argsort()
    recs = defaultdict(list)
    for user_id, item_id in zip(users[indexes], items[indexes]):
        if len(recs[user_id]) < num_recs:
            recs[user_id].append(item_id)
    return recs


def get_cluster_weights(dataset: pd.DataFrame) -> pd.DataFrame:

    cluster_popularity = dataset["cluster_id"].value_counts().sort_values(ascending=True).reset_index()
    cluster_popularity.columns = ["cluster_id", "cnt"]
    cluster_popularity["rank"] = cluster_popularity["cnt"].rank(method="dense") + 1
    cluster_popularity["w"] = 1 / np.log10(cluster_popularity["rank"])

    return cluster_popularity[["cluster_id", "w"]]

In [None]:
def get_table(train_path, recs_nn_path, recs_mf_path, 
              users, create_features_func, val_path=None):

    train = pd.read_parquet(train_path)
    train['product_price'] = train['product_price'].astype(np.float16)
    train['product_discount'] = train['product_discount'].astype(np.float16)

    recs_nn = pd.read_parquet(recs_nn_path)
    recs_mf = pd.read_parquet(recs_mf_path)
    clusters = pd.read_parquet(CLUSTERS_PATH)
    
    for df in [train, recs_nn, recs_mf]:
        df['ui'] = df['user_id'].astype(np.int64) * 10000 + df['cluster_id']
    
    table = create_table(train, recs_nn, recs_mf, users)
    del recs_nn
    del recs_mf

    table = create_features_func(table, train, users, clusters)
    del train
    del clusters

    X = table.drop(['user_id', 'ui'], axis=1).to_numpy(dtype=np.float32)

    if val_path is None:
        return X
    
    val = pd.read_parquet(val_path)
    val['ui'] = val['user_id'].astype(np.int64) * 10000 + val['cluster_id']
    y = np.array(table['ui'].isin(val['ui']))
    
    return X, y

In [None]:
def get_some_data(train_path, recs_nn_path, recs_mf_path, users):
    
    train = pd.read_parquet(train_path)
    recs_nn = pd.read_parquet(recs_nn_path)
    recs_mf = pd.read_parquet(recs_mf_path)
    clusters = pd.read_parquet(CLUSTERS_PATH)
    
    for df in [train, recs_nn, recs_mf]:
        df['ui'] = df['user_id'].astype(np.int64) * 10000 + df['cluster_id']
    
    table = create_table(train, recs_nn, recs_mf, users)
    
    already_bought = np.array(table['ui'].isin(train['ui']))
    cluster_weights = get_cluster_weights(train)
    weights = np.array(table.cluster_id.map(
        cluster_weights.set_index('cluster_id')['w']
    ).fillna(cluster_weights['w'].max()))
    del cluster_weights
    
    return (
        np.array(table['user_id']), 
        np.array(table['cluster_id']), 
        already_bought, 
        weights
    )

def _create_top_k(train_path, recs_nn_path, recs_mf_path, 
                 users, model_path, top_k_path, model_path2=None, k=120):
    
    X = get_table(train_path, recs_nn_path, recs_mf_path, users, create_features_simple)
    print(0)
    if model_path2 is None:
        ranker_model = pickle.load(open(model_path, 'rb'))
        pred = ranker_model.predict(X)
    else:
        ranker_model = pickle.load(open(model_path, 'rb'))
        pred1 = ranker_model.predict(X)
        ranker_model = pickle.load(open(model_path2, 'rb'))
        pred2 = ranker_model.predict(X)
        pred = np.mean([pred1, pred2], axis=0)
        
    del X
    print(1)
    users, items, already_bought, weights = get_some_data(
        train_path, recs_nn_path, recs_mf_path, users
    )
    recs = get_recs(pred, users, items, already_bought, weights, num_recs=k)
    
    users = []
    items = []
    for user_id in recs:
        users += [user_id] * len(recs[user_id])
        items += recs[user_id]
    del recs

    top_k = pd.DataFrame()
    top_k['user_id'] = users
    top_k['cluster_id'] = items
    top_k.to_parquet(top_k_path)

def create_top_k():
    
    test_ids = pd.read_csv(TEST_IDS_PATH)
    user_decoder = pickle.load(open(USER_DECODER_PATH, 'rb'))
    user_ecnoder = dict(zip(user_decoder, np.arange(len(user_decoder))))
    users = test_ids['id'].map(user_ecnoder)
    users1 = users[users % 2 == 0]
    users2 = users[users % 2 == 1]

    _create_top_k(TRAIN_TEST_PATH, RECS_NN_TEST_PATH, RECS_TEST_PATH,
                 users1, RANKER_MODEL1_PATH, TOPK_TEST1_PATH, 
                 RANKER_MODEL2_PATH)
    _create_top_k(TRAIN_TEST_PATH, RECS_NN_TEST_PATH, RECS_TEST_PATH,
              users2, RANKER_MODEL1_PATH, TOPK_TEST2_PATH, 
              RANKER_MODEL2_PATH)
    
    topk_test1 = pd.read_parquet(TOPK_TEST1_PATH)
    topk_test2 = pd.read_parquet(TOPK_TEST2_PATH)
    topk_test = pd.concat([topk_test1, topk_test2])
    topk_test.to_parquet(TOPK_TEST_PATH, index=False)

In [None]:
%%time
%%memit
create_top_k()

In [None]:
def create_fit_table(train, table, clusters, recs_nn_path, recs_mf_path):
    
    users = table.user_id.unique()
    
    recs_nn = pd.read_parquet(recs_nn_path)
    recs_mf = pd.read_parquet(recs_mf_path)
    
    for df in [train, recs_nn, recs_mf]:
        df['ui'] = df['user_id'].astype(np.int64) * 10000 + df['cluster_id']
        
    recs_nn['rnk'] = apply_rank('user_id', recs_nn)
    recs_mf['rnk'] = apply_rank('user_id', recs_mf)
    
    table['rnk'] = table['ui'].map(
        recs_nn.set_index('ui')['rnk']
    ).fillna(10000).astype(np.int16)
    
    table['score'] = table['ui'].map(
        recs_nn.set_index('ui')['scores']
    ).fillna(-100).astype(np.float32)
    
    mask = recs_nn.ui.isin(train.ui)
    
    recs_short = recs_nn[~mask]
    recs_short['rnk'] = apply_rank('user_id', recs_short)
    table['rnk2'] = table['ui'].map(
        recs_short.set_index('ui')['rnk']
    ).fillna(10000).astype(np.int16)
    
    recs_short = recs_nn[mask]
    recs_short['rnk'] = apply_rank('user_id', recs_short)
    table['rnk3'] = table['ui'].map(
        recs_short.set_index('ui')['rnk']
    ).fillna(10000).astype(np.int16)
    del recs_nn
    
    table['rnk4'] = table['ui'].map(
        recs_mf.set_index('ui')['rnk']
    ).fillna(10000).astype(np.int16)
    
    table['score2'] = table['ui'].map(
        recs_mf.set_index('ui')['score']
    ).fillna(-100).astype(np.float32)
    
    mask = recs_mf.ui.isin(train.ui)
    
    recs_short = recs_mf[~mask]
    recs_short['rnk'] = apply_rank('user_id', recs_short)
    table['rnk5'] = table['ui'].map(
        recs_short.set_index('ui')['rnk']
    ).fillna(10000).astype(np.int16)
    
    recs_short = recs_mf[mask]
    recs_short['rnk'] = apply_rank('user_id', recs_short)
    table['rnk6'] = table['ui'].map(
        recs_short.set_index('ui')['rnk']
    ).fillna(10000).astype(np.int16)
    del recs_mf
    
#     count_user_id = table.user_id.map(train['user_id'].value_counts()).fillna(0).astype(np.int16)
    table['count_item_id'] = (table.cluster_id.map(train['cluster_id'].value_counts()).fillna(0) / len(train)).astype(np.float32)

    table['num_orders'] = table['user_id'].map(
        train[['order_id', 'user_id']].drop_duplicates()['user_id'].value_counts()
    ).astype(np.int16)

    table['num_order_with_target_item'] = table['ui'].map(
        train[['order_id', 'ui']].drop_duplicates()['ui'].value_counts()
    ).fillna(0).astype(np.int16)
    
    last_order_ui = train[train.dt == \
          train['user_id'].map(
                train[['user_id', 'dt']].drop_duplicates().groupby('user_id').max()['dt']
    )].ui.unique()

    table['was_in_last_order'] = table['ui'].isin(last_order_ui).astype(np.int8)
    del last_order_ui

    prod_quantity = train.groupby('ui')['product_quantity'].sum()
    table['prod_quantity'] = table['ui'].map(prod_quantity).fillna(0).astype(np.int16)
    del prod_quantity
    
    mask = ~train[['user_id', 'order_id']].duplicated()
    
    table['user_retailer_most_common'] = table['user_id'].map(
        train[mask].groupby('user_id').retailer_id.apply(most_common)
    ).astype(np.int8)
    
    user_city_most_common = table['user_id'].map(
        train[mask].groupby('user_id').city_id.apply(most_common)
    ).astype(np.int16)
    
    del mask
    
#     item_retailer_vc = (train['cluster_id'] * 100 + train['retailer_id']).value_counts()
#     item_user_retailer = table['cluster_id'] * 100 + table['user_retailer_most_common']
#     table['user_item_retailer_vc'] = item_user_retailer.map(item_retailer_vc).fillna(0).astype(np.float32)
#     del item_retailer_vc
#     del item_user_retailer
    
    item_city_vc = (train['cluster_id'] * 100 + train['city_id']).value_counts()
    item_user_city = table['cluster_id'] * 100 + user_city_most_common
    table['user_item_city_vc'] = item_user_city.map(item_city_vc).fillna(0).astype(np.float32)
    del item_city_vc
    del item_user_city
    
    for col in ['cluster_size', 'd_mean', 'd_median']:
        table['cluster_' + col] = table['cluster_id'].map(
            clusters.set_index('cluster_id')[col]
        )
        table['cluster_' + col] = table['cluster_id'].map(
            clusters.set_index('cluster_id')[col]
        )
        
#     # user features 
    short_train = train[train.user_id.isin(users)]
    
    ui_dt = defaultdict(list)
    short_train3 = short_train[~short_train[['ui', 'order_id']].duplicated()]
    for ui, dt in zip(short_train3['ui'], short_train3['dt']):
        ui_dt[ui].append(dt)
    del short_train3
    table['ui_dt_diff_mean'] = table.ui.map(
        {key: get_mean_diff_dt(value) for key, value in ui_dt.items()}
    ).fillna(-1).astype(np.float32)
    del ui_dt
    
    table['product_quantity_sum'] = table.user_id.map(
          short_train.groupby('user_id').product_quantity.sum()
    )
    table['user_retailer_num'] = table.user_id.map(
        short_train.groupby('user_id').retailer_id.nunique()
    ).astype(np.int8)
#     table['user_city_num'] = table.user_id.map(
#         short_train.groupby('user_id').city_id.nunique()
#     ).astype(np.int8)
    table['user_product_price_mean'] = table.user_id.map(
        short_train.groupby('user_id').product_price.mean()
    )
#     table['user_product_price_sum'] = table.user_id.map(
#         short_train.product_price.astype(np.float32).groupby(short_train.user_id).sum()
#     )
    table['user_product_discount_mean'] = table.user_id.map(
        (short_train.product_discount != 0).groupby(short_train.user_id).mean()
    ).astype(np.float32)
    table['user_num_clusters'] = table['user_id'].map(
        short_train[['cluster_id', 'user_id']].drop_duplicates()['user_id'].value_counts()
    ).astype(np.int16)
    table['last_user_city_id'] = table['user_id'].map(
        short_train.groupby('user_id').city_id.last()
    )
    table['last_user_retailer_id'] = table['user_id'].map(
        short_train.groupby('user_id').retailer_id.last()
    )
#     table['user_mean_clusters_in_order'] = table['user_id'].map(
#         short_train.groupby(['user_id', 'order_id']).cluster_id.nunique().reset_index() \
#         .groupby('user_id').cluster_id.mean()
#     ).astype(np.float16)
    table['user_most_common_cluster_id'] = table['user_id'].map(
        short_train.groupby('user_id').cluster_id.apply(most_common)
    )
    del short_train
    
    # item features 
    
    mask = ~train[['user_id', 'order_id', 'cluster_id']].duplicated()
    
#     table['cluster_quantity_sum'] = table['cluster_id'].map(
#         train.groupby('cluster_id').product_quantity.sum().astype(np.float32)
#     )
    table['cluster_quantity_mean'] = table['cluster_id'].map(
        train.groupby('cluster_id').product_quantity.mean().astype(np.float32)
    )

    for retailer_id in [0, 1, 7]: # [1, 7, 0, 16, 6, 4, 19, 12, 15]
        table[f'cluster_retailer_{retailer_id}'] = table['cluster_id'].map(
            (train[mask].retailer_id == retailer_id).groupby(train[mask].cluster_id).mean(
            ).astype(np.float32)
        )
    
    table['cluster_city_count'] = table['cluster_id'].map(
        train[mask].groupby('cluster_id').city_id.nunique()
    ).astype(np.float32)
    

#     table['last_dt_delta'] = table['cluster_id'].map(
#         train.dt.max() - train.groupby('cluster_id').dt.max()
#     ).astype(np.float32)

    table['cluster_num_stores'] = table['cluster_id'].map(
        train[mask].groupby('cluster_id').store_id.nunique()
    ).astype(np.float32)
    del mask

    table['cluster_product_price_mean'] = table['cluster_id'].map(
        train.groupby('cluster_id').product_price.mean()
    ).astype(np.float32)

    table['cluster_mean_discount'] = table['cluster_id'].map(
        (train.product_discount == 0).groupby(train.cluster_id).mean().astype(np.float32)
    )

    table['num_users_bought_cluster'] = table['cluster_id'].map(
        train.groupby('cluster_id').user_id.nunique()
    ).fillna(0).astype(np.float32)

    table['num_orders_cluster'] = table['cluster_id'].map(
        train.groupby('cluster_id').order_id.nunique()
    ).fillna(0).astype(np.float32)
    
#     more features
    
    mask = ~train[['order_id', 'cluster_id']].duplicated()
    short_train = train[mask]

    city_retailer = short_train.city_id.astype(np.int16) * 100 + short_train.retailer_id
    city_retailer_cluster = city_retailer.astype(np.int64) * 10000 + short_train.cluster_id

    city_retailer_user = user_city_most_common.astype(np.int16) * 100 + \
        table['user_retailer_most_common']
    city_retailer_cluster_user = city_retailer_user.astype(np.int64)*10000 + table.cluster_id

    table['f1'] = city_retailer_user.map(
        city_retailer.value_counts()
    ).fillna(0).astype(np.float32)

    table['f2'] = city_retailer_cluster_user.map(
        city_retailer_cluster.value_counts()
    ).fillna(0).astype(np.float32)

    table['f3'] = table['f2'] \
        / table['f1'] 
    
    
    del city_retailer_user
    del city_retailer_cluster_user

    city_retailer_user = table['last_user_city_id'].astype(np.int16) * 100 + \
        table['last_user_retailer_id']
    city_retailer_cluster_user = city_retailer_user.astype(np.int64)*10000 + table.cluster_id

    f4 = city_retailer_user.map(
        city_retailer.value_counts()
    ).fillna(0).astype(np.float32)

    table['f5'] = city_retailer_cluster_user.map(
        city_retailer_cluster.value_counts()
    ).fillna(0).astype(np.float32)

    table['f6'] = table['f5'] \
        / f4 
    del f4
    
    del city_retailer
    del city_retailer_user
    del city_retailer_cluster_user
    del city_retailer_cluster
    
    #more and more features
    
    short_train = train[train.user_id.isin(users)]
    short_train2 = short_train[~short_train[['user_id', 'order_id']].duplicated()]
    
    table['time_from_order_with_target_item'] = table.ui.map(
        short_train.dt.max() - short_train.groupby('ui').dt.last()
    ).fillna(-1).astype(np.float32)
    
    user_dt = defaultdict(list)
    for user_id, dt in zip(short_train2['user_id'], short_train2['dt']):
        user_dt[user_id].append(dt)
    del short_train2
    
    table['user_dt_diff_mean'] = table.user_id.map(
        {key: get_mean_diff_dt(value) for key, value in user_dt.items()}
    ).fillna(-1).astype(np.float32)
    del user_dt
    
    table['share_order_with_target_item'] = (
        table['num_order_with_target_item'] / table['num_orders'] 
    ).astype(np.float32)
    
    table['ui_num'] = table.ui.map(short_train.ui.value_counts()).fillna(0).astype(np.int16)
#     table['share_clusters_with_target_item'] = (
#         table['ui_num']/ table['count_user_id']
#     ).astype(np.float32)

    table['share_quatity'] = (
        table['prod_quantity'] / table['product_quantity_sum']
    ).astype(np.float32)

    short_train4 = short_train[
        short_train.user_id.map(short_train.groupby('user_id').retailer_id.last()) == \
        short_train.retailer_id
    ]
        
    table['num_order_with_last_retailer'] = table['user_id'].map(
        short_train4[['user_id', 'order_id']].drop_duplicates()['user_id'].value_counts()
    ).astype(np.int16)

    table['num_order_with_target_item_last_retailer'] = table['ui'].map(
        short_train4[['order_id', 'ui']].drop_duplicates()['ui'].value_counts()
    ).fillna(0).astype(np.int16)
    del short_train4

    table['share_order_with_target_item_last_retailer'] = (
        table['num_order_with_target_item_last_retailer'] / table['num_order_with_last_retailer']
    ).astype(np.float32)
    

    
    
    ui_vc = train.ui.value_counts()
    rnk_vc = train[['user_id', 'ui', 'cluster_id']].drop_duplicates()
    rnk_vc['vc'] = rnk_vc.ui.map(ui_vc)
    rnk_vc = rnk_vc.sort_values(['user_id', 'vc'], ascending=False)
    rnk_vc['rnk_user_id_ui'] = apply_rank('user_id', rnk_vc)
    table['rnk_user_id_ui'] = table.ui.map(rnk_vc.set_index('ui')['rnk_user_id_ui']
                                          ).fillna(10000).astype(np.int16)
    del ui_vc

    rnk_vc = rnk_vc.sort_values(['cluster_id', 'vc'], ascending=False)
    rnk_vc['rnk_cluster_id_ui'] = apply_rank('cluster_id', rnk_vc)
    table['rnk_cluster_id_ui'] = table.ui.map(rnk_vc.set_index('ui')['rnk_cluster_id_ui']
                                          ).fillna(10000).astype(np.int16)
    del rnk_vc

    rnk_vc = train['cluster_id'].value_counts().to_frame()
    rnk_vc['rnk_cluster_id'] = np.arange(len(rnk_vc))
    table['rnk_cluster_id'] = table.cluster_id.map(rnk_vc['rnk_cluster_id']
                                                  ).fillna(10000).astype(np.int16)
    del rnk_vc

    cluster_city_vc = (train['city_id'].astype(np.int32) * 10000 + train['cluster_id']
                      ).value_counts()
    rnk_vc = train[['city_id', 'cluster_id']].drop_duplicates()
    rnk_vc['cluster_city'] = rnk_vc['city_id'].astype(np.int32) * 10000 + rnk_vc['cluster_id']
    rnk_vc['vc'] = rnk_vc['cluster_city'].map(cluster_city_vc)
    rnk_vc = rnk_vc.sort_values(['city_id', 'vc'], ascending=False)
    rnk_vc['rnk_cluster_city'] = apply_rank('city_id', rnk_vc)
    user_city_cluster = table['last_user_city_id'].astype(np.int32) * 10000 \
        + table['cluster_id']
    table['rnk_cluster_city'] = user_city_cluster.map(
        rnk_vc.set_index('cluster_city')['rnk_cluster_city']
    ).fillna(10000).astype(np.int16)
    del cluster_city_vc
    del rnk_vc
    del user_city_cluster

    cluster_retailer_vc = (train['retailer_id'].astype(np.int32) * 10000 + train['cluster_id']
                      ).value_counts()
    rnk_vc = train[['retailer_id', 'cluster_id']].drop_duplicates()
    rnk_vc['cluster_retailer'] = rnk_vc['retailer_id'].astype(np.int32) * 10000 + rnk_vc['cluster_id']
    rnk_vc['vc'] = rnk_vc['cluster_retailer'].map(cluster_retailer_vc)
    rnk_vc = rnk_vc.sort_values(['retailer_id', 'vc'], ascending=False)
    rnk_vc['rnk_cluster_retailer'] = apply_rank('retailer_id', rnk_vc)
    user_retailer_cluster = table['last_user_retailer_id'].astype(np.int32) * 10000 \
        + table['cluster_id']
    table['rnk_cluster_retailer'] = user_retailer_cluster.map(
        rnk_vc.set_index('cluster_retailer')['rnk_cluster_retailer']
    ).fillna(10000).astype(np.int16)
    user_retailer_cluster = table['user_retailer_most_common'].astype(np.int32) * 10000 \
        + table['cluster_id']
    table['rnk_cluster_retailer2'] = user_retailer_cluster.map(
        rnk_vc.set_index('cluster_retailer')['rnk_cluster_retailer']
    ).fillna(10000).astype(np.int16)
    del cluster_retailer_vc
    del rnk_vc
    del user_retailer_cluster

    cluster_retailer_city_vc = (train['city_id'].astype(np.int64) * 10000000 + \
        train['retailer_id'].astype(np.int64) * 10000 + \
        train['cluster_id']).value_counts()
    rnk_vc = train[['retailer_id', 'cluster_id', 'city_id']].drop_duplicates()
    rnk_vc['cluster_retailer_city'] = (rnk_vc['city_id'].astype(np.int64) * 10000000 + \
        rnk_vc['retailer_id'].astype(np.int64) * 10000 + \
        rnk_vc['cluster_id'])
    rnk_vc['vc'] = rnk_vc['cluster_retailer_city'].map(cluster_retailer_city_vc)
    rnk_vc['retailer_city'] = (rnk_vc['city_id'].astype(np.int64) * 1000 + \
        rnk_vc['retailer_id'].astype(np.int64))
    rnk_vc = rnk_vc.sort_values(['retailer_city', 'vc'], ascending=False)
    rnk_vc['rnk_cluser_city_retailer'] = apply_rank('retailer_city', rnk_vc)
    user_retailer_city_cluster = (table['last_user_city_id'].astype(np.int64) * 10000000 + \
        table['last_user_retailer_id'].astype(np.int64) * 10000 + \
        table['cluster_id'])
    table['rnk_cluster_retailer_city'] = user_retailer_city_cluster.map(
        rnk_vc.set_index('cluster_retailer_city')['rnk_cluser_city_retailer']
    ).fillna(10000).astype(np.int16)
    user_retailer_city_cluster = (table['last_user_city_id'].astype(np.int64) * 10000000 + \
        table['user_retailer_most_common'].astype(np.int64) * 10000 + \
        table['cluster_id'])
    table['rnk_cluster_retailer_city2'] = user_retailer_city_cluster.map(
        rnk_vc.set_index('cluster_retailer_city')['rnk_cluser_city_retailer']
    ).fillna(10000).astype(np.int16)
    del cluster_retailer_city_vc
    del rnk_vc
    del user_retailer_city_cluster
    
    short_train = train[['cluster_id', 'user_id']][
        train.user_id.isin(users) & (~train[['ui', 'order_id']].duplicated())
    ]
    
    vc = short_train['user_id'].value_counts()
    for cluster_id in TOP_K_CLUSTERS[:40]:
        table[f'f102_{cluster_id}'] = table.user_id.map(
            (short_train.cluster_id == cluster_id).groupby(short_train.user_id).sum() / vc
        ).astype(np.float16)
    
    return table

In [None]:
def predict():

    train_test = pd.read_parquet(TRAIN_TEST_PATH)
    train_test['product_price'] = train_test['product_price'].astype(np.float32)
    train_test['product_discount'] = train_test['product_discount'].astype(np.float32)
    clusters = pd.read_parquet(CLUSTERS_PATH)
    table = pd.read_parquet(TOPK_TEST_PATH)
    user_decoder = pickle.load(open(USER_DECODER_PATH, 'rb'))
    
    for df in [train_test, table]:
        df['ui'] = df['user_id'].astype(np.int64) * 10000 + df['cluster_id']
        
    print(1)
    table = create_fit_table(train_test, table, clusters, 
                             RECS_NN_TEST_PATH, RECS_TEST_PATH)
    print(2)
    del clusters
    
    already_bought = table['ui'].isin(train_test['ui'])
    
    cluster_weights = get_cluster_weights(train_test)
    del train_test
 
    weights = table.cluster_id.map(cluster_weights.set_index('cluster_id')['w']).fillna(
        cluster_weights['w'].max()
    )
    del cluster_weights
    print(3)
    X = table.drop(['user_id', 'ui'], axis=1).to_numpy(dtype=np.float32)
    print(type(X[0][0]))
    print(4)
    users = np.array(table['user_id'])
    items = np.array(table['cluster_id'])
    del table
    print(5)
    
    ranker_model = pickle.load(open(RANKER_MODEL_PATH, 'rb'))
    pred = ranker_model.predict(X)
    
    recs = get_recs(pred, users, items, already_bought, weights)

    submit = pd.DataFrame()
    submit['user_id'] = pd.Series(recs.keys())
    submit['id'] = user_decoder[submit['user_id']]

    submit['target'] = [';'.join([str(i) for i in values]) for values in recs.values()]
    submit[['id', 'target']].to_csv(SUBMIT_PATH, index=False)

In [None]:
%%time
%memit predict()