In [6]:
# делаем import необходимых библиотек
import os

import mlflow

# устанавливаем локальное хранилище для наших экспериментов
# хранилище должно быть такое же, как и при запуске сервиса
mlflow.set_tracking_uri('file:./mlflow_experiments_store')

# получаем id эксеримента, который создаётся по умолчанию
# эксперимент по умолчанию называется Default
experiment_id = mlflow.get_experiment_by_name("Default").experiment_id

# залогируем тестовую метрику и артефакт
with mlflow.start_run(run_name='Default', experiment_id=experiment_id) as run:
    run_id = run.info.run_id
    mlflow.log_metric("test_metric", 0)
    mlflow.log_artifact("/home/mle-user/mle_projects/mle-mlflow/test_artifact.txt", "test_artifact")

print(f"Run id запуска: {run_id}")

Run id запуска: 37a0ce26eb914a1e8a65a9ded48a23ef


In [7]:
!ls mlflow_experiments_store/0/{run_id}/metrics

test_metric


In [8]:
!cat mlflow_experiments_store/0/{run_id}/artifacts/test_artifact/test_artifact.txt

test_artifact


In [12]:
import os

import mlflow

mlflow.set_tracking_uri('http://0.0.0.0:5000')

# получаем id эксперимента, который создаётся по умолчанию
# эксперимент по умолчанию называется Default
experiment_id = mlflow.get_experiment_by_name("Default").experiment_id

with mlflow.start_run(run_name="Default", experiment_id=experiment_id) as run:
    run_id = run.info.run_id
    
    mlflow.log_metric("test_metric_sqlite", 0)
    mlflow.log_artifact("/home/mle-user/mle_projects/mle-mlflow/test_artifact.txt", "test_artifact")

# проверим, что наши данные сохранились в локальной папке, а также создалась база данных SQLite
assert os.path.exists("/home/mle-user/mle_projects/mle-mlflow/mlflow_experiments_store_sqlite")
assert os.path.exists("/home/mle-user/mle_projects/mle-mlflow/mydb.sqlite")

In [None]:
import os

import mlflow

# определяем основные credentials, которые нужны для подключения к MLflow
# важно, что credentials мы передаём для себя как пользователей Tracking Service
# у вас должен быть доступ к бакету, в который вы будете складывать артефакты
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "https://storage.yandexcloud.net" #endpoint бакета от YandexCloud
os.environ["S3_ACCESS_KEY"] = os.getenv("S3_ACCESS_KEY") # получаем id ключа бакета, к которому подключён MLFlow, из .env
os.environ["S3_SECRET_KEY"] = os.getenv("S3_SECRET_KEY") # получаем ключ бакета, к которому подключён MLFlow, из .env

# определяем глобальные переменные
# поднимаем MLflow локально
TRACKING_SERVER_HOST = "127.0.0.1"
TRACKING_SERVER_PORT = 5000

YOUR_NAME = "ALEKSANDR" # введите своё имя для создания уникального эксперимента
assert YOUR_NAME, "ALEKSANDR"

# название тестового эксперимента и запуска (run) внутри него
EXPERIMENT_NAME = f"test_connection_experiment_{YOUR_NAME}"
RUN_NAME = "test_connection_run"

# тестовые данные
METRIC_NAME = "test_metric"
METRIC_VALUE = 0

# устанавливаем host, который будет отслеживать наши эксперименты
mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}")

# создаём тестовый эксперимент и записываем в него тестовую информацию
experiment_id = mlflow.create_experiment(EXPERIMENT_NAME)
with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    run_id = run.info.run_id
    
    mlflow.log_metric(METRIC_NAME, METRIC_VALUE)

TypeError: str expected, not NoneType

In [4]:
import os
import pandas as pd
import psycopg

connection = {"sslmode": "require", "target_session_attrs": "read-write"}
postgres_credentials = {
    "host": os.getenv("DB_DESTINATION_HOST"), 
    "port": os.getenv("DB_DESTINATION_PORT"),
    "dbname": os.getenv("DB_DESTINATION_NAME"),
    "user": os.getenv("DB_DESTINATION_USER"),
    "password": os.getenv("DB_DESTINATION_PASSWORD"),
}
assert all([var_value != "" for var_value in list(postgres_credentials.values())])

