# Кластеризация пользователей по фичам (UMAP + KMeans)

**Бизнес-цель.** Сжимаем поведение и сезонные сигналы пользователя в низкоразмерное представление и разбиваем на кластеры, чтобы:
- понимать разные паттерны трат/сезонности и маппить их на продуктовые предложения;
- увязывать кластеры с `socdem_cluster` и приоритизировать промо для близких групп;
- находить «похожие» аудитории для апселла/кросс-сейла.


## Описание фичей (что и зачем)
- **daily_amount / high_spend_share** — интенсивность расходов; помогает выделить клиентов с высоким LTV и потребностью в кредитных линиях.
- **saver_share / saver_share_30d** — стабильные «накопители», интересны депозиты, сберсчета, инвестиции.
- **auto_day_share / home_day_share** — тематические паттерны (страхование авто/жилья, ремонт, овердрафт на ремонт).
- **seasonal shares (pre_new_year, gifts_q1, back_to_school, summer, salary_window, social_benefits_window)** — сезонные пики, когда актуальны кредиты/кредитки/страховки/кэшбэк.
- **unique_categories_mean / top_category** — разнообразие и основная категория, сигнал на кэшбэк или спец.программы.
- **weekend_share** — кто тратит по выходным, релевантно travel/развлечения.
- **Продуктовые таргеты (target_*)** — вероятностные признаки релевантности типов продуктов, используем как «полуметки» для подбора офферов внутри кластера.


In [None]:
from pathlib import Path
import json
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.neighbors import LocalOutlierFactor
import umap
import seaborn as sns
import matplotlib.pyplot as plt

pd.set_option('display.max_columns', 120)
pd.set_option('display.width', 160)


In [None]:
# Пути и параметры (оставляем как есть, только управляем через словари)
PROJECT_ROOT = Path.cwd()
candidate_event_dirs = [
    PROJECT_ROOT / 'data/marketplace/events',
    PROJECT_ROOT / 'data/events',
    PROJECT_ROOT / 'data copy/marketplace/events',
]
EVENTS_DIR = next((p for p in candidate_event_dirs if p.exists()), candidate_event_dirs[0])
USERS_PATH = PROJECT_ROOT / 'data/users.pq'
PRODUCTS_PATH = PROJECT_ROOT / 'psb_products_updated.json'

FEATURE_PARAMS = {
    'anchor_date': pd.Timestamp('2023-01-01'),
    'n_files': 2,
    'high_spend_quantile': 0.75,
    'saver_window': 30,
    'saver_min_periods': 10,
    'saver_threshold': 0.8,
    'salary_window_start_day': 25,
    'salary_window_end_day': 5,
    'social_benefits_start_day': 10,
    'social_benefits_end_day': 20,
    'pre_ny_start_day': 15,
    'gifts_q1_end_day': 8,
    'back_to_school_start': (8, 15),
    'back_to_school_end': (9, 15),
    'summer_months': [6, 7, 8],
}

CLUSTER_PARAMS = {
    'n_clusters': 5,  # по умолчанию 5, можно поднять/понизить при интерпретации
    'umap_n_neighbors': 25,
    'umap_min_dist': 0.1,
    'umap_random_state': 42,
    'lof_neighbors': 25,  # для удаления шумовых точек
    'lof_contamination': 0.02,  # доля выбросов (регулируйте при необходимости)
}

print('Используем каталог событий:', EVENTS_DIR)
print('Каталог существует:', EVENTS_DIR.exists())


In [None]:
def choose_first_available(df: pd.DataFrame, candidates):
    for col in candidates:
        if col in df.columns:
            return col
    return None


In [None]:
def detect_keyword_flag(df: pd.DataFrame, columns, keywords) -> pd.Series:
    cols = [c for c in columns if c and c in df.columns]
    if not cols:
        return pd.Series(False, index=df.index)
    text = df[cols[0]].fillna('').astype(str)
    for col in cols[1:]:
        text = text.str.cat(' ' + df[col].fillna('').astype(str))
    text = text.str.lower()
    mask = pd.Series(False, index=df.index)
    for kw in keywords:
        mask = mask | text.str.contains(kw)
    return mask


