In [1]:
# делаем import необходимых библиотек
import os
import sys

import mlflow


* 'schema_extra' has been renamed to 'json_schema_extra'


In [2]:
# !{sys.executable} -m pip install psycopg
# !{sys.executable} -m pip install 'psycopg[binary]'


In [3]:
import psycopg
import pandas as pd


### Запуск с локальной записью артефактов и метрик

In [1]:


# устанавливаем локальное хранилище для наших экспериментов
# хранилище должно быть такое же, как и при запуске сервиса
mlflow.set_tracking_uri('file:./mlflow_experiments_store')

# получаем id эксеримента, который создаётся по умолчанию
# эксперимент по умолчанию называется Default
experiment_id = mlflow.get_experiment_by_name("Default").experiment_id

# залогируем тестовую метрику и артефакт
with mlflow.start_run(run_name='Default', experiment_id=experiment_id) as run:
    run_id = run.info.run_id
    mlflow.log_metric("test_metric", 0)
    mlflow.log_artifact("test_artifact.txt", "test_artifact")

print(f"Run id запуска: {run_id}") 


* 'schema_extra' has been renamed to 'json_schema_extra'


Run id запуска: a874309545674637a6f48c6858ff8a7c


In [2]:
!ls mlflow_experiments_store/0/{run_id}/metrics

test_metric


In [3]:
!cat mlflow_experiments_store/0/{run_id}/artifacts/test_artifact/test_artifact.txt

test_artifact

### Запуск с БД sqlite

In [4]:
mlflow.set_tracking_uri('http://0.0.0.0:5000')

# получаем id эксперимента, который создаётся по умолчанию
# эксперимент по умолчанию называется Default
experiment_id = mlflow.get_experiment_by_name("Default").experiment_id

with mlflow.start_run(run_name="Default", experiment_id=experiment_id) as run:
    run_id = run.info.run_id
    
    mlflow.log_metric("test_metric_sqlite", 0)
    mlflow.log_artifact("test_artifact.txt", "test_artifact_sqlite")

# проверим, что наши данные сохранились в локальной папке, а также создалась база данных SQLite
assert os.path.exists("mlflow_experiments_store_sqlite")
assert os.path.exists("mydb.sqlite") 

### Запуск с удаленным хранилищем артефактов

In [3]:
# определяем основные credentials, которые нужны для подключения к MLflow
# важно, что credentials мы передаём для себя как пользователей Tracking Service
# у вас должен быть доступ к бакету, в который вы будете складывать артефакты
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "https://storage.yandexcloud.net" #endpoint бакета от YandexCloud
os.environ["AWS_ACCESS_KEY_ID"] = os.getenv("AWS_ACCESS_KEY_ID") # получаем id ключа бакета, к которому подключён MLFlow, из .env
os.environ["AWS_SECRET_ACCESS_KEY"] = os.getenv("AWS_SECRET_ACCESS_KEY") # получаем ключ бакета, к которому подключён MLFlow, из .env

# определяем глобальные переменные
# поднимаем MLflow локально
TRACKING_SERVER_HOST = "127.0.0.1"
TRACKING_SERVER_PORT = 5000

YOUR_NAME = "alexdem" # введите своё имя для создания уникального эксперимента
assert YOUR_NAME, "введите своё имя в переменной YOUR_NAME для создания уникального эксперимента"

# название тестового эксперимента и запуска (run) внутри него
EXPERIMENT_NAME = f"test_connection_experiment_{YOUR_NAME}"
RUN_NAME = "test_connection_run"

# тестовые данные
METRIC_NAME = "test_metric"
METRIC_VALUE = 0

# устанавливаем host, который будет отслеживать наши эксперименты
mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}")

# создаём тестовый эксперимент и записываем в него тестовую информацию
experiment_id = mlflow.create_experiment(EXPERIMENT_NAME)
with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    run_id = run.info.run_id
    
    mlflow.log_metric(METRIC_NAME, METRIC_VALUE) 

In [4]:
##### 4. Проверяем себя, что в MLflow:
# - создался `experiment` с нашим именем
# - внутри эксперимента появился запуск `run`
# - внутри `run` записалась наша тестовая `metric`
experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
run = mlflow.get_run(run_id)

assert "active" == experiment.lifecycle_stage
assert mlflow.get_run(run_id)
assert METRIC_VALUE == run.data.metrics[METRIC_NAME] 

### Выгрузка данных

In [4]:
import psycopg2 as psycopg
import pandas as pd

