In [1]:
import sys
sys.path.insert(0,'./src')

import numpy as np
import pandas as pd
import dask.dataframe as dd
import datetime
import functools

from typing import List, Union, Optional, Dict

In [2]:
import featurelib as fl
from feature_impl import dask_groupby
from data_config import data_config

In [3]:
# Transaction-level tables are stored as parquet.
purchases = dd.read_parquet('./csv/purchases.parquet')
receipts = dd.read_parquet('./csv/receipts.parquet')

# Reference tables are stored as CSV.
campaigns = dd.read_csv('./csv/campaigns.csv')
client_profile = dd.read_csv('./csv/client_profile.csv')
products = dd.read_csv('./csv/products.csv')

In [4]:
# Table name -> dask frame; calcers fetch their inputs from the engine by
# these names via `engine.get_table(...)`.
TABLES = dict(
    receipts=receipts,
    campaigns=campaigns,
    client_profile=client_profile,
    products=products,
    purchases=purchases,
)

engine = fl.Engine(tables=TABLES)

In [25]:
k = 10  # maximum number of rows drawn from each partition

# Column name -> dtype mapping that tells dask the schema of the result.
dd_meta = purchases.dtypes.to_dict()

# A fixed random_state makes the preview reproducible after a kernel restart.
# Every partition uses the same seed, which is fine for eyeballing rows but is
# not a statistically independent sample across partitions.
sampled_series = purchases.map_partitions(
    lambda part: part.sample(n=min(k, part.shape[0]), random_state=0),
    meta=dd_meta,
)
sampled_series.compute()

Unnamed: 0,client_id,transaction_id,transaction_datetime,regular_points_received,express_points_received,regular_points_spent,express_points_spent,purchase_sum,store_id,product_id,product_quantity,trn_sum_from_iss,trn_sum_from_red
5761184,82571,6166424,2018-12-26 13:54:29,1.4,0.0,0.0,0.0,286.0,13697,3698,1.0,43.0,
6523823,415241,5606984,2019-03-02 16:19:35,2.2,0.0,0.0,0.0,458.0,2594,40768,2.0,55.0,
21033131,180332,1798815,2018-12-18 14:54:17,1.6,0.0,0.0,0.0,320.0,1976,3305,1.0,27.0,
164539,271261,2203247,2019-02-01 08:05:33,6.0,0.0,0.0,0.0,603.62,10624,7436,1.0,38.0,
3607049,300875,3452248,2019-01-11 12:33:12,23.4,0.0,0.0,0.0,2345.0,1926,3851,2.0,232.0,
34285597,405590,10001424,2018-11-25 07:08:43,1.7,0.0,0.0,0.0,272.64,4444,10810,1.0,5.0,
7640970,352257,6855433,2019-02-03 11:47:50,8.3,0.0,0.0,0.0,834.0,13251,2313,1.0,80.0,
4402095,185564,7271969,2019-03-08 08:05:02,30.8,0.0,0.0,0.0,2838.42,4750,247,1.0,70.0,
24098728,149698,5253845,2019-03-05 09:35:43,0.4,0.0,0.0,0.0,90.0,2386,29203,1.0,60.0,
2929139,165245,9846371,2019-01-07 06:49:19,5.9,0.0,0.0,0.0,599.0,5247,5190,1.0,140.0,


