In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from collections import Counter
import random
import json
import mmh3
import polars as pl
import tqdm
from datetime import timedelta

from utils import create_features, recall_at, fit_lgb_ranker

In [3]:
# Notebook-wide configuration for the categorical-feature hashing.
# Tokens seen fewer than MIN_FREQ times are collapsed into a per-attribute "<rare>" token.
MIN_FREQ   = 100
# Size of the hashing space. Aligned with the value re-declared next to
# encode_from_polars (2 ** 26), which is the one actually in effect when the
# features are encoded; the previous 2 ** 24 here was stale and misleading.
HASH_SPACE = 2 ** 26
# NOTE(review): SEED_1 / SEED_2 are not referenced in the cells visible here.
SEED_1, SEED_2 = 42, 4242

In [4]:
# Load the raw inputs from parquet: the click log, the event dictionary
# (used below for its `event` / `is_contact` columns) and the per-item
# categorical features (`item`, `node`, `clean_params`, ...).
df_clickstream = pl.read_parquet('clickstream.pq')
df_event = pl.read_parquet('events.pq')
df_cat = pl.read_parquet('cat_features.pq')

In [5]:
def compute_freq_dict(cat_features: pl.DataFrame) -> dict[str, int]:
    """Count how often each ``"<attr>_<value>"`` token occurs in ``clean_params``.

    Args:
        cat_features: Frame exposing a ``clean_params`` column. Each cell is
            either a JSON string or an already-parsed sequence of
            ``{"attr": ..., "value": ...}`` mappings.

    Returns:
        Mapping from token to its total number of occurrences over all cells.
        Null cells (and JSON ``null``) contribute nothing.
    """
    cnt: Counter[str] = Counter()
    for clean_params in cat_features["clean_params"]:
        pairs = json.loads(clean_params) if isinstance(clean_params, str) else clean_params
        # A null cell would otherwise fail with "NoneType is not iterable".
        # Test identity, not truthiness: a parsed cell may be a container
        # whose truth value is ambiguous.
        if pairs is None:
            continue
        cnt.update(f"{pair['attr']}_{pair['value']}" for pair in pairs)
    return dict(cnt)

In [6]:
# Defaults for encode_from_polars. MIN_FREQ / HASH_SPACE re-declare the names
# from the configuration cell; the values assigned here are the ones in effect
# when the features are encoded.
MIN_FREQ = 100
HASH_SPACE = 2 ** 26
SEED = 42

def encode_from_polars(cat_features: pl.DataFrame,
                       freq_dict: dict[str, int],
                       min_freq: int = MIN_FREQ,
                       hash_space: int = HASH_SPACE,
                       seed: int = SEED) -> pl.DataFrame:
    """Attach a ``feature_indices`` column of hashed categorical tokens.

    Every ``{"attr", "value"}`` pair in ``clean_params`` becomes the token
    ``"<attr>_<value>"``. Tokens whose count in ``freq_dict`` is below
    ``min_freq`` are replaced by ``"<attr>_<rare>"``. Each token is hashed
    with ``mmh3.hash`` (unsigned, seeded) and reduced modulo ``hash_space``.

    Args:
        cat_features: Frame with a ``clean_params`` column whose cells are
            JSON strings or already-parsed sequences of pair mappings.
        freq_dict: Token -> occurrence count, keyed by the same
            ``"<attr>_<value>"`` strings built here.
        min_freq: Minimum count for a token to keep its own bucket.
        hash_space: Number of hash buckets (indices are in ``[0, hash_space)``).
        seed: Seed passed to ``mmh3.hash``.

    Returns:
        ``cat_features`` with an extra ``feature_indices`` column holding one
        list of bucket indices per row. Null cells yield an empty list.
    """
    def _encode(cell) -> list[int]:
        # A string cell is JSON and gets parsed; anything else is assumed to be list[dict].
        pairs = json.loads(cell) if isinstance(cell, str) else cell
        if pairs is None:
            # Null cell / JSON null: no features instead of a TypeError.
            return []
        feats = []
        for pair in pairs:
            token = f"{pair['attr']}_{pair['value']}"
            if freq_dict.get(token, 0) < min_freq:
                token = f"{pair['attr']}_<rare>"
            idx = mmh3.hash(token, seed=seed, signed=False) % hash_space
            feats.append(idx)
        return feats

    all_feats: list[list[int]] = [
        _encode(cell) for cell in cat_features["clean_params"]
    ]

    # Explicit dtype keeps the column list[i64] even when every list is empty.
    return cat_features.with_columns(
        pl.Series("feature_indices", all_feats, dtype=pl.List(pl.Int64))
    )

