In [2]:
import numpy as np
import pandas as pd
import dask.dataframe as dd
import datetime
import functools

from typing import List, Union, Optional, Dict

from feature_impl import dask_groupby

In [3]:
import featurelib as fl

In [4]:
ls ../4_feature_lib/

[0m[01;34mdata[0m/           dataset_mini.csv  featurelib.py  [01;34m__pycache__[0m/  task.ipynb
data_config.py  feature_impl.py   L4.ipynb       submit.py     Untitled.ipynb


In [5]:
# receipts = dd.read_parquet('../3_metrics/data/receipts.parquet/').sample(0.01)

# campaigns = dd.read_csv('../3_metrics/data/campaigns.csv').sample(0.01)
# client_profile = dd.read_csv('../3_metrics/data/client_profile.csv').sample(0.01)
# products = dd.read_csv('../3_metrics/data/products.csv').sample(0.01)

# purchases = dd.read_parquet('../3_metrics/data/purchases.parquet/').sample(0.01)

In [6]:
receipts = dd.read_parquet('data/receipts.parquet/')

campaigns = dd.read_csv('data/campaigns.csv')
client_profile = dd.read_csv('data/client_profile.csv')
# products = dd.read_csv('data/products.csv')

purchases = dd.read_parquet('data/purchases.parquet/')

In [7]:
dataset_mini = pd.read_csv('dataset_mini.csv')

In [8]:
receipts.head(3)

Unnamed: 0,client_id,transaction_id,trn_sum_from_red,trn_sum_from_iss,regular_points_received,express_points_received,regular_points_spent,express_points_spent,purchase_sum,store_id,transaction_datetime
0,5368,572976,0.0,427.0,2.1,0.0,0.0,0.0,427.0,2762,2018-11-27 15:52:50
1,5368,1095593,0.0,438.0,2.1,0.0,0.0,0.0,437.0,3395,2019-02-23 10:22:23
2,5368,1843577,0.0,1890.0,14.7,0.0,0.0,0.0,1889.0,3395,2019-02-23 10:21:29


In [9]:
campaigns.head(3)

Unnamed: 0,client_id,treatment_flg,treatment_date,target_purchases_sum,target_purchases_count,target_campaign_points_spent
0,170057,0,2019-03-21,0.0,0,0.0
1,463303,0,2019-03-21,0.0,0,0.0
2,304317,0,2019-03-21,718.74,2,0.0


In [10]:
client_profile.head(3)

Unnamed: 0,client_id,first_issue_date,age,gender
0,36496,2017-12-18 09:18:35,65,F
1,267776,2018-04-01 17:10:00,35,U
2,350684,2018-05-27 18:58:11,38,U


In [11]:
# products.head(3)

In [11]:
purchases.head(3)

Unnamed: 0,client_id,transaction_id,transaction_datetime,regular_points_received,express_points_received,regular_points_spent,express_points_spent,purchase_sum,store_id,product_id,product_quantity,trn_sum_from_iss,trn_sum_from_red
0,170057,9797969,2019-01-29 15:28:44,6.0,0.0,0.0,0.0,608.68,13327,10810,1.0,5.0,
1,170057,9797969,2019-01-29 15:28:44,6.0,0.0,0.0,0.0,608.68,13327,1358,1.0,170.0,
2,170057,9797969,2019-01-29 15:28:44,6.0,0.0,0.0,0.0,608.68,13327,17306,1.0,93.0,


In [12]:
# purchases[purchases.client_id==162]

In [12]:
TABLES = {
    'receipts': receipts,
    'campaigns': campaigns,
    'client_profile': client_profile,
#     'products': products,
    'purchases': purchases,
}


# аналог connection в sql alchemy
class Engine:
    def __init__(self, tables=TABLES):
        self.tables = tables

    def register_table(self, table: dd.DataFrame, name: str) -> None:
        self.tables[name] = table

    def get_table(self, name: str) -> dd.DataFrame:
        return self.tables[name]


In [13]:
engine = Engine(tables=TABLES)

In [14]:
from abc import ABC, abstractmethod

class FeatureCalcer(ABC):
    name = '_base' # у класса должно быть имя по которому мы будем к нему обращаться
    keys = None # для какой сущности считаем признак (client_id)

    def __init__(self, engine: Engine): # engine для подключения к бд
        self.engine = engine

    @abstractmethod
    def compute(self):
        pass

class DateFeatureCalcer(FeatureCalcer):
    def __init__(self, date_to: datetime.date, **kwargs):
        self.date_to = date_to
        super().__init__(**kwargs)


# Сбор данных

Каждый калкер из задания нужно реализовать, отнаследовав его класс от класса FeatureCalcer (или DateFeatureCalcer).

## DayOfWeekReceiptsCalcer

In [442]:
class DayOfWeekReceiptsCalcer(fl.DateFeatureCalcer): # сколько уникальных категорий покупал клиеент за последние n дней (delta)
    name = 'day_of_week_receipts'
    keys = ['client_id']

    def __init__(self, delta: int, **kwargs):
        self.delta = delta
        super().__init__(**kwargs)

    def compute(self) -> dd.DataFrame:
        receipts = self.engine.get_table('receipts')
        
        date_to = datetime.datetime.combine(self.date_to, datetime.datetime.min.time())
        date_from = date_to - datetime.timedelta(days=self.delta)
        date_mask = (receipts['transaction_datetime'] >= date_from) & (receipts['transaction_datetime'] < date_to)

        receipts = receipts.loc[date_mask]
        
        receipts['transaction_dow'] = receipts['transaction_datetime'].dt.dayofweek
        
        receipts['ones'] = 1

        receipts = receipts.categorize(columns=['transaction_dow'])

        result = dd.pivot_table(receipts, index='client_id', values='ones',
                                    columns='transaction_dow', aggfunc='sum')

        result = result.fillna(0)
        
        for day in result.columns:
            result = result.rename(columns={day: f'purchases_count_dw{day}__{self.delta}d'})
            
#         result.columns = result.columns.astype(str)
        result = result.reset_index()
#         result.columns.name = None
        
        result.columns.name = 'day_of_week'

        return result

# register_calcer(UniqueCategoriesCalcer)

In [443]:
calcer = DayOfWeekReceiptsCalcer(
    engine=engine,
    delta=30,
    date_to=datetime.date(2019, 3, 19)
)

In [447]:
result = calcer.compute().compute() # первый compute вызывает dask df, второй pandas df

In [449]:
result.columns

Index(['client_id', 'receipts_count_dw5__30d', 'receipts_count_dw4__30d',
       'receipts_count_dw1__30d', 'receipts_count_dw6__30d',
       'receipts_count_dw0__30d', 'receipts_count_dw3__30d',
       'receipts_count_dw2__30d'],
      dtype='object', name='transaction_dow')

In [369]:
result.head()

Unnamed: 0,client_id,receipts_count_dw5__30d,receipts_count_dw4__30d,receipts_count_dw1__30d,receipts_count_dw6__30d,receipts_count_dw0__30d,receipts_count_dw3__30d,receipts_count_dw2__30d
0,5368,2.0,2.0,1.0,1.0,0.0,0.0,0.0
1,11882,0.0,1.0,1.0,3.0,1.0,0.0,0.0
2,15640,2.0,3.0,0.0,1.0,0.0,0.0,0.0
3,17317,1.0,2.0,0.0,0.0,1.0,1.0,0.0
4,25169,1.0,0.0,0.0,0.0,1.0,1.0,1.0


In [352]:
dataset_mini.columns

Index(['client_id', 'purchases_count_dw5__30d', 'purchases_count_dw4__30d',
       'purchases_count_dw1__30d', 'purchases_count_dw6__30d',
       'purchases_count_dw0__30d', 'purchases_count_dw3__30d',
       'purchases_count_dw2__30d', 'favourite_store_id__30d', 'age', 'gender',
       'treatment_flg', 'target_purchases_sum', 'target_purchases_count',
       'target_campaign_points_spent', 'weekend_purchases_ratio__30d',
       'target_profit', 'gender__mte__target_profit',
       'gender__mte__target_purchases_count'],
      dtype='object')

## FavouriteStoreCalcer

In [320]:
class FavouriteStoreCalcer(fl.DateFeatureCalcer): # сколько уникальных категорий покупал клиеент за последние n дней (delta)
    name = 'favourite_store'
    keys = ['client_id']

    def __init__(self, delta: int, **kwargs):
        self.delta = delta
        super().__init__(**kwargs)

    def compute(self) -> dd.DataFrame:
        receipts = self.engine.get_table('receipts')
        
        date_to = datetime.datetime.combine(self.date_to, datetime.datetime.min.time())
        date_from = date_to - datetime.timedelta(days=self.delta)
        date_mask = (receipts['transaction_datetime'] >= date_from) & (receipts['transaction_datetime'] < date_to)

        receipts = receipts.loc[date_mask]
        
        result = receipts.groupby(by=['client_id', 'store_id']).size().reset_index()
        
        result = result.map_partitions(lambda x: x.sort_values([0, 'store_id'], ascending=[False, False]))
        
        result = result.drop_duplicates('client_id', keep='first')
        
        result = result.rename(columns={'store_id': f"favourite_store_id__{self.delta}d"})
        
        result = result.drop(0, axis=1)
        
        return result

# register_calcer(UniqueCategoriesCalcer)

In [321]:
calcer = FavouriteStoreCalcer(
    engine=engine,
    delta=30,
    date_to=datetime.date(2019, 3, 19)
)

In [322]:
result = calcer.compute().compute() # первый compute вызывает dask df, второй pandas df

In [323]:
result.head()

Unnamed: 0,client_id,favourite_store_id__30d
38,147711,3791
25,97835,9198
67,219005,3945
129,409030,2921
27,100096,4795


# Преобразование данных

## ExpressionTransformer

In [17]:
import sklearn.base as skbase

In [460]:
class ExpressionTransformer(skbase.BaseEstimator, skbase.TransformerMixin):

    # expression: str # “регулярное” выражение для расчета признака. (пример см. ниже)
    # col_result: str, # название колонки, в которой будет сохранен результат
    
    def __init__(self, expression: str, col_result: str, **kwargs):
        self.expression = expression
        self.col_result = col_result
        super().__init__(**kwargs)
    
    # Метод fit (пустой). Ничего не делает. Возвращает сам объект.
    def fit(self, *args, **kwargs):
        return self

    # Описание результата расчета
    def transform(self, data: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
#         s = "({d}['purchases_count_dw5__60d'] + {d}['purchases_count_dw6__60d'])"
        s = self.expression.replace('{d}', 'data')
        s = s.replace('[', '.')
        s = s.replace(']', '')
        s = s.replace("'", "")
        data[self.col_result]  = pd.eval(s)
        
        return data

In [461]:
calcer = ExpressionTransformer(
    expression="({d}['a'] + {d}['b'])",
    col_result='aaa',
)

In [463]:
df = calcer.transform(df)
df

Unnamed: 0,a,b,aaa
0,1,3,4
1,2,4,6
2,3,5,8


In [450]:
df = pd.DataFrame({'a' : [1,2,3], 'b':[3,4,5]})
df1 = pd.DataFrame({'a' : [1,2,3], 'b':[3,4,5]})

In [190]:
s = "({d}['purchases_count_dw5__60d'] + {d}['purchases_count_dw6__60d'])"
s = s.replace('{d}', 'df')
s = s.replace('[', '.')
s = s.replace(']', '')
s = s.replace("'", "")
s

'(df.purchases_count_dw5__60d + df.purchases_count_dw6__60d)'

In [None]:
pd.eval("(df.a + df1.b)")
pd.eval(s)

## LOOMeanTargetEncoder

In [18]:
from category_encoders.leave_one_out import LeaveOneOutEncoder

In [27]:
class LOOMeanTargetEncoder(skbase.BaseEstimator, skbase.TransformerMixin):

    # expression: str # “регулярное” выражение для расчета признака. (пример см. ниже)
    # col_result: str, # название колонки, в которой будет сохранен результат
    
    def __init__(self, col_categorical: str, col_target: str, col_result: str, **kwargs):
        self.col_categorical = col_categorical
        self.col_target = col_target
        self.col_result = col_result
        super().__init__(**kwargs)
    
    def fit(self, data: pd.DataFrame, *args, **kwargs):
        enc = LeaveOneOutEncoder()
        X = data[self.col_categorical].astype(str)
        y = data[self.col_target]
        enc.fit(np.asarray(X), y)
            
        self.enc = enc
#         self.X = X
        
        return self

    # Описание результата расчета
    def transform(self, data: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
        
        if self.col_target in data.columns:
            X = data[self.col_categorical].astype(str)
            y = data[self.col_target]
            data[self.col_result] = self.enc.transform_leave_one_out(X, np.asarray(y))
            
        else:
            X = data[self.col_categorical].astype(str)
            data[self.col_result] = self.enc.transform(np.asarray(X))
        
        
        return data
        

In [29]:
dd1 = pd.DataFrame({"cat": np.random.randint(1,3,1000), 'num':np.random.rand(1000)})
dd2 = pd.DataFrame({"cat": np.random.randint(1,3,100), 'num':np.random.rand(100)})

In [30]:
# col_categorical='cat'
# col_target = 'num'
# col_result='num2'

# X = dd1[col_categorical].astype(str)
# y = dd1[col_target]
# dd1[col_result] = enc.transform_leave_one_out(np.asarray(X), y)

In [31]:
loom = LOOMeanTargetEncoder(col_categorical='cat', col_target = 'num', col_result='num2')

In [32]:
loom.fit(dd1)

LOOMeanTargetEncoder(col_categorical='cat', col_result='num2', col_target='num')

In [33]:
res = loom.transform(dd1)

AttributeError: 'NoneType' object has no attribute 'items'

In [695]:
res

Unnamed: 0,cat,num,num2
0,1,0.171551,0.515989
1,2,0.929280,0.501502
2,1,0.554943,0.515989
3,1,0.867280,0.515989
4,2,0.893848,0.501502
...,...,...,...
995,1,0.861329,0.515989
996,1,0.342397,0.515989
997,2,0.162296,0.501502
998,2,0.569823,0.501502


In [617]:
res[res.num2.notnull()]

Unnamed: 0,cat,num,num2
0,1,0.074696,0.469757
1,1,0.036798,0.469757
2,1,0.165753,0.478820
3,1,0.048545,0.478820
4,2,0.033016,0.478820
...,...,...,...
95,2,0.186370,0.469757
96,2,0.883939,0.469757
97,2,0.263076,0.469757
98,2,0.093126,0.478820


In [552]:
X = dd1['cat'].astype(str)
y = dd1['num']

In [554]:
enc = LeaveOneOutEncoder()
enc_data = enc.fit_transform(np.asanyarray(X), y)

In [556]:
enc_data

Unnamed: 0,0
0,0.480074
1,0.477055
2,0.479384
3,0.477538
4,0.480614
...,...
95,0.459666
96,0.465155
97,0.477663
98,0.485359


In [540]:
enc_data = enc.transform(np.asarray(X))
enc_data

Unnamed: 0,0
0,2
1,2
2,1
3,2
4,1
...,...
95,2
96,2
97,2
98,1


# config

In [None]:
import datetime


data_config = {
    'calcers': [
        {
            'name': 'day_of_week_receipts',
            'args': {'delta': 30, 'date_to': datetime.date(2019, 3, 19)}
        },
        {
            'name': 'favourite_store',
            'args': {'delta': 30, 'date_to': datetime.date(2019, 3, 19)}
        },
        {
            'name': 'age_gender',
            'args': {}
        },
        {
            'name': 'target_from_campaigns',
            'args': {'date_to': datetime.date(2019, 3, 21)}
        }
    ],
    'transforms': [
        {
            'name': 'expression',
            'args': {
                'expression': "({d}['purchases_count_dw5__30d'] + {d}['purchases_count_dw6__30d']) / ({d}['purchases_count_dw0__30d'] + {d}['purchases_count_dw1__30d'] + {d}['purchases_count_dw2__30d'] + {d}['purchases_count_dw3__30d'] + {d}['purchases_count_dw4__30d'] + {d}['purchases_count_dw5__30d'] + {d}['purchases_count_dw6__30d'])",
                'col_result': "weekend_purchases_ratio__30d"
            }
        },
        {
            'name': 'expression',
            'args': {
                'expression': "{d}['target_purchases_sum'] * 0.2 - {d}['target_campaign_points_spent'] * 0.1 - {d}['treatment_flg'] * 1.5",
                'col_result': "target_profit"
            }
        },
        {
            'name': 'loo_mean_target_encoder',
            'args': {
                'col_categorical': 'gender',
                'col_target': 'target_profit',
                'col_result': "gender__mte__target_profit"
            }
        },
        {
            'name': 'loo_mean_target_encoder',
            'args': {
                'col_categorical': 'gender',
                'col_target': 'target_purchases_count',
                'col_result': "gender__mte__target_purchases_count"
            }
        }
    ]
}