In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import requests
from datetime import datetime as dt
import boto3
from io import BytesIO
import pickle
import os
from dotenv import load_dotenv
import scipy
from sklearn.preprocessing import LabelEncoder
from implicit.als import AlternatingLeastSquares
from sklearn.preprocessing import MinMaxScaler
import sklearn

plt.style.use('ggplot')



In [2]:
load_dotenv()

S3_BUCKET_NAME=os.getenv("S3_BUCKET_NAME")
AWS_ACCESS_KEY_ID=os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY=os.getenv("AWS_SECRET_ACCESS_KEY")

In [3]:
# определение функций для работы с s3
def upload_to_s3(df, file_name):
    s3_client = boto3.client(
        "s3",
        endpoint_url='https://storage.yandexcloud.net',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    
    with BytesIO() as buffer:
        df.to_parquet(buffer)
        buffer.seek(0)
        s3_client.upload_fileobj(buffer, S3_BUCKET_NAME, file_name)
    
    
def download_from_s3(file_name) -> pd.DataFrame:
    s3_client = boto3.client(
        "s3",
        endpoint_url='https://storage.yandexcloud.net',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    with BytesIO() as buffer:
        s3_client.download_fileobj(S3_BUCKET_NAME, file_name, buffer)
        buffer.seek(0)
        return pd.read_parquet(buffer)
    
    
def upload_pickle_to_s3(object, file_name):
    s3_client = boto3.client(
        "s3",
        endpoint_url='https://storage.yandexcloud.net',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    with BytesIO() as buffer:
        pickle.dump(object, buffer)
        buffer.seek(0)
        s3_client.upload_fileobj(buffer, S3_BUCKET_NAME, file_name)
        
def download_pickle_from_s3(file_name):
    s3_client = boto3.client(
        "s3",
        endpoint_url='https://storage.yandexcloud.net',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    with BytesIO() as buffer:
        s3_client.download_fileobj(S3_BUCKET_NAME, file_name, buffer)
        buffer.seek(0)
        return pickle.load(buffer)

## Выгрузка данных

In [4]:
data = download_from_s3('bank_products_processed.parquet')

In [5]:
split_date = '2016-01-28'

train_data = data[data['div_data'] <= split_date]
test_data = data[data['div_data'] > split_date]


In [6]:
target_cols = [col for col in train_data.columns if col.startswith('acc_')]
acc_to_id = {acc: id for id, acc in enumerate(target_cols)}
id_to_acc = {id: acc for id, acc in enumerate(target_cols)}

## Baseline model  
Первый вариант модели с использованием только данных о взаимодействии клиентов с объектами

In [7]:
client_enc = LabelEncoder()

In [8]:
train_data['max_date'] = train_data['div_data'].max()
test_data['max_date'] = test_data['div_data'].max()

train_data['days_from_max_date'] = (train_data['max_date'] - train_data['div_data']).dt.days
test_data['days_from_max_date'] = (test_data['max_date'] - test_data['div_data']).dt.days

train_data['days_from_max_date'] = train_data['days_from_max_date'].astype(int)
test_data['days_from_max_date'] = test_data['days_from_max_date'].astype(int)

train_data['weight'] = train_data['days_from_max_date'] / train_data['days_from_max_date'].max()
test_data['weight'] = test_data['days_from_max_date'] / test_data['days_from_max_date'].max()


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_data['max_date'] = train_data['div_data'].max()
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_data['max_date'] = test_data['div_data'].max()
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_data['days_from_max_date'] = (train_data['max_date'] - train_data['div_data']).dt.days
A valu

In [9]:
for target in target_cols:
    train_data[target] = train_data[target] * train_data['weight']
    test_data[target] = test_data[target] * test_data['weight']

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_data[target] = train_data[target] * train_data['weight']
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_data[target] = test_data[target] * test_data['weight']


In [10]:
interaction_matrix = train_data[['client_id'] + target_cols]

interaction_matrix = interaction_matrix.groupby('client_id').sum().reset_index()

In [11]:
interaction_matrix.shape

(927690, 25)

In [12]:
acc_to_id = {acc: id for id, acc in enumerate(target_cols)}
id_to_acc = {id: acc for id, acc in enumerate(target_cols)}

In [13]:
interaction_matrix['accounts'] = [[targets] for targets in interaction_matrix[target_cols].values]
interaction_matrix['accounts'] = interaction_matrix['accounts'].apply(lambda x: x[0])
interaction_matrix['accounts_name'] = [list(id_to_acc.keys())] * interaction_matrix.shape[0]

In [14]:
interaction_matrix = interaction_matrix[['client_id', 'accounts', 'accounts_name']]
del train_data


In [15]:
# interaction_matrix = interaction_matrix[['client_id', 'accounts', 'accounts_name']].explode(['accounts', 'accounts_name'])

interaction_matrix_exploded = list()
interaction_matrix = interaction_matrix.values

for row in interaction_matrix:
    accounts = row[1]
    accounts_name = row[2]
    client_id = row[0]
    for account, account_name in zip(accounts, accounts_name):
        interaction_matrix_exploded.append((client_id, account, account_name))

interaction_matrix_exploded = pd.DataFrame(interaction_matrix_exploded, columns=['client_id', 'account', 'account_name'])
interaction_matrix_exploded = interaction_matrix_exploded[interaction_matrix_exploded['account'] > 0]
interaction_matrix_exploded.head()

Unnamed: 0,client_id,account,account_name
2,15889,6.860274,2
8,15889,6.860274,8
18,15889,2.846575,18
19,15889,6.860274,19
28,15890,6.860274,4


In [16]:
# interaction_matrix_exploded['account'] = scaler.fit_transform(interaction_matrix_exploded[['account']])
interaction_matrix_exploded['client_id'] = client_enc.fit_transform(interaction_matrix_exploded['client_id'])


In [17]:
# создаём sparse-матрицу формата CSR 
user_item_matrix_train = scipy.sparse.csr_matrix((
    interaction_matrix_exploded["account"],
    (interaction_matrix_exploded['client_id'], interaction_matrix_exploded['account_name'])),
    dtype=np.float32)

In [18]:
als_model = AlternatingLeastSquares(factors=50, iterations=50, regularization=0.05, random_state=0)
als_model.fit(user_item_matrix_train)

  check_blas_config()


  0%|          | 0/50 [00:00<?, ?it/s]

In [19]:
upload_pickle_to_s3(als_model, 'als_model.pkl')
upload_pickle_to_s3(client_enc, 'client_enc.pkl')
upload_pickle_to_s3(user_item_matrix_train, 'user_item_matrix_train.pkl')

## Model evaluation

In [7]:
def create_interaction_matrix(data: pd.DataFrame):
    
    data['max_date'] = data['div_data'].max()
    data['days_from_max_date'] = (data['max_date'] - data['div_data']).dt.days
    data['days_from_max_date'] = data['days_from_max_date'].astype(int)
    data['weight'] = data['days_from_max_date'] / data['days_from_max_date'].max()
    
    for target in target_cols:
        data[target] = data[target] * data['weight']

    interaction_matrix = data[['client_id'] + target_cols]
    interaction_matrix = interaction_matrix.groupby('client_id').sum().reset_index()
    
    interaction_matrix['accounts'] = [[targets] for targets in interaction_matrix[target_cols].values]
    interaction_matrix['accounts'] = interaction_matrix['accounts'].apply(lambda x: x[0])
    interaction_matrix['accounts_name'] = [list(id_to_acc.keys())] * interaction_matrix.shape[0]
    
    interaction_matrix = interaction_matrix[['client_id', 'accounts', 'accounts_name']]
    
    interaction_matrix_exploded = list()
    interaction_matrix = interaction_matrix.values

    for row in interaction_matrix:
        accounts = row[1]
        accounts_name = row[2]
        client_id = row[0]
        for account, account_name in zip(accounts, accounts_name):
            interaction_matrix_exploded.append((client_id, account, account_name))

    interaction_matrix_exploded = pd.DataFrame(interaction_matrix_exploded, columns=['client_id', 'account', 'account_name'])
    interaction_matrix_exploded = interaction_matrix_exploded[interaction_matrix_exploded['account'] > 0]

    
    return interaction_matrix_exploded

In [8]:
als_model = download_pickle_from_s3('als_model.pkl')
client_enc = download_pickle_from_s3('client_enc.pkl')
user_item_matrix_train = download_pickle_from_s3('user_item_matrix_train.pkl')

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [9]:
test_clients = [client for client in test_data['client_id'].unique() if client in client_enc.classes_]
test_clients_encoded = client_enc.transform(test_clients)
recommendations = als_model.recommend(test_clients_encoded, user_item_matrix_train[test_clients_encoded], filter_already_liked_items=True, N=5)

In [10]:
test_interaction_matrix = create_interaction_matrix(test_data)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['max_date'] = data['div_data'].max()
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['days_from_max_date'] = (data['max_date'] - data['div_data']).dt.days
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['days_from_max_date'] = data['days_from_max_date'].astype(int)
A value is trying to

In [11]:
test_interaction_matrix.rename(columns={'account': 'rating', 'account_name': 'account_id'}, inplace=True)

In [12]:
# преобразуем полученные рекомендации в табличный формат
item_ids_enc = recommendations[0]
als_scores = recommendations[1]

als_recommendations = pd.DataFrame({
    "user_id_enc": test_clients_encoded,
    "item_id_enc": item_ids_enc.tolist(), 
    "score": als_scores.tolist()})
als_recommendations = als_recommendations.explode(["item_id_enc", "score"], ignore_index=True)

# приводим типы данных
als_recommendations["item_id_enc"] = als_recommendations["item_id_enc"].astype("int")
als_recommendations["score"] = als_recommendations["score"].astype("float")

# получаем изначальные идентификаторы
als_recommendations["user_id"] = client_enc.inverse_transform(als_recommendations["user_id_enc"])
# als_recommendations["item_id"] = als_recommendations["item_id_enc"].map(id_to_acc)
als_recommendations = als_recommendations.drop(columns=["user_id_enc"])

In [13]:
def compute_ndcg(rating: pd.Series, score: pd.Series, k):

    """ подсчёт ndcg
    rating: истинные оценки
    score: оценки модели
    k: количество айтемов (по убыванию score) для оценки, остальные - отбрасываются
    """
    
    # если кол-во объектов меньше 2, то NDCG - не определена
    if len(rating) < 2:
        return np.nan

    ndcg = sklearn.metrics.ndcg_score(np.asarray([rating.to_numpy()]), np.asarray([score.to_numpy()]), k=k)

    return ndcg

def compute_hit_rate(items: pd.Series, recommendations: pd.Series, k: int = 5):

    """ подсчёт hit rate
    items: истинные айтемы
    recommendations: рекоменуемые айтемы
    k: количество айтемов (по убыванию score) для оценки, остальные - отбрасываются
    """
    hit_cnt = 0
    rate_cnt = 0
    
    for rec_item in recommendations:
        if rec_item in items:
            rate_cnt += 1
        hit_cnt += 1
        if hit_cnt >= k:
            break
    return rate_cnt / len(items)

def process_events_recs_for_binary_metrics(events_train, events_test, recs, top_k=None):

    """
    размечает пары <user_id, item_id> для общего множества пользователей признаками
    - gt (ground truth)
    - pr (prediction)
    top_k: расчёт ведётся только для top k-рекомендаций
    """

    events_test["gt"] = True
    common_users = set(events_test["user_id"]) & set(recs["user_id"])

    print(f"Common users: {len(common_users)}")
    
    events_for_common_users = events_test[events_test["user_id"].isin(common_users)].copy()
    recs_for_common_users = recs[recs["user_id"].isin(common_users)].copy()

    recs_for_common_users = recs_for_common_users.sort_values(["user_id", "score"], ascending=[True, False])

    # оставляет только те item_id, которые были в events_train, 
    # т. к. модель не имела никакой возможности давать рекомендации для новых айтемов
    # events_for_common_users = events_for_common_users[events_for_common_users["item_id"].isin(events_train["item_id"].unique())]

    if top_k is not None:
        recs_for_common_users = recs_for_common_users.groupby("user_id").head(top_k)
    
    events_recs_common = events_for_common_users[["user_id", "item_id", "gt"]].merge(
        recs_for_common_users[["user_id", "item_id", "score"]], 
        on=["user_id", "item_id"], how="outer")    

    events_recs_common["gt"] = events_recs_common["gt"].fillna(False)
    events_recs_common["pr"] = ~events_recs_common["score"].isnull()
    
    events_recs_common["tp"] = events_recs_common["gt"] & events_recs_common["pr"]
    events_recs_common["fp"] = ~events_recs_common["gt"] & events_recs_common["pr"]
    events_recs_common["fn"] = events_recs_common["gt"] & ~events_recs_common["pr"]

    return events_recs_common 

def compute_cls_metrics(events_recs_for_binary_metric):
    
    groupper = events_recs_for_binary_metric.groupby("user_id")

    # precision = tp / (tp + fp)
    precision = groupper["tp"].sum()/(groupper["tp"].sum()+groupper["fp"].sum())
    precision = precision.fillna(0).mean()
    
    # recall = tp / (tp + fn)
    recall = groupper["tp"].sum()/(groupper["tp"].sum()+groupper["fn"].sum())
    recall = recall.fillna(0).mean()

    return precision, recall 

In [14]:
users_coverage = len(als_recommendations['user_id'].unique()) / len(test_data['client_id'].unique())
items_coverage = len(als_recommendations['item_id_enc'].unique()) / len(target_cols)

print(f"Users coverage: {users_coverage}")
print(f"Items coverage: {items_coverage}")

Users coverage: 0.7443695569536453
Items coverage: 1.0


In [15]:
als_recommendations = (
    als_recommendations
    .merge(test_interaction_matrix.rename(columns={"client_id": "user_id", "account_id": "item_id_enc"}), 
           on=["user_id", "item_id_enc"], how="left")
)


In [16]:
rating_test_idx = ~als_recommendations["rating"].isnull()
ndcg_at_5_scores = als_recommendations[rating_test_idx].groupby("user_id").apply(lambda x: compute_ndcg(x["rating"], x["score"], k=5))
hit_at_5_scores = als_recommendations.groupby("user_id").apply(lambda x: compute_hit_rate(x["rating"], x["score"], k=5))
# ndcg_at_5_scores = als_recommendations[rating_test_idx].groupby("user_id").apply(lambda x: compute_ndcg(x["rating"], x["score"], k=5))

print(f"NDCG at 5: {ndcg_at_5_scores.mean()}")

NDCG at 5: 0.9683827217035406


In [17]:
precision_5, recall_5 = compute_cls_metrics(process_events_recs_for_binary_metrics(
  None,
    test_interaction_matrix.rename(columns={"client_id": "user_id", "account_id": "item_id"}), 
    als_recommendations.rename(columns={"item_id_enc": "item_id"}), 
    top_k=5) )


Common users: 681540


In [18]:
print(f"Precision at 5: {precision_5}, Recall at 5: {recall_5}")


Precision at 5: 0.006006690729817766, Recall at 5: 0.010828292579106913


## Логирование в MlFlow

In [19]:
import mlflow

* 'schema_extra' has been renamed to 'json_schema_extra'


In [35]:
# константы для логирования в mlflow

EXPERIMENT_NAME = "final_project_bank_alexdem"


os.environ["MLFLOW_S3_ENDPOINT_URL"] = "https://storage.yandexcloud.net"
os.environ["AWS_ACCESS_KEY_ID"] = os.getenv("AWS_ACCESS_KEY_ID")
os.environ["AWS_SECRET_ACCESS_KEY"] = os.getenv("AWS_SECRET_ACCESS_KEY")

TRACKING_SERVER_HOST = "127.0.0.1"
TRACKING_SERVER_PORT = 5020

mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}")
mlflow.set_registry_uri(f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}")

In [37]:
RUN_NAME = "baseline_als_model_bank"
REGISTRY_MODEL_NAME = "model_als_bank_alexdem"

pip_requirements= "../requirements.txt"
signature = mlflow.models.infer_signature(user_item_matrix_train.toarray(), np.array(recommendations))
input_example = test_clients_encoded[:10]


experiment_id = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id

with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    run_id = run.info.run_id
    
    model_info = mlflow.sklearn.log_model( 
			      als_model,
            artifact_path="models",
            registered_model_name=REGISTRY_MODEL_NAME,
            pip_requirements=pip_requirements,
            signature=signature,
            input_example=input_example,
            await_registration_for=60
		)
    mlflow.log_metrics({'precision_5': precision_5, 'recall_5': recall_5, 'ndcg_5': ndcg_at_5_scores.mean(), "users_coverage": users_coverage, "items_coverage": items_coverage})
    mlflow.log_artifact("train_model_baseline.ipynb")


Registered model 'model_als_bank_alexdem' already exists. Creating a new version of this model...
2025/09/05 14:35:27 INFO mlflow.tracking._model_registry.client: Waiting up to 60 seconds for model version to finish creation. Model name: model_als_bank_alexdem, version 2
Created version '2' of model 'model_als_bank_alexdem'.
