In [2]:
import psycopg2 as psycopg
import os
from dotenv import load_dotenv
import pandas as pd

load_dotenv()

# определим название таблицы, в которой хранятся наши данные.
TABLE_NAME = "users_churn"

connection = {"sslmode": "require", "target_session_attrs": "read-write"}
postgres_credentials = {
    "host": os.environ.get('DB_DESTINATION_HOST'), 
    "port": os.environ.get('DB_DESTINATION_PORT'),
    "dbname": os.environ.get('DB_DESTINATION_NAME'),
    "user": os.environ.get('DB_DESTINATION_USER'),
    "password": os.environ.get('DB_DESTINATION_PASSWORD'),
}
assert all([var_value != "" for var_value in list(postgres_credentials.values())])

connection.update(postgres_credentials)

In [3]:
# эта конструкция создаёт контекстное управление для соединения с базой данных 
# оператор with гарантирует, что соединение будет корректно закрыто после выполнения всех операций 
# закрыто оно будет даже в случае ошибки, чтобы не допустить "утечку памяти"
with psycopg.connect(**connection) as conn:

# создаёт объект курсора для выполнения запросов к базе данных
# с помощью метода execute() выполняется SQL-запрос для выборки данных из таблицы TABLE_NAME
    with conn.cursor() as cur:
        cur.execute(f"SELECT * FROM {TABLE_NAME}")
                
                # извлекаем все строки, полученные в результате выполнения запроса
        data = cur.fetchall()

                # получает список имён столбцов из объекта курсора
        columns = [col[0] for col in cur.description]

# создаёт объект DataFrame из полученных данных и имён столбцов. 
# это позволяет удобно работать с данными в Python, используя библиотеку Pandas.
df = pd.DataFrame(data, columns=columns)

In [4]:
# 1. Название колонок вашего датафрейма запишите в текстовый файл
with open("columns.txt", "w", encoding="utf-8") as fio:
    i=1
    for col in df.columns:
        if i == df.columns.shape[0]:
            fio.write(col)
        else: 
            if col != 'id':
                fio.write(col+',')
        i+=1
#df.columns.to_csv('columns1.txt', index=False)
          

In [5]:
# второй вариант
import csv  
#columns1 = [columns]
# 1. Название колонок вашего датафрейма запишите в текстовый файл
with open("columns4.txt", "w", encoding="utf-8") as fio:
    writer = csv.writer(fio) 
    # Записываем все данные за один раз  
    writer.writerows([df.columns])

In [6]:
counts_columns = [
    "type", "paperless_billing", "internet_service", "online_security", "online_backup", "device_protection",
    "tech_support", "streaming_tv", "streaming_movies", "gender", "senior_citizen", "partner", "dependents",
    "multiple_lines", "target"
]

stats = {}
for col in counts_columns:
		# посчитайте уникальные значения для колонок, где немного уникальных значений (переменная counts_columns)
    column_stat = df[col].value_counts().to_dict()
    column_stat = {f"{col}_{key}": value for key, value in column_stat.items()}
    stats.update(column_stat)

stats["data_length"] = df.shape[0]
stats["monthly_charges_min"] = df["monthly_charges"].min()
stats["monthly_charges_max"] = df["monthly_charges"].max()
stats["monthly_charges_mean"] = df["monthly_charges"].mean()
stats["monthly_charges_median"] = df["monthly_charges"].median()
stats["total_charges_min"] = df["total_charges"].min()
stats["total_charges_max"] = df["total_charges"].max()
stats["total_charges_mean"] = df["total_charges"].mean()
stats["total_charges_median"] = df["total_charges"].median()
stats["unique_customers_number"] = df["customer_id"].nunique()
stats["end_date_nan"] = df["end_date"].isna().sum()

In [7]:
stats