In [None]:
def build_calendar_flags(dates: pd.Series, cfg: dict) -> pd.DataFrame:
    d = pd.to_datetime(dates)
    day = d.dt.day
    month = d.dt.month
    flags = pd.DataFrame(index=dates.index)
    flags['is_pre_new_year'] = (month == 12) & (day >= cfg['pre_ny_start_day'])
    flags['is_gifts_q1'] = (month == 2) | ((month == 3) & (day <= cfg['gifts_q1_end_day']))
    start_m, start_d = cfg['back_to_school_start']
    end_m, end_d = cfg['back_to_school_end']
    flags['is_back_to_school'] = ((month == start_m) & (day >= start_d)) | ((month == end_m) & (day <= end_d))
    flags['is_summer'] = month.isin(cfg['summer_months'])
    flags['is_salary_window'] = (day >= cfg['salary_window_start_day']) | (day <= cfg['salary_window_end_day'])
    flags['is_social_benefits_window'] = (day >= cfg['social_benefits_start_day']) & (day <= cfg['social_benefits_end_day'])
    return flags


In [None]:
def standardize_event_time(df: pd.DataFrame, anchor_date: pd.Timestamp) -> pd.DataFrame:
    # timedelta -> anchor_date + delta; datetime -> как есть
    df = df.copy()
    if df.empty:
        df['event_dt'] = pd.NaT
        df['date'] = pd.NaT
        return df
    time_col = choose_first_available(df, ['event_time', 'timestamp', 'time', 'ts'])
    if time_col is None:
        raise ValueError('Не нашли колонку со временем события')
    raw = df[time_col]
    if np.issubdtype(raw.dtype, np.timedelta64):
        event_dt = anchor_date + pd.to_timedelta(raw)
    else:
        event_dt = pd.to_datetime(raw, errors='coerce', utc=True)
        try:
            event_dt = event_dt.dt.tz_convert(None)
        except TypeError:
            pass
    df['event_dt'] = event_dt
    df['date'] = df['event_dt'].dt.floor('D')
    return df


## Загрузка и предобработка событий
- Фильтр `action_type != view` — оставляем действия, убираем шум просмотров.
- Нормализация времени (timedelta -> anchor_date).
- Сумма/категория, тематические флаги.


In [None]:
if EVENTS_DIR.exists():
    event_files = sorted(EVENTS_DIR.glob('*.pq'))
else:
    event_files = []
print(f'Файлов с событиями: {len(event_files)}')
print('Примеры файлов:', [f.name for f in event_files[:3]])


In [None]:
if event_files:
    n_files = FEATURE_PARAMS['n_files']
    use_files = event_files if n_files is None else event_files[:n_files]
    raw_frames = [pd.read_parquet(f) for f in use_files]
    raw_events = pd.concat(raw_frames, ignore_index=True)
else:
    use_files = []
    raw_events = pd.DataFrame()
print(f'Загружено {len(raw_events):,} строк из {len(use_files)} файлов')


In [None]:
rows_before = len(raw_events)
if 'action_type' in raw_events.columns:
    events_filtered = raw_events[raw_events['action_type'].fillna('') != 'view'].copy()
else:
    events_filtered = raw_events.copy()
rows_after = len(events_filtered)
print(f'Строк до: {rows_before:,} / после удаления view: {rows_after:,}')


In [None]:
events = standardize_event_time(events_filtered, FEATURE_PARAMS['anchor_date'])
if events.empty:
    print('Нет событий после фильтрации')
else:
    events = events.dropna(subset=['event_dt'])
    events['user_id'] = pd.to_numeric(events['user_id'], errors='coerce').astype('Int64')
    events = events.dropna(subset=['user_id'])
    events['user_id'] = events['user_id'].astype(int)
print('Событий после нормализации:', events.shape)


In [None]:
amount_col = choose_first_available(events, ['price', 'amount', 'sum', 'value'])
if amount_col:
    events['amount'] = pd.to_numeric(events[amount_col], errors='coerce').fillna(0.0)
else:
    events['amount'] = 0.0