In [7]:
# Count token frequencies over the whole feature table, then use them to
# hash-encode every item's parameters into a `feature_indices` column.
freq = compute_freq_dict(df_cat)
df_cat = encode_from_polars(df_cat, freq)

In [9]:
# Inspect the encoded table (now carrying the `feature_indices` column).
df_cat

item,location,category,clean_params,node,feature_indices
i64,i64,i64,str,u32,list[i64]
9,8385,57,"""[{""attr"":1157,""value"":664427},…",194747,"[28843275, 33056286, … 50379062]"
17,2707,35,"""[{""attr"":2140,""value"":501466},…",352905,"[27664982, 61839137, … 34977399]"
144,8383,8,"""[{""attr"":802,""value"":35791},{""…",17188,"[42572211, 21570473, … 16526367]"
202,5397,57,"""[{""attr"":1157,""value"":490527},…",194766,"[28287716, 33056286, … 8568387]"
236,2105,64,"""[{""attr"":112,""value"":420797},{…",153951,"[58011343, 36285982, … 9340551]"
…,…,…,…,…,…
28804461,24,35,"""[{""attr"":2140,""value"":364348},…",326792,"[22119012, 61839137, … 56057636]"
28804502,2305,51,"""[{""attr"":4622,""value"":171723},…",401208,"[53453426, 32017410, … 32138271]"
28804563,2348,0,"""[{""attr"":914,""value"":93691},{""…",13974,"[18365451, 47222697, … 45025104]"
28804609,2348,51,"""[{""attr"":4622,""value"":618809},…",258971,"[15682499, 63491461, … 36718239]"


In [10]:
# Temporal split point: the last 28 days of the clickstream are held out
# (they are used for evaluation further down).
cut = df_clickstream["event_date"].max() - timedelta(days=28)
# Training interactions: distinct (item, cookie) pairs seen up to `cut`
# (the empty agg keeps only the group keys), enriched with each item's
# node and hashed feature indices from df_cat.
df_train = (df_clickstream
            .filter(pl.col("event_date") <= cut)
            .group_by(["item","cookie"])
            .agg([])
            .join(df_cat.select(["item","node","feature_indices"]), on="item"))

In [11]:
def _build_emb(df: pl.DataFrame, key: str) -> pl.DataFrame:
    """Build a sparse feature profile for every distinct value of ``key``.

    For each ``(key, feature index)`` pair the weight is the number of
    occurrences of that feature among the key's rows divided by the number
    of rows the key has in ``df``.

    Args:
        df: Frame with a ``key`` column and a list-typed ``feature_indices``
            column (one row per item).
        key: Name of the grouping column (e.g. ``"cookie"`` or ``"node"``).

    Returns:
        Long-format frame with columns ``[key, "feature_indices", "val"]``.
    """
    base = (df.select([key, "feature_indices"])
              .explode("feature_indices")
              # Rows with an empty/null feature list explode to a null index;
              # a null can never match in the later join on feature_indices,
              # so keeping it would only inflate the vector norm.
              .drop_nulls("feature_indices")
              .group_by([key, "feature_indices"])
              # pl.len() replaces the deprecated pl.count() (same row count).
              .agg(pl.len().alias("tf")))
    totals = df.group_by(key).agg(pl.len().alias("n_items"))
    return (base.join(totals, on=key)
                 .with_columns((pl.col("tf") / pl.col("n_items")).alias("val"))
                 .select([key, "feature_indices", "val"]))

# Cookie profiles come from the training interactions; node profiles come
# from the full item feature table.
cookie_emb = _build_emb(df_train, "cookie")
node_emb   = _build_emb(df_cat, "node")

  .agg(pl.count().alias("tf")))
  totals = df.group_by(key).agg(pl.count().alias("n_items"))


In [12]:
# Euclidean (L2) norm of every sparse profile: sqrt of the sum of squared
# weights, one row per cookie / per node.
cookie_norm = (
    cookie_emb
    .group_by("cookie")
    .agg((pl.col("val") ** 2).sum().sqrt().alias("norm"))
)
node_norm = (
    node_emb
    .group_by("node")
    .agg((pl.col("val") ** 2).sum().sqrt().alias("norm"))
)

In [13]:
# Inverted index: for every feature index, the list ("blob") of
# {node, val} structs of the nodes that carry it. Kept lazy so it can be
# joined inside the lazy pipeline of score_batch.
node_by_feat = (node_emb
                .group_by("feature_indices")
                .agg(pl.struct(["node","val"]).alias("blob"))
                .lazy())

def score_batch(batch) -> pl.DataFrame:
    """Score one batch of cookies against all nodes and keep the best 300 each.

    The score is ``dot / (norm * norm_right)``, i.e. the cosine similarity
    between the cookie profile and the node profile, computed only over
    (cookie, node) pairs that share at least one feature index.

    Args:
        batch: Collection of cookie ids to score (the caller passes a slice
            of a polars Series).

    Returns:
        Collected frame with columns ``cookie``, ``node``, ``score`` holding
        up to 300 rows per cookie.
    """
    return (cookie_emb.lazy()
            .filter(pl.col("cookie").is_in(batch))
            # Match every cookie feature with the nodes that carry it, then
            # unnest so there is one row per (cookie feature, node).
            .join(node_by_feat, on="feature_indices")
            .explode("blob")
            .select([
                 "cookie",
                 pl.col("blob").struct.field("node").alias("node"),
                 (pl.col("val") * pl.col("blob").struct.field("val")).alias("prod")
            ])
            # Sum of per-feature products = dot product of the two profiles.
            .group_by(["cookie","node"]).agg(pl.sum("prod").alias("dot"))
            # The second join's `norm` collides with the first one, so the
            # node norm arrives as `norm_right`.
            .join(cookie_norm.lazy(), on="cookie")
            .join(node_norm.lazy(),   on="node")
            .with_columns((pl.col("dot")/(pl.col("norm")*pl.col("norm_right"))).alias("score"))
            # NOTE(review): the top-300 cut relies on group_by keeping the
            # sorted row order inside each group — confirm this holds for the
            # streaming engine in use.
            .sort(["cookie","score"], descending=True)
            .group_by("cookie")
            .agg(pl.col("node").head(300), pl.col("score").head(300))
            .explode(["node","score"])
            # NOTE(review): the recorded run shows a deprecation warning about
            # the old streaming engine for this call.
            .collect(streaming=True))

In [19]:
# Score all cookies that have a profile, BATCH cookies at a time
# (presumably to bound the size of the exploded join), and stack the
# per-batch results into one frame.
cookies = cookie_emb.select("cookie").unique().to_series()
BATCH = 3_000
recommendations = pl.concat([score_batch(cookies[i:i+BATCH]) for i in range(0,len(cookies),BATCH)])

  return (cookie_emb.lazy()

More information on the new streaming engine: https://github.com/pola-rs/polars/issues/20947
  return (cookie_emb.lazy()


In [20]:
# Inspect the retrieved candidates (columns: cookie, node, score).
recommendations

cookie,node,score
i64,u32,f64
149975,232276,0.889045
149975,232262,0.878304
149975,234870,0.878121
149975,232260,0.876172
149975,241906,0.867977
…,…,…
128,152118,0.577923
128,151950,0.577413
128,152671,0.576844
128,152096,0.575274


In [21]:
# Persist the candidates. The `retrieval_data/` directory is not created by
# this notebook and must already exist.
recommendations.write_parquet('retrieval_data/top300_tag_cosine1_28d.pq')

In [22]:
# Evaluation set, built as one pipeline:
#   1. events after the split point `cut`;
#   2. drop (cookie, node) pairs already present in training;
#   3. keep only events flagged `is_contact == 1` in df_event;
#   4. keep only cookies and nodes that occur in training;
#   5. one row per (cookie, node).
df_eval = (
    df_clickstream
    .filter(pl.col('event_date') > cut)
    .select(['cookie', 'node', 'event'])
    .join(df_train, on=['cookie', 'node'], how='anti')
    .filter(
        pl.col('event').is_in(
            df_event.filter(pl.col('is_contact') == 1)['event'].unique()
        )
    )
    .filter(pl.col('cookie').is_in(df_train['cookie'].unique()))
    .filter(pl.col('node').is_in(df_train['node'].unique()))
    .unique(['cookie', 'node'])
)

In [23]:
# Recall@300 of the retrieved candidates against the hold-out set.
# `recall_at` is imported from utils; its exact definition is not shown here.
recall_at(df_eval, recommendations, k=300)

0.09225898865980968

In [24]:
# Persist the node inverted index (materialised from its lazy form) and the
# cookie profiles. `retrieval_data/` must already exist.
node_by_feat.collect().write_parquet('retrieval_data/tag_cosine_node_28d.pq')
cookie_emb.write_parquet('retrieval_data/tag_cosine_cookie_28d.pq')