In [1]:
import pandas as pd
from gensim.models import Word2Vec
from pathlib import Path
import dask.dataframe as dd
import os
import pyarrow as pa

csv_file = Path("/home/users/belucci/downloads/UserBehavior.csv")

In [2]:
# dask_pipeline_alibaba.py


# ---------------- PARAMETERS ----------------
INPUT_CSV = csv_file # path to the Tianchi CSV
OUT_DIR = Path("/home/users/belucci/output_features")

os.makedirs(OUT_DIR, exist_ok=True)

# Feature & sampling params
TOP_K_CLASSES = 30
N_PER_CLASS = 5000  # number of samples per class in balanced output
ITEM_VEC_SIZE = 32
ITEM_MIN_COUNT = 5  # min_count for Word2Vec; set to 1 for very small prototyping
RANDOM_SEED = 42

# CSV schema
COLS = ["user_id", "item_id", "category_id", "behavior_type", "timestamp"]
DTYPES = {
    "user_id": "str",
    "item_id": "str",
    "category_id": "str",
    "behavior_type": "str",
    "timestamp": "int64",
}

# --------------- READ DATA -------------------
print("Reading CSV with Dask...")
ddf = dd.read_csv(
    INPUT_CSV, names=COLS, dtype=DTYPES, assume_missing=True
)  # assume_missing helps with integer nulls


# ---------------- LABEL ----------------------
print("Creating composite label column...")
ddf["label"] = ddf["category_id"].astype(str) + "_" + ddf["behavior_type"].astype(str)

# ---------------- TOP K LABELS ----------------
print(f"Selecting top {TOP_K_CLASSES} labels...")
# value_counts is lazy; compute to get top labels
top_labels = ddf["label"].value_counts().nlargest(TOP_K_CLASSES).index.compute()
top_labels = list(top_labels)
print("Top labels:", top_labels)

# Filter to top labels
ddf = ddf[ddf["label"].isin(top_labels)]

# ---------------- TEMPORAL FEATURES -----------
print("Creating temporal features...")
# Convert timestamp -> datetime parts. Use map_partitions for to_datetime
ddf["ts"] = dd.to_datetime(ddf["timestamp"], unit="s")
ddf["hour"] = ddf["ts"].dt.hour
ddf["dayofweek"] = ddf["ts"].dt.dayofweek


# ---------------- BINARY BEHAVIOR FLAGS -------
print("Creating behavior indicator columns...")
for b in ["pv", "cart", "fav", "buy"]:
    ddf[f"is_{b}"] = (ddf["behavior_type"] == b).astype("int8")

# save to avoid losing computation
ddf.to_parquet(os.path.join(OUT_DIR, "ddf.parquet"))

# load (if previously saved)
# ddf = dd.read_parquet(os.path.join(OUT_DIR, "ddf.parquet"))

Reading CSV with Dask...
Creating composite label column...
Selecting top 30 labels...
Top labels: ['4756105_pv', '2355072_pv', '4145813_pv', '3607361_pv', '982926_pv', '2520377_pv', '4801426_pv', '1320293_pv', '2465336_pv', '3002561_pv', '2735466_pv', '4181361_pv', '149192_pv', '1080785_pv', '2885642_pv', '4217906_pv', '154040_pv', '3738615_pv', '1879194_pv', '2640118_pv', '1464116_pv', '2520771_pv', '4357323_pv', '2920476_pv', '570735_pv', '4170419_pv', '2096639_pv', '1299190_pv', '4643350_pv', '2939262_pv']
Creating temporal features...
Creating behavior indicator columns...