category_col = choose_first_available(events, ['category', 'category_id', 'subdomain', 'domain', 'brand_id'])
if category_col:
    events[category_col] = events[category_col].astype(str)
print('Колонка суммы:', amount_col)
print('Колонка категории:', category_col)


In [None]:
category_candidates = [category_col, 'brand_id', 'domain', 'subdomain', 'item_id', 'action_type']
auto_keywords = ['auto', 'car', 'fuel', 'gas', 'azs', 'sto', 'parking', 'tire', 'taxi']
home_keywords = ['home', 'repair', 'remont', 'stroi', 'furniture', 'kitchen', 'flat', 'rent', 'mortgage', 'paint']

events['auto_related'] = detect_keyword_flag(events, category_candidates, auto_keywords)
events['home_related'] = detect_keyword_flag(events, category_candidates, home_keywords)
print('auto_related mean:', events['auto_related'].mean() if len(events) else 0)
print('home_related mean:', events['home_related'].mean() if len(events) else 0)


## Дневные фичи и сезонные окна
- Сумма/частота за день, разнообразие категорий
- Самая значимая категория дня
- Авто/ремонт активность в день
- High-spend дни и «накопители»
- Сезонность (предНГ, подарки, школа, лето, зарплаты, соцвыплаты)


In [None]:
agg_dict = {
    'daily_amount': ('amount', 'sum'),
    'daily_events': ('event_dt', 'size'),
}
if category_col:
    agg_dict['unique_categories'] = (category_col, 'nunique')

daily = events.groupby(['user_id', 'date']).agg(**agg_dict).reset_index() if len(events) else pd.DataFrame()
if len(daily):
    daily = daily.sort_values(['user_id', 'date'])
print('daily shape:', daily.shape)


In [None]:
if len(daily) and category_col:
    cat_daily = (
        events
        .groupby(['user_id', 'date', category_col])
        .agg(category_amount=('amount', 'sum'), category_events=('amount', 'size'))
        .reset_index()
    )
    top_cat = (
        cat_daily
        .sort_values(['category_amount', 'category_events'], ascending=False)
        .groupby(['user_id', 'date'])
        .head(1)
        .rename(columns={category_col: 'top_category'})
    )
    daily = daily.merge(top_cat[['user_id', 'date', 'top_category', 'category_amount', 'category_events']], on=['user_id', 'date'], how='left')


In [None]:
for source_col, target_col in [('auto_related', 'is_auto_active'), ('home_related', 'is_home_repair_period')]:
    if source_col in events.columns and len(events):
        flag = events.groupby(['user_id', 'date'])[source_col].any().reset_index().rename(columns={source_col: target_col})
        daily = daily.merge(flag, on=['user_id', 'date'], how='left') if len(daily) else flag
    else:
        daily[target_col] = False
for col in ['is_auto_active', 'is_home_repair_period']:
    if col in daily.columns:
        daily[col] = daily[col].fillna(False)


In [None]:
if len(daily):
    q = FEATURE_PARAMS['high_spend_quantile']
    p_user = daily.groupby('user_id')['daily_amount'].transform(lambda s: s.quantile(q))
    daily['is_high_spend_day'] = daily['daily_amount'] >= p_user

    low_spend_flag = daily['daily_amount'] <= daily.groupby('user_id')['daily_amount'].transform('median')
    win = FEATURE_PARAMS['saver_window']
    min_p = FEATURE_PARAMS['saver_min_periods']
    daily['saver_share_30d'] = low_spend_flag.groupby(daily['user_id']).transform(lambda s: s.rolling(win, min_periods=min_p).mean())
    daily['is_saver'] = daily['saver_share_30d'] >= FEATURE_PARAMS['saver_threshold']
else:
    daily['is_high_spend_day'] = []
    daily['saver_share_30d'] = []
    daily['is_saver'] = []


In [None]:
if len(daily):
    calendar_flags = build_calendar_flags(daily['date'], FEATURE_PARAMS)
    daily = pd.concat([daily, calendar_flags], axis=1)
    daily['month'] = pd.to_datetime(daily['date']).dt.month
    daily['dayofweek'] = pd.to_datetime(daily['date']).dt.dayofweek
    daily['is_weekend'] = daily['dayofweek'] >= 5
    daily['weekofyear'] = pd.to_datetime(daily['date']).dt.isocalendar().week.astype(int)