{'type_Month-to-month': 3875,
 'type_Two year': 1695,
 'type_One year': 1473,
 'paperless_billing_Yes': 4171,
 'paperless_billing_No': 2872,
 'internet_service_Fiber optic': 3096,
 'internet_service_DSL': 2421,
 'online_security_No': 3498,
 'online_security_Yes': 2019,
 'online_backup_No': 3088,
 'online_backup_Yes': 2429,
 'device_protection_No': 3095,
 'device_protection_Yes': 2422,
 'tech_support_No': 3473,
 'tech_support_Yes': 2044,
 'streaming_tv_No': 2810,
 'streaming_tv_Yes': 2707,
 'streaming_movies_No': 2785,
 'streaming_movies_Yes': 2732,
 'gender_Male': 3555,
 'gender_Female': 3488,
 'senior_citizen_0': 5901,
 'senior_citizen_1': 1142,
 'partner_No': 3641,
 'partner_Yes': 3402,
 'dependents_No': 4933,
 'dependents_Yes': 2110,
 'multiple_lines_No': 3390,
 'multiple_lines_Yes': 2971,
 'target_0': 5174,
 'target_1': 1869,
 'data_length': 7043,
 'monthly_charges_min': 18.25,
 'monthly_charges_max': 118.75,
 'monthly_charges_mean': 64.76169246059918,
 'monthly_charges_median': 70

In [43]:
import os

import mlflow

from dotenv import load_dotenv
load_dotenv()

# определяем основные credentials, которые нужны для подключения к MLflow
# важно, что credentials мы передаём для себя как пользователей Tracking Service
# у вас должен быть доступ к бакету, в который вы будете складывать артефакты
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "https://storage.yandexcloud.net" #endpoint бакета от YandexCloud
os.environ["AWS_ACCESS_KEY_ID"] = os.getenv("AWS_ACCESS_KEY_ID") # получаем id ключа бакета, к которому подключён MLFlow, из .env
os.environ["AWS_SECRET_ACCESS_KEY"] = os.getenv("AWS_SECRET_ACCESS_KEY") # получаем ключ бакета, к которому подключён MLFlow, из .env

# определяем глобальные переменные
# поднимаем MLflow локально
TRACKING_SERVER_HOST = "127.0.0.1"
TRACKING_SERVER_PORT = 5000

#YOUR_NAME = "Less_8" # введите своё имя для создания уникального эксперимента
#assert YOUR_NAME, "введите своё имя в переменной YOUR_NAME для создания уникального эксперимента"

# название эксперимента и запуска (run) внутри него
#EXPERIMENT_NAME = f"test_connection_experiment_{YOUR_NAME}"
EXPERIMENT_NAME = "Lesson_8"
RUN_NAME = "users_churn_df_statistics"

# данные для логирования
#METRIC_NAME = stats
#METRIC_VALUE = 0

# устанавливаем host, который будет отслеживать наши эксперименты
mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}")

# создаём эксперимент и записываем в него данные
experiment_id = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
if not experiment_id:
    experiment_id = mlflow.create_experiment(EXPERIMENT_NAME)

with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    run_id = run.info.run_id
    mlflow.log_artifact('columns.txt', 'dataframe')

##### 4. Проверяем себя, что в MLflow:
# - создался `experiment` с нашим именем
# - внутри эксперимента появился запуск `run`
# - внутри `run` записалась наша тестовая `metric`
experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
run = mlflow.get_run(run_id)

assert "active" == experiment.lifecycle_stage
assert mlflow.get_run(run_id)
assert run.info.status == 'FINISHED'
#assert METRIC_VALUE == run.data.metrics[METRIC_NAME]
 

In [None]:
# задаём название эксперимента и имя запуска для логирования в MLflow
EXPERIMENT_NAME = "churn_fio"
RUN_NAME = "data_check"

# создаём новый эксперимент в MLflow с указанным названием
# если эксперимент с таким именем уже существует, 
# MLflow возвращает идентификатор существующего эксперимента
experiment_id = mlflow.create_experiment(EXPERIMENT_NAME)

with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    # получаем уникальный идентификатор запуска эксперимента
    run_id = run.info.run_id
    
    # логируем метрики эксперимента
    # предполагается, что переменная stats содержит словарь с метриками,
    # объявлять переменную stats не надо,
    # где ключи — это названия метрик, а значения — числовые значения метрик
    mlflow.log_metrics(stats)
    
    # логируем файлы как артефакты эксперимента — 'columns.txt' и 'users_churn.csv'
    mlflow.log_artifact('columns.txt', 'dataframe')
    mlflow.log_artifact('users_churn.csv', 'dataframe')


experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
# получаем данные о запуске эксперимента по его уникальному идентификатору
run = mlflow.get_run(run_id)


# проверяем, что статус запуска эксперимента изменён на 'FINISHED'
# это утверждение (assert) можно использовать для автоматической проверки того, 
# что эксперимент был завершён успешно
#assert "active" == experiment.lifecycle_stage
#assert mlflow.get_run(run_id)
assert run.info.status == 'FINISHED'

# удаляем файлы 'columns.txt' и 'users_churn.csv' из файловой системы,
# чтобы очистить рабочую среду после логирования артефактов
os.remove('columns.txt')
os.remove('users_churn.csv')