connection.update(postgres_credentials)

# определим название таблицы, в которой хранятся наши данные.
TABLE_NAME = "users_churn"

# эта конструкция создаёт контекстное управление для соединения с базой данных 
# оператор with гарантирует, что соединение будет корректно закрыто после выполнения всех операций 
# закрыто оно будет даже в случае ошибки, чтобы не допустить "утечку памяти"
with psycopg.connect(**connection) as conn:

# создаёт объект курсора для выполнения запросов к базе данных
# с помощью метода execute() выполняется SQL-запрос для выборки данных из таблицы TABLE_NAME
    with conn.cursor() as cur:
        cur.execute(f"SELECT * FROM {TABLE_NAME}")
                
                # извлекаем все строки, полученные в результате выполнения запроса
        data = cur.fetchall()

                # получает список имён столбцов из объекта курсора
        columns = [col[0] for col in cur.description]

# создаёт объект DataFrame из полученных данных и имён столбцов. 
# это позволяет удобно работать с данными в Python, используя библиотеку Pandas.
df = pd.DataFrame(data, columns=columns)

In [5]:
# 1. Название колонок вашего датафрейма запишите в текстовый файл
with open("columns.txt", "w", encoding="utf-8") as fio:
    fio.write(",".join(df.columns))

In [6]:
counts_columns = [
    "type", "paperless_billing", "internet_service", "online_security", "online_backup", "device_protection",
    "tech_support", "streaming_tv", "streaming_movies", "gender", "senior_citizen", "partner", "dependents",
    "multiple_lines", "target"
]

stats = {}

for col in counts_columns:
    column_stat = df[col].value_counts().to_dict()
    column_stat = {f"{col}_{key}": int(value) for key, value in column_stat.items()}
    stats.update(column_stat)

stats["data_length"] = int(df.shape[0])
stats["monthly_charges_min"] = float(df["monthly_charges"].min())
stats["monthly_charges_max"] = float(df["monthly_charges"].max())
stats["monthly_charges_mean"] = float(df["monthly_charges"].mean())
stats["monthly_charges_median"] = float(df["monthly_charges"].median())
stats["total_charges_min"] = float(df["total_charges"].min())
stats["total_charges_max"] = float(df["total_charges"].max())
stats["total_charges_mean"] = float(df["total_charges"].mean())
stats["total_charges_median"] = float(df["total_charges"].median())
stats["unique_customers_number"] = int(df["customer_id"].nunique())
stats["end_date_nan"] = int(df["end_date"].isna().sum())

In [9]:
df.to_csv("users_churn.csv", index=False)

In [11]:
import mlflow
import os

# задаём название эксперимента и имя запуска для логирования в MLflow
EXPERIMENT_NAME = "churn_fio"
RUN_NAME = "data_check"

# устанавливаем эксперимент (создаёт новый или использует существующий)
mlflow.set_experiment(EXPERIMENT_NAME)

with mlflow.start_run(run_name=RUN_NAME) as run:
    # получаем уникальный идентификатор запуска эксперимента
    run_id = run.info.run_id
    
    # логируем метрики эксперимента
    mlflow.log_metrics(stats)
    
    # логируем файлы как артефакты эксперимента
    mlflow.log_artifact("columns.txt", artifact_path="dataframe")
    mlflow.log_artifact("users_churn.csv", artifact_path="dataframe")

experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
# получаем данные о запуске эксперимента по его уникальному идентификатору
run = mlflow.get_run(run_id)

# проверяем, что статус запуска эксперимента изменён на 'FINISHED'
assert run.info.status == "FINISHED"

# удаляем файлы
os.remove("columns.txt")
os.remove("users_churn.csv")

## Логируем модель

Обычно, помимо самой модели, логируются и метрики качества модели. Для этого оценим, насколько хорошо модель делает предсказания, и сохраним эти метрики. Предсказанные значения хранятся в переменной prediction, а реальные значения в y_test.
Начнём с вычисления метрик, используя модуль sklearn.metrics. Оцените метрики:
ROC-AUC,
F1-мера,
точность — precision,
полнота recall,
матрица ошибок — confusion_matrix,
logloss, которая показывает логарифмические потери.
Значения этих метрик запишите в словарь metrics. 