else:
    daily['month'] = []
    daily['dayofweek'] = []
    daily['is_weekend'] = []
    daily['weekofyear'] = []
print(daily.head())


## Пользовательские фичи (агрегация по user_id)
Копим поведение в стационарные метрики для кластеризации и UMAP.


In [None]:
if len(daily):
    user_feats = daily.groupby('user_id').agg(
        days=('date', 'nunique'),
        total_amount=('daily_amount', 'sum'),
        mean_amount=('daily_amount', 'mean'),
        median_amount=('daily_amount', 'median'),
        max_amount=('daily_amount', 'max'),
        mean_events=('daily_events', 'mean'),
        unique_categories_mean=('unique_categories', 'mean'),
        auto_day_share=('is_auto_active', 'mean'),
        home_day_share=('is_home_repair_period', 'mean'),
        high_spend_share=('is_high_spend_day', 'mean'),
        saver_share=('is_saver', 'mean'),
        weekend_share=('is_weekend', 'mean'),
        pre_ny_share=('is_pre_new_year', 'mean'),
        gifts_q1_share=('is_gifts_q1', 'mean'),
        bts_share=('is_back_to_school', 'mean'),
        summer_share=('is_summer', 'mean'),
        salary_window_share=('is_salary_window', 'mean'),
        social_benefits_share=('is_social_benefits_window', 'mean'),
    ).reset_index()
else:
    user_feats = pd.DataFrame(columns=['user_id'])
print('user_feats shape:', user_feats.shape)


## Продуктовые таргеты + демография
Используем сигнальные фичи как мягкие таргеты типов продуктов, склеиваем с socdem_cluster/region.


In [None]:
def apply_feature_product_mapping(df: pd.DataFrame, feature_to_types: dict, product_types: list) -> pd.DataFrame:
    targets = pd.DataFrame(index=df.index)
    for pt in product_types:
        targets[f'target_{pt}'] = False
    for feat, pts in feature_to_types.items():
        if feat not in df.columns:
            continue
        for pt in pts:
            if pt in product_types:
                targets[f'target_{pt}'] = targets[f'target_{pt}'] | df[feat].fillna(False)
    target_cols = [f'target_{pt}' for pt in product_types]
    targets['candidate_product_types'] = targets[target_cols].apply(lambda r: [pt for pt, flag in zip(product_types, r) if flag], axis=1)
    return targets


In [None]:
with open(PRODUCTS_PATH, 'r') as f:
    products = json.load(f)
product_types = sorted({p['product_type'] for p in products})
feature_to_types = {
    'is_pre_new_year': ['loan', 'credit_card'],
    'is_gifts_q1': ['credit_card', 'debit_card', 'premium_service'],
    'is_back_to_school': ['credit_card', 'loan', 'savings_account', 'deposit'],
    'is_summer': ['insurance', 'debit_card'],
    'is_salary_window': ['debit_card', 'deposit'],
    'is_social_benefits_window': ['savings_account', 'deposit'],
    'is_high_spend_day': ['loan', 'credit_card'],
    'is_saver': ['deposit', 'savings_account', 'investment'],
    'is_auto_active': ['insurance', 'debit_card'],
    'is_home_repair_period': ['mortgage', 'loan', 'insurance'],
}

if len(daily):
    targets_daily = apply_feature_product_mapping(daily, feature_to_types, product_types)
    daily_with_targets = pd.concat([daily, targets_daily], axis=1)
    user_targets = daily_with_targets.groupby('user_id')[targets_daily.columns].mean().reset_index()
else:
    user_targets = pd.DataFrame(columns=['user_id'])
print('user_targets shape:', user_targets.shape)


In [None]:
users_df = pd.read_parquet(USERS_PATH) if USERS_PATH.exists() else pd.DataFrame(columns=['user_id'])
if not users_df.empty:
    users_df['socdem_cluster'] = users_df['socdem_cluster'].astype('Int64')

