Тема: Свой Feature Store

Видео лекции:  
https://www.youtube.com/watch?v=3oI4RY74L-I
    
Видео семинара:  
нет

По разобранному в лекции шаблону требуется реализовать расчет новых признаков и новых преобразований

In [1]:
import featurelib as fl
import dask.dataframe as dd
import datetime
import pandas as pd

from typing import List, Dict, Union#, Optional

## Utils

In [2]:
def dask_groupby(
    data: dd.DataFrame,
    by: List[str],
    config: Dict[str, Union[str, List[str]]]
) -> dd.DataFrame:
    data_ = data.copy()
    dask_agg_config = dict()

    for col, aggs in config.items():
        aggs = aggs if isinstance(aggs, list) else [aggs]
        for agg in aggs:
            fictious_col = f'{col}_{agg}'
            data_ = data_.assign(**{fictious_col: lambda d: d[col]})
            dask_agg_config[fictious_col] = agg

    result = data_.groupby(by=by).agg(dask_agg_config)
    return result

# Расчет новых признаков (calcers)

## DayOfWeekReceiptsCalcer

In [3]:
# (“purchases”) purchases.parquet - детальная (до товара) история покупок.
# (“receipts”) receipts.parquet - не детальная (чеки) история продаж.
# (“products”) products.csv - справочник товаров.
# (“client_profile”) client_profile.csv - соцдем профиль клиентов.
# (“campaigns”) campaigns.csv - результаты кампании.

In [62]:
class DayOfWeekReceiptsCalcer(fl.DateFeatureCalcer):
    name = 'day_of_week_receipts'
    keys = ['client_id']
            
    def __init__(self, delta: int, **kwargs):
        self.delta = delta
        super().__init__(**kwargs)
    # Аргументы конструктора
    #     engine: fl.Engine, # объект соединения с данными.
    #     date_to: datetime.date, # дата, к которой рассчитываются признаки.
    #     delta: int # длительность (в днях) периода времени, по которому считаются призн
            
    # Описание результата расчета
    def compute(self) -> dd.DataFrame:
    # implement me
        receipts = self.engine.get_table('receipts')

        date_to = datetime.datetime.combine(self.date_to, datetime.datetime.min.time())
        date_from = date_to - datetime.timedelta(days=self.delta)
        date_mask = (receipts['transaction_datetime'] >= date_from) & (receipts['transaction_datetime'] < date_to)
            
        features = (
            receipts
            .loc[date_mask]
            .assign(weekday=lambda d: d['transaction_datetime'].dt.weekday)
        )
        # Чтобы можно было сделать pivot_table по колонке 'weekday'
        features = features.categorize(columns=['weekday'])
        features = features.pivot_table(
            index='client_id', columns='weekday', values='transaction_id', aggfunc='count'
        )
        
        features = features[[0, 1, 2, 3, 4, 5, 6]]
        features.columns = [f'purchases_count_dw0__{self.delta}d',
                            f'purchases_count_dw1__{self.delta}d',
                            f'purchases_count_dw2__{self.delta}d',
                            f'purchases_count_dw3__{self.delta}d',
                            f'purchases_count_dw4__{self.delta}d',
                            f'purchases_count_dw5__{self.delta}d',
                            f'purchases_count_dw6__{self.delta}d',
                           ]
        features = features.reset_index()
        return features


In [5]:
# Метод compute должен возвращать dask dataframe с колонками
# client_id
# 7 признаков "purchases_count_dw{day}__{delta}d" - число покупок (чеков) в день недели с ном
# day (от 0 до 6, пн = 0), за delta дней до даты date_to (не включая день date_to).
# Если клиент не совершал покупок в период [date_to - delta, date_to), то для этого клиента не должно
# быть строки в выходной таблице

## FavouriteStoreCalcer

In [6]:
# Объявление
class FavouriteStoreCalcer(fl.DateFeatureCalcer):
    name = 'favourite_store'
    keys = ['client_id']
    
    def __init__(self, delta: int, **kwargs):
        self.delta = delta
        super().__init__(**kwargs)
    