connection = {"sslmode": "require", "target_session_attrs": "read-write"}
postgres_credentials = {
    "host": os.getenv("DESTINATION_HOST"), 
    "port": "6432",
    "dbname": os.getenv("DESTINATION_DB"),
    "user": os.getenv("DESTINATION_USER"),
    "password": os.getenv("DESTINATION_PASSWORD"),
}
assert all([var_value != "" for var_value in list(postgres_credentials.values())])

connection.update(postgres_credentials)

# определяем название таблицы, в которой хранятся наши данные
TABLE_NAME = "users_churn"


# эта конструкция создаёт контекстное управление для соединения с базой данных 
# оператор with гарантирует, что соединение будет корректно закрыто после выполнения всех операций с базой данных
# причём закрыто оно будет даже в случае ошибки при работе с базой данных
# это нужно, чтобы не допустить так называемую "утечку памяти"
with psycopg.connect(**connection) as conn:

# создаём объект курсора для выполнения запросов к базе данных 
# с помощью метода execute() выполняется SQL-запрос для выборки данных из таблицы TABLE_NAME
    with conn.cursor() as cur:
        cur.execute(f"SELECT * FROM {TABLE_NAME}")
				
				# извлекаем все строки, полученные в результате выполнения запроса
        data = cur.fetchall()

				# получаем список имён столбцов из объекта курсора
        columns = [col[0] for col in cur.description]

# создаём объект DataFrame из полученных данных и имён столбцов 
# это позволяет удобно работать с данными в Python с использованием библиотеки Pandas
df = pd.DataFrame(data, columns=columns)

print(f"Размер нашей таблицы: {df.shape[0]} строк; {df.shape[1]} столбцов")

Размер нашей таблицы: 7043 строк; 22 столбцов


In [5]:
# определяем основные credentials, которые нужны для подключения к MLflow
# важно, что credentials мы передаём для себя как пользователей Tracking Service
# у вас должен быть доступ к бакету, в который вы будете складывать артефакты
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "https://storage.yandexcloud.net" #endpoint бакета от YandexCloud
os.environ["AWS_ACCESS_KEY_ID"] = os.getenv("AWS_ACCESS_KEY_ID") # получаем id ключа бакета, к которому подключён MLFlow, из .env
os.environ["AWS_SECRET_ACCESS_KEY"] = os.getenv("AWS_SECRET_ACCESS_KEY") # получаем ключ бакета, к которому подключён MLFlow, из .env

# определяем глобальные переменные
# поднимаем MLflow локально
TRACKING_SERVER_HOST = "127.0.0.1"
TRACKING_SERVER_PORT = 5000

In [7]:
mlflow.get_experiment_by_name(EXPERIMENT_NAME)

In [8]:
EXPERIMENT_NAME = "churn_task_alexdem"
RUN_NAME = "data_check"

experiment_id = mlflow.create_experiment(EXPERIMENT_NAME) 

In [None]:
stats = {"mae": 2, "r2": 0.82}
artifact_path = "dataframe"

In [8]:
with open("columns.txt", "w") as f:
    f.write(",".join(df.columns))

    

In [10]:
df.to_csv("users_churn.csv", index=False)

In [11]:
# задаём название эксперимента и имя запуска для логирования в MLflow

EXPERIMENT_NAME = "churn_task_alexdem"
RUN_NAME = "data_check"

# создаём новый эксперимент в MLflow с указанным названием 
# если эксперимент с таким именем уже существует, 
# MLflow возвращает идентификатор существующего эксперимента
experiment_id = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
if not experiment_id:
    experiment_id = mlflow.create_experiment(EXPERIMENT_NAME) 

with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    # получаем уникальный идентификатор запуска эксперимента
    run_id = run.info.run_id
    
    # логируем метрики эксперимента
    # предполагается, что переменная stats содержит словарь с метриками,
    # где ключи — это названия метрик, а значения — числовые значения метрик
    mlflow.log_metrics(stats)
    
    # логируем файлы как артефакты эксперимента — 'columns.txt' и 'users_churn.csv'
    mlflow.log_artifact("columns.txt", artifact_path)
    mlflow.log_artifact("users_churn.csv", artifact_path)


experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
# получаем данные о запуске эксперимента по его уникальному идентификатору
run = mlflow.get_run(run_id)


# проверяем, что статус запуска эксперимента изменён на 'FINISHED'
# это утверждение (assert) можно использовать для автоматической проверки того, 
# что эксперимент был завершён успешно
assert run.info.status == "FINISHED"

# удаляем файлы 'columns.txt' и 'users_churn.csv' из файловой системы,
# чтобы очистить рабочую среду после логирования артефактов
os.remove("columns.txt")
os.remove("users_churn.csv")

AttributeError: 'NoneType' object has no attribute 'experiment_id'