full = user_feats.merge(user_targets, on='user_id', how='left') if len(user_feats) else pd.DataFrame()
if not users_df.empty:
    full = full.merge(users_df, on='user_id', how='left')
print('full shape:', full.shape)
print(full.head())


## Матрица для кластеризации
- только числовые фичи
- fillna=0, StandardScaler
- PCA (check дисперсия)


In [None]:
feature_cols = [c for c in full.columns if c not in ['user_id', 'candidate_product_types'] and pd.api.types.is_numeric_dtype(full[c])]
X = full[feature_cols].copy()
X = X.fillna(0)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) if len(full) else np.empty((0, len(feature_cols)))

pca = PCA(n_components=min(10, len(feature_cols))) if len(feature_cols) else None
pca_res = pca.fit_transform(X_scaled) if pca else np.empty((0, 0))
if pca:
    explained = pca.explained_variance_ratio_.cumsum()
    print('PCA cumulative variance:', explained)


## Удаление шумов (LOF на исходных фичах)
- Local Outlier Factor выкидывает низкоплотные точки
- регулируйте `lof_contamination` и `lof_neighbors` при необходимости


In [None]:
if len(full):
    lof = LocalOutlierFactor(n_neighbors=CLUSTER_PARAMS['lof_neighbors'], contamination=CLUSTER_PARAMS['lof_contamination'])
    lof_labels = lof.fit_predict(X_scaled)
    mask_inliers = lof_labels == 1
    print(f'Inliers: {mask_inliers.sum()} / {len(mask_inliers)}')
    full = full.loc[mask_inliers].reset_index(drop=True)
    X_scaled = X_scaled[mask_inliers]
else:
    print('Нет данных для LOF')


## UMAP + KMeans
- UMAP: снижает размерность для визуальной интерпретации
- KMeans: выделяет группы для продуктовых стратегий


In [None]:
if len(full):
    reducer = umap.UMAP(
        n_neighbors=CLUSTER_PARAMS['umap_n_neighbors'],
        min_dist=CLUSTER_PARAMS['umap_min_dist'],
        random_state=CLUSTER_PARAMS['umap_random_state'],
        n_components=2,
    )
    embedding = reducer.fit_transform(X_scaled)
    kmeans = KMeans(n_clusters=CLUSTER_PARAMS['n_clusters'], n_init='auto', random_state=42)
    clusters = kmeans.fit_predict(embedding)

    full['cluster'] = clusters
    full['umap_x'] = embedding[:, 0]
    full['umap_y'] = embedding[:, 1]
    print(full[['user_id', 'cluster', 'umap_x', 'umap_y']].head())
else:
    full['cluster'] = []
    full['umap_x'] = []
    full['umap_y'] = []


## Визуализация UMAP
- Цвет — кластер
- Можно перекрасить по `socdem_cluster` для проверки соответствия демографии


In [None]:
if len(full):
    plt.figure(figsize=(8, 6))
    sns.scatterplot(data=full, x='umap_x', y='umap_y', hue='cluster', palette='tab10', s=10, linewidth=0)
    plt.title('UMAP проекция пользователей (по фичам)')
    plt.legend(title='cluster', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()


## Профили кластеров
Смотрим размер, средние фичи, соотношение socdem_cluster и продуктовых таргетов — чтобы понять, кому и что предлагать.


In [None]:
if len(full):
    cluster_sizes = full['cluster'].value_counts().sort_index().rename('count')
    print('Размеры кластеров:', cluster_sizes)

    feature_summary = full.groupby('cluster')[feature_cols].mean()
    print('Средние фичи по кластерам (первые 5 колонок):')
    print(feature_summary.iloc[:, :5])

    if 'socdem_cluster' in full.columns:
        socdem_summary = full.groupby('cluster')['socdem_cluster'].value_counts(normalize=True).rename('share')
        print('Распределение socdem_cluster внутри кластеров (share):')
        print(socdem_summary.head(20))

    target_cols = [c for c in full.columns if c.startswith('target_')]
    if target_cols:
        target_summary = full.groupby('cluster')[target_cols].mean()
        print('Средние продуктовые таргеты по кластерам:')
        print(target_summary)
