### **PIPELINE ДЛЯ BASELINE ДАННЫХ (TRAIN.CSV)**  
---

Данный ноутбук обрабатывает train.csv и выгружает целевую таблицу inference_baseline с true_label и prediction_score в Платформенную БД.

In [None]:
pip install psycopg2-binary=="2.9.9" boto3=="1.35.0" pyarrow fastparquet

In [None]:
import numpy as np
import pandas as pd
import boto3
import tempfile
import os
from sqlalchemy import create_engine
from sklearn.preprocessing import LabelEncoder
import boto3
import tempfile
import os
from sqlalchemy import create_engine
from sklearn.preprocessing import LabelEncoder

In [None]:
def setup_s3_client():
    project_s3 = boto3.resource(
        "s3",
        endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL"),
        aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"), 
        aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY")
    )
    
    bucket_name = os.environ.get("S3_BUCKET")
    print(f"Бакет: {bucket_name}")
    
    return project_s3, bucket_name

In [None]:
def create_db_connection(password):
    DATABASE_USER = "vectoradmin"
    DATABASE_PASSWORD = password
    DATABASE_HOST = os.environ.get("PGVECTOR_HOST_NAME")
    DATABASE_PORT = 5432
    DATABASE_DBNAME = os.environ.get("PGVECTOR_DB_NAME")
    
    connection_string = f"postgresql://{DATABASE_USER}:{DATABASE_PASSWORD}@{DATABASE_HOST}:{DATABASE_PORT}/{DATABASE_DBNAME}"
    
    engine = create_engine(connection_string)
    
    # Проверка подключения
    try:
        with engine.connect() as conn:
            print("Подключение к БД установлено")
        return engine
    except Exception as e:
        print(f"Ошибка подключения к БД: {e}")
        raise

In [None]:
def upload_df_to_s3_parquet(project_s3, df, bucket_name, s3_key):
    """
    Загружает DataFrame в S3 как Parquet файл
    
    Args:
        project_s3: S3 клиент
        df: DataFrame для сохранения
        bucket_name: Имя S3 бакета
        s3_key: Ключ (путь) для сохранения файла в S3
    """
    with tempfile.NamedTemporaryFile(suffix='.parquet', delete=False) as tmp_file:
        df.to_parquet(tmp_file.name, index=False)
        
        project_s3.Bucket(bucket_name).upload_file(tmp_file.name, s3_key)

        os.unlink(tmp_file.name)

In [None]:
def find_and_read_first_parquet(project_s3, bucket_name, folder_path=""):
    """
    Находит и читает первый parquet файл в указанной папке S3
    
    Args:
        project_s3: S3 клиент
        bucket_name: Имя S3 бакета
        folder_path: Путь к папке в S3
        
    Returns:
        DataFrame с данными из первого найденного parquet файла
    """
    if folder_path and not folder_path.endswith('/'):
        folder_path += '/'
  
    objects = list(project_s3.Bucket(bucket_name).objects.filter(Prefix=folder_path))
    
    parquet_files = [obj for obj in objects if obj.key.endswith('.parquet')]
    
    if not parquet_files:
        raise FileNotFoundError(f"Parquet файлы не найдены в s3://{bucket_name}/{folder_path}")
    
    # Берем первый найденный parquet файл
    first_parquet = parquet_files[0]
    s3_key = first_parquet.key
    
    print(f"Найден файл: {s3_key}")
    
    with tempfile.NamedTemporaryFile(suffix='.parquet', delete=False) as tmp_file:
        project_s3.Bucket(bucket_name).download_file(s3_key, tmp_file.name)
        df = pd.read_parquet(tmp_file.name)
        os.unlink(tmp_file.name)
    return df

In [None]:
def process_train_data(project_s3, train_csv_path, bucket_name):
    """
    Обрабатывает train.csv файл для создания baseline данных
    
    Шаги:
    1. Загрузка данных из CSV
    2. Приведение признаков к целевому виду
    3. Сохранение метаданных для последующего объединения
    4. Сохранение features в S3 для батч-инференса
    
    Args:
        project_s3: S3 клиент
        train_csv_path: Путь к train.csv файлу
        bucket_name: Имя S3 бакета
    """
    
    df = pd.read_csv(train_csv_path)
    
    print("Исходные данные train:")
    print(f"Размер: {df.shape}")
    print(f"Колонки: {list(df.columns)}")
    
    df['app_date'] = pd.to_datetime(df['app_date'], format='%d%b%Y')
    metadata_cols = df[['client_id', 'app_date', 'default']].copy()
    
    # Применяем преобразования

    df['age'] = np.log(df['age'] + 1)
    df['decline_app_cnt'] = np.log(df['decline_app_cnt'] + 1)
    df['bki_request_cnt'] = np.log(df['bki_request_cnt'] + 1)
    df['income'] = np.log(df['income'] + 1)

    df['education'] = df['education'].fillna('SCH')
    
    start = df.app_date.min()
    df['days'] = (df.app_date - start).dt.days.astype('int')

    bin_cols = ['sex', 'car', 'car_type', 'good_work', 'foreign_passport']
    label_encoder = LabelEncoder()
    for column in bin_cols:
        df[column] = label_encoder.fit_transform(df[column])

    cat_cols = ['education', 'region_rating', 'home_address', 'work_address', 'sna', 'first_time']
    df = pd.get_dummies(df, prefix=cat_cols, columns=cat_cols, dtype=int)
    
    # Удаляем мета-колонки для батч-версии
    columns_to_drop = ['app_date', 'client_id', 'default']
    df_for_batch = df.drop(columns_to_drop, axis=1, errors='ignore')
    
    print("После преобразований:")
    print(f"Для batch инференса: {df_for_batch.shape}")
    print(f"Колонки для batch: {list(df_for_batch.columns)}")
    
    upload_df_to_s3_parquet(project_s3, df_for_batch, bucket_name, 'baseline_features.parquet')
    
    print(f"Train данные обработаны. Batch features сохранены в S3")
    
    return df_for_batch, metadata_cols