In [368]:
class DayOfWeekReceiptsCalcer(fl.DateFeatureCalcer):
    """Per-client number of receipts on each day of the week.

    Only receipts with a positive ``purchase_sum`` and a
    ``transaction_datetime`` inside ``[date_to - delta days, date_to)`` are
    counted. The result has one row per client and one column per weekday,
    named ``purchases_count_dw<N>__<delta>d`` where ``N`` is the pandas
    ``dt.dayofweek`` value (0..6).

    Args:
        delta: Length of the look-back window in days.
        **kwargs: Forwarded to the base class; ``compute`` relies on
            ``self.date_to`` and ``self.engine`` being available afterwards
            (presumably set by ``fl.DateFeatureCalcer`` -- TODO confirm).
    """

    name = 'day_of_week_receipts'
    keys = ['client_id']

    # Every value `dt.dayofweek` can produce. Declaring the categories up front
    # (instead of discovering them with `.categorize()`) keeps the output
    # schema at seven columns even when a weekday has no receipts in the
    # window, and avoids the extra pass over the data that `.categorize()` needs.
    _WEEKDAYS = list(range(7))

    def __init__(self, delta: int, **kwargs):
        self.delta = delta
        super().__init__(**kwargs)

    def compute(self) -> dd.DataFrame:
        """Build the weekday-count features as a lazy dask frame."""
        receipts = self.engine.get_table('receipts')

        # Window is half-open: starts `delta` days before midnight of
        # `date_to` and excludes `date_to` itself.
        date_to = datetime.datetime.combine(self.date_to, datetime.time.min)
        date_from = date_to - datetime.timedelta(days=self.delta)
        date_mask = (
            (receipts['transaction_datetime'] >= date_from) &
            (receipts['transaction_datetime'] < date_to) &
            (receipts['purchase_sum'] > 0)
        )

        # dask's pivot_table needs `columns` to be a categorical with known
        # categories; an explicit CategoricalDtype provides that lazily.
        weekday_dtype = pd.CategoricalDtype(categories=self._WEEKDAYS)
        features = (
            receipts
            .loc[date_mask, ['client_id', 'transaction_datetime', 'purchase_sum']]
            .assign(
                dayofweek=lambda x: x['transaction_datetime'].dt.dayofweek.astype(weekday_dtype)
            )
        ).pivot_table(
            index=self.keys[0],
            columns='dayofweek',
            values='purchase_sum',
            aggfunc='count',
        )

        # Fix the column order to the category order before renaming.
        orderedcols = features.columns.categories.values
        features = features[orderedcols]

        features.columns = [
            f'purchases_count_dw{categ}__{self.delta}d'
            for categ in orderedcols
        ]
        return features.reset_index()
    
    
class FavouriteStoreCalcer(fl.DateFeatureCalcer):
    """Most frequently visited store per client within a look-back window.

    Uses receipts with a positive ``purchase_sum`` whose
    ``transaction_datetime`` lies in ``[date_to - delta days, date_to)``.
    When several stores share the highest count for a client, the largest
    ``store_id`` wins. The output column is ``favourite_store_id__<delta>d``.

    Args:
        delta: Length of the look-back window in days.
        **kwargs: Forwarded to the base class; ``compute`` reads
            ``self.date_to`` and ``self.engine`` afterwards (presumably set by
            ``fl.DateFeatureCalcer`` -- TODO confirm).
    """

    name = 'favourite_store'
    keys = ['client_id']

    def __init__(self, delta: int, **kwargs):
        self.delta = delta
        super().__init__(**kwargs)

    @staticmethod
    def getFavourStore(x):
        """Pick the favourite store from one client's rows (pandas version).

        Not called by ``compute``; kept as a standalone helper.

        Args:
            x: Frame with a ``store_id`` column.

        Returns:
            One-element Series indexed by ``"favourite_store_id"`` holding the
            most frequent ``store_id``; ties go to the largest id.

        Raises:
            ValueError: If ``x`` contains no store ids.
        """
        visit_counts = x['store_id'].value_counts().to_dict()
        top_count = max(visit_counts.values())
        favourite = max(
            store for store, visits in visit_counts.items() if visits == top_count
        )
        return pd.Series(favourite, index=["favourite_store_id"])

    def compute(self) -> dd.DataFrame:
        """Build the favourite-store feature with dask group-bys."""
        receipts = self.engine.get_table('receipts')

        # Half-open window ending at midnight of `date_to`.
        window_end = datetime.datetime.combine(self.date_to, datetime.datetime.min.time())
        window_start = window_end - datetime.timedelta(days=self.delta)
        when = receipts['transaction_datetime']
        in_window = (
            (when >= window_start) &
            (when < window_end) &
            (receipts['purchase_sum'] > 0)
        )

        # Step 1: receipts per (client, store) -> column `store_id_count`.
        visits = dask_groupby(
            receipts.loc[in_window, self.keys + ['store_id']],
            by=['client_id', 'store_id'],
            config={"store_id": ["count"]},
        ).reset_index()

        # Step 2: each client's highest per-store count -> `store_id_count_max`.
        top_visits = (
            dask_groupby(
                visits,
                by=['client_id'],
                config={"store_id_count": ["max"]},
            )
            .reset_index()
            .drop_duplicates(['client_id', 'store_id_count_max'])
        )

        # Step 3: keep only the (client, store) rows that reach that maximum.
        visits = visits.merge(
            top_visits[['client_id', 'store_id_count_max']],
            on=['client_id'],
            how='inner',
        )
        is_top = visits['store_id_count'] == visits['store_id_count_max']

        # Step 4: break ties by taking the largest store id per client.
        favourite = dask_groupby(
            visits.loc[is_top, ['client_id', 'store_id']],
            by=['client_id'],
            config={"store_id": ["max"]},
        ).reset_index()

        return favourite.rename(
            columns={"store_id_max": f"favourite_store_id__{self.delta}d"}
        )