ignoring exception in ensure_cleanup_on_exception
Traceback (most recent call last):
  File "/home/users/belucci/miniconda3/envs/gensim/lib/python3.11/site-packages/dask/dataframe/shuffle.py", line 226, in ensure_cleanup_on_exception
    yield
  File "/home/users/belucci/miniconda3/envs/gensim/lib/python3.11/site-packages/dask/dataframe/dask_expr/_shuffle.py", line 507, in _shuffle_group
    p.append(d, fsync=True)
  File "/home/users/belucci/miniconda3/envs/gensim/lib/python3.11/site-packages/partd/encode.py", line 25, in append
    self.partd.append(data, **kwargs)
  File "/home/users/belucci/miniconda3/envs/gensim/lib/python3.11/site-packages/partd/buffer.py", line 45, in append
    self.flush(keys)
  File "/home/users/belucci/miniconda3/envs/gensim/lib/python3.11/site-packages/partd/buffer.py", line 99, in flush
    self.slow.append(dict(zip(keys, self.fast.get(keys))))
  File "/home/users/belucci/miniconda3/envs/gensim/lib/python3.11/site-packages/partd/file.py", line 42, in appen

In [3]:

# # --------------- AGGREGATIONS -----------------
# print("Computing user / item / category aggregates (sums & rates)...")

# # User aggregates
# user_agg = ddf.groupby("user_id")[["is_pv", "is_cart", "is_fav", "is_buy"]].sum().rename(columns=lambda c: "user_" + c)
# # add buy_rate
# user_agg = user_agg.reset_index()
# user_agg["user_buy_rate"] = user_agg["user_is_buy"] / (user_agg["user_is_pv"] + 1)

# # Item aggregates
# item_agg = ddf.groupby("item_id")[["is_pv", "is_cart", "is_fav", "is_buy"]].sum().rename(columns=lambda c: "item_" + c)
# item_agg = item_agg.reset_index()
# item_agg["item_buy_rate"] = item_agg["item_is_buy"] / (item_agg["item_is_pv"] + 1)

# # Category aggregates
# cat_agg = (
#     ddf.groupby("category_id")[["is_pv", "is_cart", "is_fav", "is_buy"]].sum().rename(columns=lambda c: "cat_" + c)
# )
# cat_agg = cat_agg.reset_index()
# cat_agg["cat_buy_rate"] = cat_agg["cat_is_buy"] / (cat_agg["cat_is_pv"] + 1)

# # Persist aggregates to speed up later joins
# user_agg = user_agg.persist()
# item_agg = item_agg.persist()
# cat_agg = cat_agg.persist()
# print("Aggregates computed and persisted.")

# load (if previously saved)
user_agg = dd.read_parquet(os.path.join(OUT_DIR, "user_agg.parquet"))
item_agg = dd.read_parquet(os.path.join(OUT_DIR, "item_agg.parquet"))
cat_agg = dd.read_parquet(os.path.join(OUT_DIR, "cat_agg.parquet"))


FileNotFoundError: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: /mnt/nfs/nrdata02-users-data/belucci/output_features/output_features/user_agg.parquet

In [12]:
# save to avoid losing computation
user_agg.to_parquet(os.path.join(OUT_DIR, "user_agg.parquet"), schema={"user_is_pv": pa.int32()})
item_agg.to_parquet(os.path.join(OUT_DIR, "item_agg.parquet"), schema={"item_is_pv": pa.int32()})
cat_agg.to_parquet(os.path.join(OUT_DIR, "cat_agg.parquet"), schema={"cat_is_pv": pa.int32()})

In [28]:
# ---------------- ITEM SEQUENCES ----------------
print("Building item sequences per user (this will be computed to train Word2Vec).")
# Create per-user list of item_id (as strings for gensim)
# WARNING: This `.apply(list)` will bring the grouped lists to pandas in memory when computed.
user_item_sequences = ddf.groupby("user_id")["item_id"].apply(
    lambda s: s.astype(str).tolist(), meta=("item_id", "object")
)
# compute sequences (pandas Series)
user_item_sequences = user_item_sequences.compute()
print("Number of users with sequences:", len(user_item_sequences))

# Option: sample sequences to train embeddings if dataset huge
# For large datasets, do: user_item_sequences = user_item_sequences.sample(frac=0.1, random_state=RANDOM_SEED)

