# Imports and Initialization

Import the libraries used throughout the notebook. The execution timer itself is started in the Configuration cell.

In [None]:
import json
import math
import os
import time
from functools import partial
from urllib.parse import urlencode

import faiss
import numpy as np
import pandas as pd
import requests
from catboost import CatBoostRanker, Pool
from sentence_transformers import SentenceTransformer
import shap
from matplotlib import pyplot as plt

# Configuration

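Configuration settings and constants. This cell also seeds NumPy's random generator and records the start time used by the final runtime report.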

In [None]:
class Config:
    """Global settings for the candidate-generation and ranking pipeline.

    All values are plain class attributes, read as ``Config.NAME``.
    """

    # Reproducibility.
    SEED = 42

    # When True, load a previously built artefact instead of rebuilding it.
    USE_FORMED_INDEX = True
    USE_FORMED_ID_MAPPING = True
    USE_FORMED_CANDIDATES = True

    # Embedding / retrieval sizes.
    VECTOR_DIM = 312
    BATCH_SIZE = 100_000
    SAMPLE_SIZE = 7_000_000
    TOP_N_QUERIES = 1000
    TOP_K = 300

    # Artefact file names.
    GENERATED_CANDIDATES_FILENAME = "generated_candidates.parquet"
    CANDIDATES_INDEX_FILENAME = "candidates.index"
    ID_MAPPING_FILENAME = "ind2videoid.json"
    MODEL_FILENAME = "ranker.ckpt"

    # Training / feature filtering.
    METRIC_PERIOD = 250
    QUANTILE_THRESHOLD = 0.999

    # Keyword arguments for CatBoostRanker. SEED and METRIC_PERIOD resolve here
    # because this expression is evaluated in the class-body namespace.
    MODEL_PARAMS = dict(
        task_type="GPU",
        verbose=False,
        random_seed=SEED,
        loss_function="QueryRMSE",
        learning_rate=0.001,
        l2_leaf_reg=30,
        iterations=4000,
        max_depth=3,
        metric_period=METRIC_PERIOD,
    )

# Seed NumPy's global RNG so sampling-based steps are reproducible.
np.random.seed(Config.SEED)
# Wall-clock start; the final cell subtracts this to report total runtime.
start_time = time.time()

# Functions Definitions

This section defines all the necessary functions.

In [None]:
def download_file(
    public_key: str,
    local_filename: str,
    *,
    timeout: float = 60.0,
    chunk_size: int = 1 << 20,
) -> None:
    """
    Downloads a file from Yandex Disk using a public key and saves it locally.

    The file body is streamed to ``<local_filename>.part`` and renamed to
    ``local_filename`` only after the transfer has finished, so an interrupted
    download never leaves a truncated file under the final name.

    Parameters
    ----------
    public_key : str
        Public key (public link) of the file on Yandex Disk.
    local_filename : str
        The name to save the downloaded file locally.
    timeout : float, optional
        Connect/read timeout in seconds for each HTTP request, by default 60.
    chunk_size : int, optional
        Number of bytes per streamed chunk, by default 1 MiB.

    Raises
    ------
    requests.exceptions.RequestException
        If there is an issue with the network request, including a timeout
        or a non-2xx HTTP status (via ``raise_for_status``).
    KeyError
        If the API response does not contain a download ``href``.
    """
    base_url = "https://cloud-api.yandex.net/v1/disk/public/resources/download?"
    final_url = base_url + urlencode(dict(public_key=public_key))
    response = requests.get(final_url, timeout=timeout)
    # Surface HTTP errors explicitly instead of failing later on a missing "href".
    response.raise_for_status()
    download_url = response.json()["href"]

    tmp_filename = f"{local_filename}.part"
    with requests.get(download_url, stream=True, timeout=timeout) as download_response:
        download_response.raise_for_status()
        with open(tmp_filename, "wb") as file:
            # Stream in chunks rather than holding the whole file in memory.
            for chunk in download_response.iter_content(chunk_size=chunk_size):
                file.write(chunk)
    # Rename only once complete: download_from_yandex_disk() skips any file
    # that already exists, so a partial file must never have the final name.
    os.replace(tmp_filename, local_filename)
    print(f"File {local_filename} downloaded successfully.")

def download_from_yandex_disk() -> None:
    """
    Fetches the notebook's input files from Yandex Disk.

    Each file is downloaded via ``download_file`` only when a file with the
    same name does not already exist in the current working directory.
    """
    public_links = {
        "features.parquet": "https://disk.yandex.ru/d/W_qJitz4dZGzAg",
        "videos.parquet": "https://disk.yandex.ru/d/JXz-oDfKFgm2Dw",
        "automarkup.parquet": "https://disk.yandex.ru/d/vP0FzQHdtxsz4Q",
        "manualmarkup.csv": "https://disk.yandex.ru/d/hDztN1rgW0JNjw",
    }

    for target_name, public_link in public_links.items():
        # Already present locally: nothing to do.
        if os.path.exists(target_name):
            continue
        print(f"Downloading {target_name}...")
        download_file(public_link, target_name)

def preprocess_features(features: pd.DataFrame, quantile_threshold: float = Config.QUANTILE_THRESHOLD) -> pd.DataFrame:
    """
    Preprocesses the feature DataFrame by filtering and creating new features.

    Keeps only rows whose ``v_year_views`` and ``v_likes`` are both at or above
    the ``quantile_threshold`` quantile of their respective column, parses the
    date columns, derives ratio, age and calendar features, and drops the raw
    date columns. The input DataFrame is not modified.

    Parameters
    ----------
    features : pd.DataFrame
        DataFrame containing the feature data.
    quantile_threshold : float, optional
        Quantile threshold for filtering, by default 0.999

    Returns
    -------
    pd.DataFrame
        Processed DataFrame with new features.
    """
    views_threshold = features['v_year_views'].quantile(quantile_threshold)
    likes_threshold = features['v_likes'].quantile(quantile_threshold)
    # .copy() makes the filtered frame independent, so the column assignments
    # below do not trigger SettingWithCopyWarning.
    features = features[
        (features['v_year_views'] >= views_threshold) &
        (features['v_likes'] >= likes_threshold)
    ].copy()

    features['v_channel_reg_datetime'] = pd.to_datetime(features['v_channel_reg_datetime'])
    features['v_pub_datetime'] = pd.to_datetime(features['v_pub_datetime'])
    features['report_date'] = pd.to_datetime(features['report_date'])

    features['likes_dislikes_ratio'] = features['v_likes'] / (features['v_dislikes'] + 1)
    features['views_per_like'] = features['v_year_views'] / (features['v_likes'] + 1)
    # Treat zero views as missing so the per-view ratios become NaN instead of inf.
    views_nonzero = features['v_year_views'].replace(0, np.nan)
    features['comments_per_view'] = features['total_comments'] / views_nonzero
    features['likes_per_view'] = features['v_likes'] / views_nonzero

    features['channel_existence_duration'] = (features['v_pub_datetime'] - features['v_channel_reg_datetime']).dt.days
    features['video_age'] = (features['report_date'] - features['v_pub_datetime']).dt.days

    features['pub_year'] = features['v_pub_datetime'].dt.year
    features['pub_month'] = features['v_pub_datetime'].dt.month
    features['pub_day_of_week'] = features['v_pub_datetime'].dt.dayofweek
    features['pub_hour'] = features['v_pub_datetime'].dt.hour
    features['publish_season'] = features['v_pub_datetime'].dt.quarter
    # Right-inclusive bins: hours 0-6 Night, 7-12 Morning, 13-18 Afternoon, 19-23 Evening.
    features['publish_time_of_day'] = pd.cut(features['pub_hour'], bins=[-1, 6, 12, 18, 24], labels=['Night', 'Morning', 'Afternoon', 'Evening'])

    features = features.drop(columns=["v_channel_reg_datetime", "v_pub_datetime", "report_date"])

    return features

def merge_data(candidates: pd.DataFrame, features: pd.DataFrame) -> pd.DataFrame:
    """
    Merges candidate data with features and removes duplicate entries.

    Rows are inner-joined on ``video_id`` and exact duplicate rows are dropped.
    Each distinct ``query`` gets an integer ``group_id`` (``groupby().ngroup()``).
    Groups whose ``target`` sum is not positive, i.e. groups with no positive
    example, are removed. The remaining ``group_id`` values are kept as-is, so
    they need not be contiguous.

    Parameters
    ----------
    candidates : pd.DataFrame
        DataFrame containing candidate data with targets.
    features : pd.DataFrame
        DataFrame containing engineered features.

    Returns
    -------
    pd.DataFrame
        Merged DataFrame.
    """
    merged_df = candidates.merge(features, how="inner", on="video_id").drop_duplicates()

    merged_df["group_id"] = merged_df.groupby("query").ngroup()
    # One vectorized pass instead of re-filtering the whole frame for every group.
    group_target_sum = merged_df.groupby("group_id")["target"].transform("sum")
    merged_df = merged_df[group_target_sum > 0]

    return merged_df

def prepare_pool(df: pd.DataFrame, metainfo_cols: list, cat_features: list) -> Pool:
    """
    Builds a CatBoost ``Pool`` for ranking from a DataFrame.

    Every column except those listed in ``metainfo_cols`` becomes a feature.
    The label is taken from ``df["target"]`` and the query grouping from
    ``df["group_id"]``.

    Parameters
    ----------
    df : pd.DataFrame
        Source rows; must contain ``target`` and ``group_id`` columns.
    metainfo_cols : list
        Non-feature columns to exclude from the feature matrix.
    cat_features : list
        Feature columns CatBoost should treat as categorical.

    Returns
    -------
    Pool
        Pool holding the features, labels, and group ids.
    """
    feature_frame = df.drop(columns=metainfo_cols)
    return Pool(
        data=feature_frame,
        label=df["target"],
        group_id=df["group_id"],
        cat_features=cat_features,
    )

def compute_metrics(at: int, model: CatBoostRanker, pool: Pool, metric: str = "NDCG") -> dict:
    """
    Computes the metrics for the model at a specific top-N rank.

    ``model.eval_metrics`` returns, for each metric, a sequence of values over
    iterations. The value at ``model.best_iteration_`` is selected. When
    ``best_iteration_`` is None (e.g. the model was fit without an eval set),
    the last evaluated iteration is used instead.

    Parameters
    ----------
    at : int
        The top-N rank at which to compute the metrics.
    model : CatBoostRanker
        The trained CatBoost model.
    pool : Pool
        The data pool to evaluate.
    metric : str, optional
        The metric to compute, by default "NDCG"

    Returns
    -------
    dict
        Dictionary mapping each metric name to its value.
    """
    eval_metric = f"{metric}:top={at}"
    eval_metrics = model.eval_metrics(pool, metrics=[eval_metric])
    best_iteration = model.best_iteration_
    # Indexing a list with None would raise TypeError; fall back to the final value.
    index = -1 if best_iteration is None else best_iteration
    return {key: values[index] for key, values in eval_metrics.items()}

def plot_query_distribution(automarkup_df: pd.DataFrame, top_n: int = 300) -> None:
    """
    Plots the distribution of queries.

    Shows the counts of the ``top_n`` most frequent queries against their
    rank, on linear axes (left) and on ``log1p`` axes (right).

    Parameters
    ----------
    automarkup_df : pd.DataFrame
        DataFrame containing a ``query`` column.
    top_n : int, optional
        Number of most frequent queries to plot, by default 300.
    """
    query_counts = automarkup_df['query'].value_counts().head(top_n)
    # Size the rank axis from the data: fewer than top_n distinct queries would
    # otherwise make x and y lengths differ and plot() raise.
    ranks = np.arange(len(query_counts))

    fig, ax = plt.subplots(1, 2, figsize=(12, 5))

    ax[0].plot(ranks, query_counts, label='Query Counts')
    ax[0].set_xlabel('Query Rank')
    ax[0].set_ylabel('Query Counts')
    ax[0].set_title('Query Counts (Linear Scale)')
    ax[0].legend()

    # log1p(rank) == log(rank + 1), i.e. the log of the 1-based rank.
    ax[1].plot(np.log1p(ranks), np.log1p(query_counts), label='Log Query Counts')
    ax[1].set_xlabel('Log Query Rank')
    ax[1].set_ylabel('Log Query Counts')
    ax[1].set_title('Query Counts (Log Scale)')
    ax[1].legend()

    fig.tight_layout()
    plt.show()

# Data Downloading and Setup

This section downloads the required data files from Yandex Disk, skipping any file that already exists locally.


In [None]:
# Fetch the raw data files; files already present locally are not downloaded again.
download_from_yandex_disk()

# Data Preparation

This section reads the video IDs and titles from `videos.parquet`, samples a subset of them, and loads the Sentence Transformer model used to embed titles and queries.


In [None]:
candidates = pd.read_parquet("videos.parquet", engine="pyarrow", columns=["video_id", "video_title"])
# Cap the sample size: sample(replace=False) raises ValueError if n exceeds the row count.
sample_size = min(Config.SAMPLE_SIZE, len(candidates))
candidates = candidates.sample(n=sample_size, replace=False, random_state=Config.SEED)
# Missing titles become empty strings so the encoder always receives text.
corpus = candidates["video_title"].fillna("").str.lower().values
# video_ids[i] is the video for corpus[i]; the FAISS index rows follow this order.
video_ids = candidates["video_id"].values
del candidates

# Load the Sentence Transformer model
st_model = SentenceTransformer("cointegrated/rubert-tiny2", device="cuda")

# FAISS Index Setup

This section sets up the FAISS index for vector search.

In [None]:
if not Config.USE_FORMED_ID_MAPPING:
    # tolist() converts NumPy scalars (e.g. np.int64, which json cannot
    # serialize) into native Python values.
    ind2videoid = dict(enumerate(video_ids.tolist()))
    with open(Config.ID_MAPPING_FILENAME, "w", encoding="utf-8") as file:
        json.dump(ind2videoid, file, indent=4)
else:
    with open(Config.ID_MAPPING_FILENAME, "r", encoding="utf-8") as file:
        # JSON object keys are always strings; restore the integer row ids
        # that FAISS search returns so lookups work.
        ind2videoid = {int(ind): video_id for ind, video_id in json.load(file).items()}

num_batches = math.ceil(len(corpus) / Config.BATCH_SIZE)

if not Config.USE_FORMED_INDEX:
    cpu_index = faiss.IndexFlatL2(Config.VECTOR_DIM)
    for i in range(num_batches):
        start, end = i * Config.BATCH_SIZE, (i + 1) * Config.BATCH_SIZE
        corpus_batch = corpus[start:end].tolist()
        embeddings = st_model.encode(corpus_batch, batch_size=1000, show_progress_bar=True)
        # A flat index appends rows in insertion order, so adding per batch gives
        # the same index as one big add() without holding every embedding at once.
        cpu_index.add(np.ascontiguousarray(embeddings, dtype=np.float32))
        print(f"Processed batch {i + 1}/{num_batches}")

    faiss.write_index(cpu_index, Config.CANDIDATES_INDEX_FILENAME)
    print(f"Total vectors in index: {cpu_index.ntotal}")

# Candidate Selection

This section retrieves, for each of the most frequent auto-markup queries, the `TOP_K` nearest videos from the FAISS index as ranking candidates.

In [None]:
automarkup = pd.read_parquet("automarkup.parquet", engine="pyarrow")
# .copy() so the column assignment below acts on an independent frame.
automarkup = automarkup[~automarkup["query"].isna()].copy()
automarkup["query"] = automarkup["query"].str.lower()

# NOTE(review): this keeps 2 * TOP_N_QUERIES of the most frequent queries - confirm intent.
top_n_queries = automarkup["query"].value_counts()[:2 * Config.TOP_N_QUERIES].index.to_list()
query2ind = {query: i for i, query in enumerate(top_n_queries)}

if not Config.USE_FORMED_CANDIDATES:
    # Only encode queries and load the (large) index when candidates must be regenerated.
    qembeddings = st_model.encode(top_n_queries, batch_size=1000, show_progress_bar=True)
    search_cpu_index = faiss.read_index(Config.CANDIDATES_INDEX_FILENAME)
    distances, faiss_indices = search_cpu_index.search(qembeddings, Config.TOP_K)

    # Normalize keys: a mapping loaded from JSON has str keys, a fresh one int keys.
    row2videoid = {int(ind): video_id for ind, video_id in ind2videoid.items()}

    generated_candidates = {
        "query": [],
        "video_id": [],
        "dist": []
    }

    for query, rows, dists in zip(top_n_queries, faiss_indices, distances):
        # FAISS pads results with -1 when fewer than TOP_K neighbours exist.
        found = rows >= 0
        generated_candidates["video_id"].extend(row2videoid[int(row)] for row in rows[found])
        generated_candidates["dist"].extend(list(dists[found]))
        generated_candidates["query"].extend([query] * int(found.sum()))

    generated_candidates_df = pd.DataFrame(generated_candidates)
    generated_candidates_df.to_parquet(Config.GENERATED_CANDIDATES_FILENAME, engine="pyarrow")
else:
    generated_candidates_df = pd.read_parquet(Config.GENERATED_CANDIDATES_FILENAME, engine="pyarrow")


# Target Formation

This section labels each generated (query, video) candidate: the target is 1 if the pair appears in the auto-markup and 0 otherwise.

In [None]:
automarkup["target"] = 1
# Keep one positive label per (query, video_id) so the left merge cannot
# duplicate candidate rows when the markup repeats a pair.
positive_pairs = automarkup[["query", "video_id", "target"]].drop_duplicates(subset=["query", "video_id"])
candidates_with_target = generated_candidates_df.merge(
    positive_pairs,
    how="left",
    on=["query", "video_id"]
)
# Assign back instead of chained fillna(inplace=True), which is deprecated and
# has no effect under pandas Copy-on-Write.
candidates_with_target["target"] = candidates_with_target["target"].fillna(0)


# Query Distribution Analysis

This section analyzes the distribution of queries.

In [None]:
# Plot how often the most frequent auto-markup queries occur (linear and log1p axes).
plot_query_distribution(automarkup)

# Feature Engineering

This section engineers features from the available data.

In [None]:
features = pd.read_parquet("features.parquet", engine="pyarrow")
features = preprocess_features(features)
display(features.head())
# DataFrame.info() prints its report itself and returns None, so it is called
# directly rather than passing its None result to display().
features.info()
display(features.describe())
display(features.isnull().sum())

# Data Merging

This section merges the candidate data with the engineered features and splits the query groups into train, validation, and test sets (75% / 15% / 10%).


In [None]:
full_df = merge_data(candidates_with_target, features)
full_df.to_parquet("full_df.parquet")

# Split at the query-group level so no query appears in more than one split.
groups = pd.Series(full_df["group_id"].unique())
# Convert to NumPy: np.split on a pandas Series goes through the deprecated
# Series.swapaxes and emits a FutureWarning.
permutation = groups.sample(frac=1, random_state=Config.SEED).to_numpy()
n_groups = len(permutation)
train_groups, val_groups, test_groups = np.split(permutation, [int(0.75 * n_groups), int(0.90 * n_groups)])

# Sorted by group_id so rows of each query are contiguous.
train_df = full_df[full_df["group_id"].isin(train_groups)].sort_values("group_id")
val_df = full_df[full_df["group_id"].isin(val_groups)].sort_values("group_id")
test_df = full_df[full_df["group_id"].isin(test_groups)].sort_values("group_id")


# Model Preparation

This section prepares the data for model training.

In [None]:
# Identifier / label columns that must not be fed to the model as features.
metainfo_columns = ["query", "video_id", "target", "group_id"]

# Feature columns passed to CatBoost as categorical.
categorical_features = [
    "v_channel_type",
    "v_category",
    "pub_year",
    "pub_month",
    "pub_hour",
    "pub_day_of_week",
    "publish_season",
    "publish_time_of_day",
]

# One Pool per split, built in train / val / test order.
train_pool, val_pool, test_pool = (
    prepare_pool(split_df, metainfo_columns, categorical_features)
    for split_df in (train_df, val_df, test_df)
)

# Model Training

This section trains the CatBoost model.

In [None]:
# Train the ranker. use_best_model=True with eval_set=val_pool asks CatBoost to
# keep the iteration that scored best on the validation pool.
model = CatBoostRanker(**Config.MODEL_PARAMS)
model.fit(
    train_pool,
    eval_set=val_pool,
    use_best_model=True,
)
# Persist the fitted model to Config.MODEL_FILENAME.
model.save_model(Config.MODEL_FILENAME)

# Metric Measurement

This section evaluates the trained model with NDCG@1 and NDCG@5 on the train, validation, and test pools.

In [None]:
# Bind model and pool once per split; each callable then takes only the top-N cutoff.
metrics_train_at, metrics_val_at, metrics_test_at = (
    partial(compute_metrics, model=model, pool=split_pool)
    for split_pool in (train_pool, val_pool, test_pool)
)

# Show the @1 and @5 metrics for each split, in train / val / test order.
for metrics_at in (metrics_train_at, metrics_val_at, metrics_test_at):
    display(metrics_at(1), metrics_at(5))

# SHAP Analysis

This section performs SHAP analysis to explain the model predictions.

In [None]:
shap_explainer = shap.Explainer(model)
# Extract the training matrix and feature names once and reuse them; the
# original called get_features() three times on the same large pool.
train_features = train_pool.get_features()
feature_names = train_pool.get_feature_names()
shap_values = shap_explainer.shap_values(train_features)
shap.summary_plot(shap_values, train_features, feature_names=feature_names)
shap.summary_plot(shap_values, train_features, feature_names=feature_names, plot_type='bar')

# Cleanup and Timing

This section cleans up the memory and prints the total execution time.

In [None]:
del model, full_df, train_df, val_df, test_df, train_pool, val_pool, test_pool
# The metric partials store model and pool as bound keyword arguments, and the
# SHAP explainer was built from the model (presumably keeping a reference), so
# drop them too; otherwise the objects above stay alive.
del metrics_train_at, metrics_val_at, metrics_test_at, shap_explainer
end_time = time.time()
total_time = (end_time - start_time) / 60
print(f"Total execution time: {int(total_time)} minutes")