class TargetFromCampaignsCalcer(fl.DateFeatureCalcer):
    """Treatment flag and target columns for campaigns run on ``date_to``.

    Selects the rows of the ``campaigns`` table whose ``treatment_date``
    (parsed as ``%Y-%m-%d``) equals ``self.date_to`` and returns the client
    key together with the treatment flag and the three target columns.
    """

    name = 'target_from_campaigns'
    keys = ['client_id']

    def compute(self) -> dd.DataFrame:
        """Return the filtered campaign rows as a lazy dask frame."""
        # Fetch the table once so the mask and the frame it filters are
        # guaranteed to come from the same object.
        campaigns = self.engine.get_table('campaigns')

        treatment_day = dd.to_datetime(
            campaigns['treatment_date'], format='%Y-%m-%d'
        ).dt.date
        date_mask = (treatment_day == self.date_to)

        result = campaigns.loc[
            date_mask,
            [
                'client_id', 'treatment_flg',
                'target_purchases_sum', 'target_purchases_count', 'target_campaign_points_spent'
            ],
        ]
        return result
    
    
class AgeGenderCalcer(fl.FeatureCalcer):
    """Static client attributes: ``age`` and ``gender`` per client key."""

    name = 'age_gender'
    keys = ['client_id']

    def compute(self) -> dd.DataFrame:
        """Return the key column(s) plus ``age`` and ``gender`` from ``client_profile``."""
        wanted_columns = self.keys + ['age', 'gender']
        profile = self.engine.get_table('client_profile')
        return profile[wanted_columns]
    

In [369]:
data_config['calcers']

[{'name': 'day_of_week_receipts',
  'args': {'delta': 30,
   'date_to': datetime.date(2019, 3, 19),
   'engine': <featurelib.Engine at 0x7f6de42ced50>}},
 {'name': 'favourite_store',
  'args': {'delta': 30,
   'date_to': datetime.date(2019, 3, 19),
   'engine': <featurelib.Engine at 0x7f6de42ced50>}},
 {'name': 'age_gender',
  'args': {'engine': <featurelib.Engine at 0x7f6de42ced50>}},
 {'name': 'target_from_campaigns',
  'args': {'date_to': datetime.date(2019, 3, 21),
   'engine': <featurelib.Engine at 0x7f6de42ced50>}}]

In [370]:
# Register every calcer class defined above with featurelib
# (presumably looked up later by each class's `name` -- verify in featurelib).
for calcer_cls in (
    DayOfWeekReceiptsCalcer,
    FavouriteStoreCalcer,
    TargetFromCampaignsCalcer,
    AgeGenderCalcer,
):
    fl.register_calcer(calcer_cls)

In [371]:
# Spot-check a single calcer on a 7-day window before running the full config.
fs = FavouriteStoreCalcer(
    delta=7,
    date_to=datetime.date(2019, 3, 19),
    engine=engine,
)

In [372]:
fav_store = fs.compute()

In [373]:
fav_store.head()

Unnamed: 0,client_id,favourite_store_id__7d
0,0,81
1,1,5365
2,2,1351
3,4,8601
4,6,12705


In [10]:
raw_features = fl.compute_features(engine, features_config=data_config['calcers'])

In [11]:
df = raw_features.head(100)

In [94]:
df

Unnamed: 0,client_id,purchases_count_dw0__30d,purchases_count_dw1__30d,purchases_count_dw2__30d,purchases_count_dw3__30d,purchases_count_dw4__30d,purchases_count_dw5__30d,purchases_count_dw6__30d,favourite_store_id__30d,age,gender,treatment_flg,target_purchases_sum,target_purchases_count,target_campaign_points_spent,weekend_purchases_ratio__30d
0,0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,81,45,U,0,909.00,1,0.0,0.000000
1,1,0.0,2.0,1.0,0.0,1.0,1.0,4.0,5365,72,F,1,791.00,2,-0.0,0.555556
2,2,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1351,68,F,0,0.00,0,0.0,0.000000
3,3,0.0,0.0,0.0,0.0,0.0,1.0,0.0,7350,48,U,1,654.00,1,-0.0,1.000000
4,4,1.0,0.0,1.0,1.0,1.0,2.0,0.0,8601,60,F,1,0.00,0,-0.0,0.333333
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,95,0.0,0.0,0.0,0.0,1.0,0.0,2.0,7240,27,U,0,0.00,0,0.0,0.666667
96,96,1.0,0.0,0.0,1.0,0.0,0.0,1.0,5252,49,U,1,0.00,0,-0.0,0.333333
97,97,1.0,0.0,1.0,0.0,1.0,1.0,1.0,13045,25,U,1,593.79,3,-0.0,0.400000
98,98,0.0,0.0,0.0,0.0,1.0,1.0,0.0,324,35,F,0,0.00,0,0.0,0.500000