#     # Аргументы конструктора
#     engine: fl.Engine, # объект соединения с данными.
#     date_to: datetime.date, # дата, к которой рассчитываются признаки.
#     delta: int # длительность (в днях) периода времени, по которому считаются признаки
        
    # Описание результата расчета
    def compute(self) -> dd.DataFrame:
    # implement me
        
        receipts = self.engine.get_table('receipts')

        date_to = datetime.datetime.combine(self.date_to, datetime.datetime.min.time())
        date_from = date_to - datetime.timedelta(days=self.delta)
        date_mask = (receipts['transaction_datetime'] >= date_from) & (receipts['transaction_datetime'] < date_to)
    
        features = receipts.loc[date_mask]
        
        # Количество посещений клиентом каждого магазина
        # 'client_id', 'store_id', 'store_id_count'
        features = dask_groupby(
            features,
            by=['client_id', 'store_id'],
            config={
                "store_id": "count"
            }
        ).reset_index()
        
        # Магазины клиента с макимальным кол-вом посещений
        # 'client_id' 'store_id_count_max'
        features2 = dask_groupby(
            features,
            by=['client_id'],
            config={
                "store_id_count": "max"
            }
        ).reset_index()
        
        # Магазины с максимальным количеством покупок для каждого клиента
        features = (
            features
            .merge(
                features2,
                left_on=['client_id', 'store_id_count'],
                right_on=['client_id', 'store_id_count_max'],
                how='inner'
            )
        )[['client_id', 'store_id']]
        
        # Если есть дубли магазинов, то оставляю один с максимальным id
        features = dask_groupby(
            features,
            by=['client_id'],
            config={
                "store_id": "max"
            }
        ).reset_index()

        features.columns = ['client_id',
                    f'favourite_store_id__{self.delta}d',
                   ]
        
        return features

In [7]:
# Метод compute должен возвращать dask DataFrame с колонками
#     client_id
#     "favourite_store_id__{delta}d" - наиболее часто посещаемый (по числу покупок) клиентом магазин. Если таких
# несколько, выбирать магазин с максимальным номером.

# Если клиент не совершал покупок в период [date_to - delta, date_to), то для этого клиента не должно быть строки в
# выходной таблице

# Преобразование данных

## ExpressionTransformer

In [8]:
# Объявление
import sklearn.base as skbase
class ExpressionTransformer(skbase.BaseEstimator, skbase.TransformerMixin):
    
    def __init__(self, expression: str, col_result: str):
        self.expression = expression
        self.col_result = col_result
    