sentences = user_item_sequences.tolist()  # list of lists of str


Building item sequences per user (this will be computed to train Word2Vec).
Number of users with sequences: 890390


In [30]:
# save sentences
user_item_sequences.to_csv(os.path.join(OUT_DIR, "user_item_sequences.csv"), header=True)

In [31]:

print("Training Word2Vec on", len(sentences), "sentences ...")
w2v = Word2Vec(
    sentences=sentences,
    vector_size=ITEM_VEC_SIZE,
    window=5,
    min_count=ITEM_MIN_COUNT,
    workers=max(1, os.cpu_count() - 1),
    sg=1,
    seed=RANDOM_SEED,
)
print("Word2Vec trained. Vocabulary size:", len(w2v.wv))

# Build item embedding dataframe (pandas) -> convert to dask later
print("Building item embedding DataFrame...")
item_vecs = {}
for token in w2v.wv.index_to_key:
    # tokens are strings of item_id
    item_vecs[token] = w2v.wv[token]

item_vec_df = pd.DataFrame.from_dict(item_vecs, orient="index")
item_vec_df.index.name = "item_id"
item_vec_df.columns = [f"item_emb_{i}" for i in range(item_vec_df.shape[1])]
# Convert to dask df
item_vec_ddf = dd.from_pandas(item_vec_df.reset_index(), npartitions=64).set_index("item_id")
print("Item embedding df shape:", item_vec_df.shape)

# save to avoid losing computation
item_vec_ddf.to_parquet(os.path.join(OUT_DIR, "item_vectors.parquet"))

# load (if previously saved)
# item_vec_ddf = dd.read_parquet(os.path.join(OUT_DIR, "item_vectors.parquet")) 

Training Word2Vec on 890390 sentences ...
Word2Vec trained. Vocabulary size: 500033
Building item embedding DataFrame...
Item embedding df shape: (500033, 32)


In [34]:

# ---------------- MERGE FEATURES ----------------
print("Merging features into event-level table (joins)...")
# Keep only necessary columns from original data for final join
cols_keep = [
    "user_id",
    "item_id",
    "category_id",
    "behavior_type",
    "ts",
    "hour",
    "dayofweek",
    "label",
    "is_pv",
    "is_cart",
    "is_fav",
    "is_buy",
]
events = ddf[cols_keep]

# join user_agg (on user_id)
events = events.merge(user_agg, on="user_id", how="left")
# join item_agg (on item_id)
events = events.merge(item_agg, on="item_id", how="left")
# join category agg
events = events.merge(cat_agg, on="category_id", how="left")
# join embeddings: item_vec_ddf has index item_id, so join by index
events = events.merge(item_vec_ddf, left_on="item_id", right_index=True, how="left")

# Persist the merged events
events = events.persist()
print("Merged events persisted. Columns:", events.columns)