In [95]:
# Split the transform config into two sub-pipelines: the first two entries,
# and everything from index 3 onwards (the leave-one-out encoder step).
# NOTE(review): transforms[2] ends up in neither slice -- confirm that
# skipping it is intentional.
data_config_part = data_config['transforms'][:2]
data_config_looe = data_config['transforms'][3:]
data_config_looe

[{'name': 'loo_mean_target_encoder',
  'args': {'col_categorical': 'gender',
   'col_target': 'target_purchases_count',
   'col_result': 'gender__mte__target_purchases_count'}}]

In [96]:
import sklearn.base as skbase
from category_encoders.leave_one_out import LeaveOneOutEncoder
import functools

In [323]:
class ExpressionTransformer(skbase.BaseEstimator, skbase.TransformerMixin):
    """Stateless pipeline step that delegates to a pre-configured function.

    The keyword parameters given at construction are bound to ``function``
    once; ``transform`` then forwards whatever it receives to that bound
    callable and returns its result.
    """

    # Declared for readers only: nothing in this class assigns these names.
    expression: str
    col_result: str

    def __init__(self, function, **params):
        bound = functools.partial(function, **params)
        self.function = bound

    def fit(self, *args, **kwargs):
        """Nothing to learn; returns ``self`` so the step chains in a pipeline."""
        return self

    def transform(self, *args, **kwargs) -> pd.DataFrame:
        """Call the bound function with the given arguments and return its output."""
        result = self.function(*args, **kwargs)
        return result