In [None]:
def load_and_combine_baseline(project_s3, bucket_name, predictions_s3_key, original_metadata):
    """
    Загружает baseline предсказания из S3 и объединяет с оригинальными метаданными
    
    Args:
        project_s3: S3 клиент
        bucket_name: Имя S3 бакета
        predictions_s3_key: Путь к папке с предсказаниями в S3
        original_metadata: DataFrame с метаданными (client_id, app_date, default)
        
    """
    df_batch = find_and_read_first_parquet(project_s3, bucket_name, predictions_s3_key)

    df_batch_reset = df_batch.reset_index(drop=True)
    metadata_reset = original_metadata.reset_index(drop=True)
    
    df_combined = pd.concat([metadata_reset, df_batch_reset], axis=1)
    
    # Переименовываем колонку default в true_label
    if 'default' in df_combined.columns:
        df_combined = df_combined.rename(columns={'default': 'true_label'})
        print("Колонка 'default' переименована в 'true_label'")
    else:
        print("Колонка 'default' не найдена для переименования")

    df_combined['app_date'] = pd.to_datetime(df_combined['app_date'])
    df_combined['app_date'] = df_combined['app_date'].apply(lambda x: x.replace(year=2024))
    
    print(f"Данные объединены: {df_combined.shape}")
    print(f"Колонки итоговые: {list(df_combined.columns)}")
    
    return df_combined

In [None]:
from sqlalchemy import text

def load_predictions_to_db(df, table_name, vectoradmin_password):
    """
    Загружает DataFrame с предсказаниями в PostgreSQL базу данных
    
    Args:
        df: DataFrame для загрузки
        table_name: Имя таблицы в БД
        vectoradmin_password: Пароль пользователя БД
    """
    db_connection = create_db_connection(vectoradmin_password)
    
    df.to_sql(table_name, db_connection, if_exists='replace', index=False)
    
    print(f"Создана/обновлена таблица '{table_name}' с {len(df)} записями")
    
    # Проверяем, что таблица создана и содержит данные
    with db_connection.connect() as conn:
        result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
        count = result.scalar()
        print(f"Проверка: в таблице {table_name} {count} записей")
        
        result = conn.execute(text(f"SELECT * FROM {table_name} LIMIT 3"))
        print(f"Пример записей:")
        for row in result:
            print(row)

### **MAIN PIPELINE**
---

Основной пайплайн для обработки **baseline** данных<br>
Выполнять последовательно все шаги:<br>
1. Настройка подключений<br>
2. Обработка train.csv<br>
3. **(Ручной шаг) Запуск батч-сервиса на baseline_features.parquet**<br>
4. Загрузка и обработка предсказаний<br>
5. **Перед следующим шагом необходимо посмотреть пароль для бд пгвектора и ввести параметр**<br>
6. Загрузка в БД

### **Настройка подключений**

In [None]:
project_s3, bucket_name = setup_s3_client()

### **Обработка train данных**

In [None]:
baseline_data, metadata_cols = process_train_data(
        project_s3, 
        "data/train.csv", 
        bucket_name
    )
print("РУЧНОЙ ШАГ: ЗАПУСТИТЕ БАТЧ-СЕРВИС")
print("Используйте baseline_features.parquet из S3 для инференса")
print(f"INPUT_DATA: s3a://{bucket_name}/baseline_features.parquet")
print(f"OUTPUT_DIRECTORY: s3a://{bucket_name}/inference_baseline")

### **Загрузка baseline предсказаний**

In [None]:
 df_combined = load_and_combine_baseline(
        project_s3, 
        bucket_name, 
        "inference_baseline/", 
        metadata_cols
    )

In [None]:
df_combined

### **Загрузка в базу данных**

In [None]:
load_predictions_to_db(df_combined, "inference_baseline", "vectoradmin_password")