Merging features into event-level table (joins)...
Merged events persisted. Columns: Index(['user_id', 'item_id', 'category_id', 'behavior_type', 'ts', 'hour',
       'dayofweek', 'label', 'is_pv', 'is_cart', 'is_fav', 'is_buy',
       'user_is_pv', 'user_is_cart', 'user_is_fav', 'user_is_buy',
       'user_buy_rate', 'item_is_pv', 'item_is_cart', 'item_is_fav',
       'item_is_buy', 'item_buy_rate', 'cat_is_pv', 'cat_is_cart',
       'cat_is_fav', 'cat_is_buy', 'cat_buy_rate', 'item_emb_0', 'item_emb_1',
       'item_emb_2', 'item_emb_3', 'item_emb_4', 'item_emb_5', 'item_emb_6',
       'item_emb_7', 'item_emb_8', 'item_emb_9', 'item_emb_10', 'item_emb_11',
       'item_emb_12', 'item_emb_13', 'item_emb_14', 'item_emb_15',
       'item_emb_16', 'item_emb_17', 'item_emb_18', 'item_emb_19',
       'item_emb_20', 'item_emb_21', 'item_emb_22', 'item_emb_23',
       'item_emb_24', 'item_emb_25', 'item_emb_26', 'item_emb_27',
       'item_emb_28', 'item_emb_29', 'item_emb_30', 'item_emb_31'

In [37]:
# save full events
events.to_parquet(os.path.join(OUT_DIR, "events.parquet"), schema={"user_is_pv": pa.int32(), "item_is_pv": pa.int32(), "cat_is_pv": pa.int32()})

In [5]:
# load full events
events = dd.read_parquet(os.path.join(OUT_DIR, "events.parquet"))

In [11]:
N_PER_CLASS = 30*100000
# ---------------- BALANCED SAMPLING ----------------
print(f"Sampling {N_PER_CLASS} examples per label to build balanced multi-class dataset ...")


# helper: groupby-apply sampler
def sample_per_group(df_part, n_per_label):
    # df_part is a pandas DataFrame partition
    # We'll sample per label inside the partition; to guarantee exact n_per_label globally we use groupby.apply at dask level.
    return (
        df_part.groupby("label")
        .apply(lambda g: g.sample(n=min(len(g), n_per_label), random_state=RANDOM_SEED))
        .reset_index(drop=True)
    )


# Use dask groupby-apply to sample per label.
# Note: groupby.apply can be expensive; an alternative is to compute label counts, compute per-label fraction,
# and sample by fraction across partitions. Here we do groupby.apply for correctness.
meta = events._meta
sampled = (
    events.groupby("label")
    .apply(lambda g: g.sample(n=min(len(g), N_PER_CLASS), random_state=RANDOM_SEED), meta=meta)
    .reset_index(drop=True)
)

# Compute the sampled dataframe
sampled_df = sampled.compute()
print("Sampled shape:", sampled_df.shape)
print("Class counts:\n", sampled_df["label"].value_counts())

Sampling 3000000 examples per label to build balanced multi-class dataset ...
Sampled shape: (38458747, 59)
Class counts:
 label
2355072_pv    3000000
4145813_pv    3000000
4756105_pv    3000000
3607361_pv    2977555
982926_pv     2800011
2520377_pv    2030255
4801426_pv    1865520
1320293_pv    1794066
2465336_pv    1505542
3002561_pv    1422563
2735466_pv    1116344
4181361_pv    1001627
149192_pv      991154
1080785_pv     960260
2885642_pv     955203
4217906_pv     925363
154040_pv      906449
3738615_pv     829421
1879194_pv     759694
2640118_pv     730140
1464116_pv     684162
2520771_pv     671771
4357323_pv     669466
2920476_pv     631975
570735_pv      606475
4170419_pv     580914
2096639_pv     533509
1299190_pv     516331
4643350_pv     506204
2939262_pv     486773
Name: count, dtype: int64[pyarrow]


In [12]:
sampled_df.head(5)

Unnamed: 0,user_id,item_id,category_id,behavior_type,ts,hour,dayofweek,label,is_pv,is_cart,...,item_emb_22,item_emb_23,item_emb_24,item_emb_25,item_emb_26,item_emb_27,item_emb_28,item_emb_29,item_emb_30,item_emb_31
0,330942,2331370,3607361,pv,2017-11-27 07:31:39,7,0,3607361_pv,1,0,...,-0.760833,-0.720187,1.15889,-0.323915,0.237748,0.281913,0.387161,0.282125,-0.577759,0.1632
1,730430,2039893,3607361,pv,2017-12-03 06:59:55,6,6,3607361_pv,1,0,...,-0.802876,-0.066263,0.826542,-0.416,0.484296,-1.128085,0.310752,0.59979,-0.394224,1.11055
2,460575,3623981,3607361,pv,2017-11-27 12:57:05,12,0,3607361_pv,1,0,...,-0.240308,-1.094818,1.480657,-0.016926,0.255735,0.408532,0.163798,0.989531,-0.177817,0.415017
3,956323,1362545,3607361,pv,2017-12-03 06:38:28,6,6,3607361_pv,1,0,...,,,,,,,,,,
4,483536,1028640,3607361,pv,2017-11-30 10:43:35,10,3,3607361_pv,1,0,...,-0.922799,-0.776164,1.262024,-0.840322,1.108268,-0.290338,0.119393,0.722909,-0.412245,1.127228


In [13]:

# ---------------- FINAL CLEANUP & SAVE ----------------
# Optional: fill NaNs for embeddings with zeros
emb_cols = [c for c in sampled_df.columns if c.startswith("item_emb_")]
sampled_df[emb_cols] = sampled_df[emb_cols].fillna(0.0)

# Select final feature columns for clustering:
# temporal, user/item/cat aggregates, embedding columns
final_cols = (
    ["user_id", "item_id", "category_id", "label", "behavior_type", "hour", "dayofweek"]
	+ [c for c in sampled_df.columns if c.startswith("user_is")]
	+ [c for c in sampled_df.columns if c.startswith("item_is")]
	+ [c for c in sampled_df.columns if c.startswith("cat_is")]
    + [c for c in sampled_df.columns if c.endswith("_rate")]
    + emb_cols
)

final_df = sampled_df[final_cols].reset_index(drop=True)


In [19]:
final_df.to_parquet(
    os.path.join(OUT_DIR, "final_features.parquet"),
)

In [20]:
final_df.head()

Unnamed: 0,user_id,item_id,category_id,label,behavior_type,hour,dayofweek,user_is_pv,user_is_cart,user_is_fav,...,item_emb_22,item_emb_23,item_emb_24,item_emb_25,item_emb_26,item_emb_27,item_emb_28,item_emb_29,item_emb_30,item_emb_31
0,330942,2331370,3607361,3607361_pv,pv,7,0,116,0,0,...,-0.760833,-0.720187,1.15889,-0.323915,0.237748,0.281913,0.387161,0.282125,-0.577759,0.1632
1,730430,2039893,3607361,3607361_pv,pv,6,6,189,0,0,...,-0.802876,-0.066263,0.826542,-0.416,0.484296,-1.128085,0.310752,0.59979,-0.394224,1.11055
2,460575,3623981,3607361,3607361_pv,pv,12,0,263,0,0,...,-0.240308,-1.094818,1.480657,-0.016926,0.255735,0.408532,0.163798,0.989531,-0.177817,0.415017
3,956323,1362545,3607361,3607361_pv,pv,6,6,84,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,483536,1028640,3607361,3607361_pv,pv,10,3,107,0,0,...,-0.922799,-0.776164,1.262024,-0.840322,1.108268,-0.290338,0.119393,0.722909,-0.412245,1.127228


In [None]:
# subsample of final_df with 1 M rows but keeping distribution of labels
weights = final_df["label"].value_counts(normalize=True)
final_df["weights"] = final_df["label"].map(weights)
final_df_sub = final_df.sample(n=1_000_000, weights=final_df["weights"], random_state=42)
final_df_sub.to_csv("final_df_sub1M.csv", index=False)

In [28]:
# subsample of final_df with 100000 rows but keeping distribution of labels
weights = final_df["label"].value_counts(normalize=True)
final_df["weights"] = final_df["label"].map(weights)
final_df_sub = final_df.sample(n=100_000, weights=final_df["weights"], random_state=42)
final_df_sub.to_csv("final_df_sub100k.csv", index=False)

In [21]:
X = final_df.drop(columns=["label", "user_id", "item_id", "category_id", "behavior_type"])
y = final_df["label"]

In [23]:
from sklearn.cluster import KMeans
from sklearn.metrics import adjusted_rand_score, calinski_harabasz_score

In [25]:
model = KMeans(n_clusters=30, random_state=42)
labels = model.fit_predict(X)

KeyboardInterrupt: 