class LOOMeanTargetEncoder(skbase.BaseEstimator, skbase.TransformerMixin):
    """Pipeline step that adds a target-encoded copy of a categorical column.

    Wraps ``category_encoders`` ``LeaveOneOutEncoder``: ``fit`` learns the
    encoding of ``col_categorical`` against ``col_target`` and ``transform``
    writes the encoded values into ``col_result``.

    NOTE(review): ``transform`` calls the encoder without a target, so
    ``fit_transform`` on training data yields the plain per-category value
    rather than a leave-one-out one. Confirm this is the intended behaviour
    for training rows.

    Args:
        function: The decorated placeholder function; it is not called, only
            stored so that ``get_params()`` (used by repr/clone) can read
            every named ``__init__`` argument back from the instance.
        **params: Must contain exactly ``col_categorical``, ``col_target``
            and ``col_result``.

    Raises:
        ValueError: If ``params`` does not contain exactly those three keys.
    """

    _PARAM_NAMES = ('col_categorical', 'col_target', 'col_result')

    def __init__(self, function, **params):
        # Read the columns by name: relying on the order of `params.values()`
        # would silently swap columns if the config keys were reordered.
        if set(params) != set(self._PARAM_NAMES):
            raise ValueError(
                f"expected exactly the parameters {self._PARAM_NAMES}, "
                f"got {sorted(params)}"
            )
        self.function = function
        self.col_categorical = params['col_categorical']
        self.col_target = params['col_target']
        self.col_result = params['col_result']
        self.LOOE_encoder = LeaveOneOutEncoder()

    def fit(self, data: pd.DataFrame, *args, **kwargs):
        """Fit the wrapped encoder on the categorical and target columns of ``data``."""
        self.LOOE_encoder.fit(data[self.col_categorical], data[self.col_target])
        return self

    def transform(self, data: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
        """Return a copy of ``data`` with the encoded column added as ``col_result``."""
        looe_res = self.LOOE_encoder.transform(data[self.col_categorical])
        # Work on a copy so the caller's frame is left untouched.
        result = data.copy()
        result[self.col_result] = looe_res
        return result

In [324]:
def expression_transformer(function):
    """Decorator: turn ``function`` into a factory of ``ExpressionTransformer`` steps.

    The decorated name becomes a builder that accepts only keyword parameters
    and returns a transformer wrapping ``function`` with those parameters.
    """
    def builder(**params):
        transformer = ExpressionTransformer(function, **params)
        return transformer

    return builder

def looe_transformer(function):
    """Decorator: turn ``function`` into a factory of ``LOOMeanTargetEncoder`` steps.

    The decorated name becomes a builder that accepts only keyword parameters
    and returns an encoder constructed from ``function`` and those parameters.
    """
    def builder(**params):
        encoder = LOOMeanTargetEncoder(function, **params)
        return encoder

    return builder

In [325]:
@expression_transformer
def transform_cols(data: pd.DataFrame, expression: str, col_result: str):
    """Evaluate ``expression`` and store the result in ``data[col_result]``.

    ``expression`` is a template in which ``{d}`` stands for the frame; it is
    filled in with the name ``data`` and evaluated with ``pd.eval``. The frame
    is modified in place and also returned.
    """
    # `pd.eval` looks the name "data" up among this function's local
    # variables, so the parameter has to keep exactly that name.
    data[col_result] = pd.eval(expression.format(d="data"))
    return data

@looe_transformer
def transform_cols_looe(data: pd.DataFrame, col_categorical: str, col_target: str, col_result: str):
    """Placeholder whose signature names the encoder's three column parameters.

    The body is an identity function; the decorator replaces this name with a
    builder of ``LOOMeanTargetEncoder`` steps.
    """
    return data

In [326]:
fl.register_transformer(transform_cols, 'expression')
fl.register_transformer(transform_cols_looe, 'loo_mean_target_encoder')

In [327]:
pipe = fl.build_pipeline(data_config_looe)

In [328]:
data_config_looe

[{'name': 'loo_mean_target_encoder',
  'args': {'col_categorical': 'gender',
   'col_target': 'target_purchases_count',
   'col_result': 'gender__mte__target_purchases_count'}}]

In [329]:
out = pipe.fit_transform(df.copy())

In [330]:
out

Unnamed: 0,client_id,purchases_count_dw0__30d,purchases_count_dw1__30d,purchases_count_dw2__30d,purchases_count_dw3__30d,purchases_count_dw4__30d,purchases_count_dw5__30d,purchases_count_dw6__30d,favourite_store_id__30d,age,gender,treatment_flg,target_purchases_sum,target_purchases_count,target_campaign_points_spent,weekend_purchases_ratio__30d,gender__mte__target_purchases_count
0,0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,81,45,U,0,909.00,1,0.0,0.000000,1.422222
1,1,0.0,2.0,1.0,0.0,1.0,1.0,4.0,5365,72,F,1,791.00,2,-0.0,0.555556,0.975000
2,2,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1351,68,F,0,0.00,0,0.0,0.000000,0.975000
3,3,0.0,0.0,0.0,0.0,0.0,1.0,0.0,7350,48,U,1,654.00,1,-0.0,1.000000,1.422222
4,4,1.0,0.0,1.0,1.0,1.0,2.0,0.0,8601,60,F,1,0.00,0,-0.0,0.333333,0.975000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,95,0.0,0.0,0.0,0.0,1.0,0.0,2.0,7240,27,U,0,0.00,0,0.0,0.666667,1.422222
96,96,1.0,0.0,0.0,1.0,0.0,0.0,1.0,5252,49,U,1,0.00,0,-0.0,0.333333,1.422222
97,97,1.0,0.0,1.0,0.0,1.0,1.0,1.0,13045,25,U,1,593.79,3,-0.0,0.400000,1.422222
98,98,0.0,0.0,0.0,0.0,1.0,1.0,0.0,324,35,F,0,0.00,0,0.0,0.500000,0.975000


In [331]:
# Reference values computed directly with category_encoders, for comparison
# with the pipeline output above.
LOOE_encoder = LeaveOneOutEncoder()
LOOE_encoder.fit(df['gender'], df['target_purchases_count'])

# Without a target: every row of a category gets the same value (how unseen
# or test rows are encoded).
test_looe = LOOE_encoder.transform(df['gender'])

# With the target: each row's own target is left out of its category's value
# (how training rows are encoded). Defined here so the next cell runs on a
# fresh kernel.
train_looe = LOOE_encoder.transform(df['gender'], df['target_purchases_count'])

In [332]:
train_looe

Unnamed: 0,gender
0,1.431818
1,0.948718
2,1.000000
3,1.431818
4,1.000000
...,...
95,1.454545
96,1.454545
97,1.386364
98,1.000000