#     # Аргументы конструктора
#     expression: str # “регулярное” выражение для расчета признака. (пример см. ниже)
#     col_result: str, # название колонки, в которой будет сохранен результат
        
    # Метод fit (пустой). Ничего не делает. Возвращает сам объект.
    def fit(self, *args, **kwargs):
        return self
    
    # Описание результата расчета
    def transform(self, data: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
        # implement me
        data[self.col_result] = eval(self.expression.format(d='data'))
        return data

In [9]:
# Возвращает измененный pandas DataFrame.
# В колонку col_result будет записан результат расчета по формуле expression.
# Пример expression:
#     "({d}['purchases_count_dw5__60d'] + {d}['purchases_count_dw6__60d'])" - формулы для расчета числа
#     покупок в выходные (сб и вс) за последние 60 дней (см. DayOfWeekReceiptsCalcer)
# От формулы ожидается, в ней вместо {d} будет будет подставляться датасет для трансформации
# производиться расчет новой колонки.

## LOOMeanTargetEncoder

In [10]:
# Предлагается написать обертку для LeaveOneOutEncoder из библиотеки category_encoders (см.
# https://contrib.scikit-learn.org/category_encoders/leaveoneout.html)

In [11]:
# !pip install category_encoders

In [12]:
# Объявление
import sklearn.base as skbase
import category_encoders as ce

class LOOMeanTargetEncoder(skbase.BaseEstimator, skbase.TransformerMixin):
    
    def __init__(self, col_categorical: str, col_target: str, col_result: str, **loo_params):
        self.col_categorical = col_categorical
        self.col_target = col_target
        self.col_result = col_result
        self.encoder_ = ce.LeaveOneOutEncoder(cols=[col_categorical], **(loo_params or {}))
    
#     # Аргументы конструктора
#     col_categorical: str, # название колонки с категориальным признаком, который нужно закодировать
#     col_target: str, # название колонки, из которой берется значение целевой переменной
#     col_result: str, # название колонки, в которой будет сохранен результат

    # Метод fit
    def fit(self, data: pd.DataFrame, *args, **kwargs):
        # Implement me
        
        # Суть: в результате вызова fit encoder должен "запомнить" для каждой категории в col_categorical,
        # чему в среднем равно значение col_target и сколько таких объектов в выборке.
        
        y = None
        if self.col_target in data.columns:
            y = data[self.col_target]
        self.encoder_.fit(data[self.col_categorical], y=y)
        
        return self
    
    
    # Описание результата расчета
    def transform(self, data: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
        # implement me
        
        y = None
        if self.col_target in data.columns:
            y = data[self.col_target]
        data[self.col_result] = self.encoder_.transform(data[self.col_categorical], y=y)
        return data

In [13]:
# Требования:
# - Если колонка col_target есть в data (сценарий расчета обучающего датасета),
    # то для каждого элемента выборки (строки) вернуть среднее значение col_target в ее категории за исключением
    # самого объекта
# - Если колонки col_target в data нет (сценарий расчета модели для применения модели),
    # то для каждой строки вернуть среднее значение для ее категории, посчитанное на этапе обучения.
    # Обратите внимание, что описанная логика по большей части реализована в классе LeaveOneOutEncoder
    # из библиотеки category_encoders. Поэтому советуем хранить внутри объекта LOOMeanTargetEncoder
    # объект LeaveOneOutEncoder и использовать его возможности. Ниже есть пример расчета подобного признака.

In [14]:
# Обратите внимание:
# На самом деле реализуемые Вами объекты не обязательно должны быть классами. Вы можете реализовать функторы
# (объекты с операцией "вызов" ()), которые при вызове с описанными выше аргументами возвращает 
# соответствующий калкер или трансформацию
# Примером такого объекта является divide_col из лекции.

## Проверка

In [15]:
receipts_sample = dd.read_parquet('data/sample/receipts.parquet')
campaigns_sample = dd.read_csv('data/sample/campaigns.csv')
client_profile_sample = dd.read_csv('data/sample/client_profile.csv')
purchases_sample = dd.read_parquet('data/sample/purchases.parquet/')

products = dd.read_csv('data/products.csv')

In [16]:
TABLES = {
    'receipts': receipts_sample,
    'campaigns': campaigns_sample,
    'client_profile': client_profile_sample,
    'products': products,
    'purchases': purchases_sample,
}


class Engine:
    def __init__(self, tables=TABLES):
        self.tables = tables

    def register_table(self, table: dd.DataFrame, name: str) -> None:
        self.tables[name] = table

    def get_table(self, name: str) -> dd.DataFrame:
        return self.tables[name]


In [17]:
engine = Engine(tables=TABLES)

### Проверка DayOfWeekReceiptsCalcer

In [63]:
calcer = DayOfWeekReceiptsCalcer(
    engine=engine,
    date_to=datetime.date(2019, 3, 19),
    delta=15,
)

result = calcer.compute().compute()

result.head()

Unnamed: 0,client_id,purchases_count_dw0__15d,purchases_count_dw1__15d,purchases_count_dw2__15d,purchases_count_dw3__15d,purchases_count_dw4__15d,purchases_count_dw5__15d,purchases_count_dw6__15d
0,5368,0.0,1.0,0.0,0.0,2.0,0.0,1.0
1,11882,1.0,0.0,0.0,0.0,0.0,0.0,1.0
2,15640,0.0,0.0,0.0,0.0,3.0,0.0,1.0
3,17317,0.0,0.0,0.0,0.0,2.0,1.0,0.0
4,25169,1.0,0.0,1.0,1.0,0.0,0.0,0.0


### Проверка FavouriteStoreCalcer

In [19]:
calcer = FavouriteStoreCalcer(
    engine=engine,
    date_to=datetime.date(2019, 3, 19),
    delta=15,
)

result = calcer.compute().compute()

result.head()

Unnamed: 0,client_id,favourite_store_id__15d
0,5368,3395
1,11882,125
2,15640,11168
3,17317,2782
4,25169,2200


### Проверка ExpressionTransformer

In [26]:
transformer = ExpressionTransformer(expression="({d}['purchases_count_dw5__15d'] + {d}['purchases_count_dw6__15d'])",
                                    col_result='expression_result_column')

In [27]:
# result берем из проверки выше (проверка DayOfWeekReceiptsCalcer)
transformer.fit(result)

ExpressionTransformer(col_result='expression_result_column',
                      expression="({d}['purchases_count_dw5__15d'] + "
                                 "{d}['purchases_count_dw6__15d'])")

In [28]:
tmp = transformer.transform(result)
tmp

Unnamed: 0,client_id,purchases_count_dw0__15d,purchases_count_dw1__15d,purchases_count_dw2__15d,purchases_count_dw3__15d,purchases_count_dw4__15d,purchases_count_dw5__15d,purchases_count_dw6__15d,expression_result_column
0,5368,0,1,0,0,2,0,1,1
1,11882,1,0,0,0,0,0,1,1
2,15640,0,0,0,0,3,0,1,1
3,17317,0,0,0,0,2,1,0,1
4,25169,1,0,1,1,0,0,0,0
...,...,...,...,...,...,...,...,...,...
83,470521,1,0,0,0,0,0,1,1
84,472613,0,0,1,0,0,0,0,0
85,476607,0,0,1,0,1,0,0,0
86,481376,2,0,0,1,1,0,1,1


In [29]:
# Результат полностью соотвествует описанию.
# Но проверку не проходит из-за типа, скорее всего
type(transformer)

__main__.ExpressionTransformer

### Проверка LOOMeanTargetEncoder

In [30]:
transformer = LOOMeanTargetEncoder(col_categorical='store_id',
                                    col_target='purchase_sum',
                                    col_result='loomean_result_column')

In [31]:
test_df = engine.get_table('receipts').compute()
transformer.fit(test_df)

LOOMeanTargetEncoder(col_categorical='store_id',
                     col_result='loomean_result_column',
                     col_target='purchase_sum')

In [32]:
tmp = transformer.transform(test_df)
tmp

Unnamed: 0_level_0,client_id,transaction_id,trn_sum_from_red,trn_sum_from_iss,regular_points_received,express_points_received,regular_points_spent,express_points_spent,purchase_sum,store_id,transaction_datetime,loomean_result_column
__null_dask_index__,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
0,5368,572976,0.0,427.0,2.1,0.0,0.0,0.0,427.00,2762,2018-11-27 15:52:50,417.645017
1,5368,1095593,0.0,438.0,2.1,0.0,0.0,0.0,437.00,3395,2019-02-23 10:22:23,1512.698000
2,5368,1843577,0.0,1890.0,14.7,0.0,0.0,0.0,1889.00,3395,2019-02-23 10:21:29,1367.498000
3,5368,2389159,0.0,3274.0,32.7,0.0,0.0,0.0,3273.00,3395,2018-12-15 10:41:28,1229.098000
4,5368,3766070,0.0,1585.0,11.7,0.0,0.0,0.0,1585.43,3395,2019-02-08 08:41:18,1397.855000
...,...,...,...,...,...,...,...,...,...,...,...,...
2088,498827,5447805,0.0,661.0,6.6,0.0,0.0,0.0,660.00,12994,2018-12-02 14:36:42,365.770000
2089,498827,6231315,0.0,315.0,1.5,0.0,0.0,0.0,315.00,12994,2019-03-12 13:59:50,452.020000
2090,498827,6494029,0.0,208.0,1.0,0.0,0.0,0.0,207.13,4976,2018-12-11 14:58:56,213.478000
2091,498827,7411251,0.0,238.0,1.1,0.0,0.0,0.0,238.00,4976,2019-01-24 14:34:53,207.304000