In [None]:
from sklearn.metrics import (
    roc_auc_score,
    f1_score,
    precision_score,
    recall_score,
    confusion_matrix,
    log_loss
)

# заведите словарь со всеми метриками
metrics = {}

# посчитайте метрики из модуля sklearn.metrics
# err_1 — ошибка первого рода (False Positive)
# err_2 — ошибка второго рода (False Negative)
_, err1, _, err2 = confusion_matrix(y_test, prediction, normalize='all').ravel()
auc = roc_auc_score(y_test, probas)
precision = precision_score(y_test, prediction)
recall = recall_score(y_test, prediction)
f1 = f1_score(y_test, prediction)
logloss = log_loss(y_test, prediction)

# запишите значения метрик в словарь
metrics["err1"] = err1
metrics["err2"] = err2
metrics["auc"] = auc
metrics["precision"] = precision
metrics["recall"] = recall
metrics["f1"] = f1
metrics["logloss"] = logloss

Разверните сервер для запуска MLflow с хранилищем экспериментов и артефактов. Не забудьте про модуль Model Registry. 

In [None]:
#!/bin/bash

export MLFLOW_S3_ENDPOINT_URL=https://storage.yandexcloud.net
export AWS_ACCESS_KEY_ID=$S3_ACCESS_KEY
export AWS_SECRET_ACCESS_KEY=$S3_SECRET_KEY

mlflow server \
    --backend-store-uri postgresql://mle_20250507_39f5f3ff21_freetrack:76bc4e5fcfcd46cd8da35b17e6d24263@rc1b-uh7kdmcx67eomesf.mdb.yandexcloud.net:6432/playground_mle_20250507_39f5f3ff21 \
    --registry-store-uri postgresql://mle_20250507_39f5f3ff21_freetrack:76bc4e5fcfcd46cd8da35b17e6d24263@rc1b-uh7kdmcx67eomesf.mdb.yandexcloud.net:6432/playground_mle_20250507_39f5f3ff21 \
    --default-artifact-root s3://s3-student-mle-20250507-39f5f3ff21-freetrack \
    --no-serve-artifacts

Зарегистрируйте вашу базовую модель в реестре моделей с полученным словарём метрик.
Используйте существующий эксперимент EXPERIMENT_NAME и новый запуск RUN_NAME.
Имя зарегистрированной модели сохраните в переменную REGISTRY_MODEL_NAME.
Окружение проекта сформируйте в файле requirements.txt.
Сформируйте сигнатуру модели из тестовых данных и предсказания модели.
Добавьте мета-информацию, которую считаете важной.

In [None]:
import os

import mlflow


EXPERIMENT_NAME = "churn_prediction_stepanov"  # ваше уникальное имя эксперимента
RUN_NAME = "model_0_registry"
REGISTRY_MODEL_NAME = "churn_model_nikolaistepanov"


os.environ["MLFLOW_S3_ENDPOINT_URL"] = "https://storage.yandexcloud.net"
os.environ["AWS_ACCESS_KEY_ID"] = os.getenv("S3_ACCESS_KEY")
os.environ["AWS_SECRET_ACCESS_KEY"] = os.getenv("S3_SECRET_KEY")

# Настройка MLflow tracking и registry URI
mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_registry_uri("http://127.0.0.1:5000")


pip_requirements = '../requirements.txt'
signature = mlflow.models.infer_signature(X_test, prediction)
input_example = X_test[:10]
metadata = {'model_type': 'monthly'}


experiment_id = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id

with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    run_id = run.info.run_id
    
    # Логируем метрики
    mlflow.log_metrics(metrics)
    
    # Логируем и регистрируем модель
    model_info = mlflow.catboost.log_model(
        cb_model=model,
        artifact_path="models",
        registered_model_name=REGISTRY_MODEL_NAME,
        pip_requirements=pip_requirements,
        signature=signature,
        input_example=input_example,
        metadata=metadata,
        await_registration_for=60
    )

Достаньте модель из реестра и с её помощью сделайте предсказание на отложенной выборке.

loaded_model = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
model_predictions = loaded_model.predict(X_test)

assert model_predictions.dtype == int

print(model_predictions[:10])