In [1]:
import os
import time
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from matplotlib import font_manager

# Notebook-wide settings: silence matplotlib font-manager warnings, seed NumPy's
# legacy global RNG and widen pandas' display limits.
warnings.filterwarnings("ignore", module="matplotlib.font_manager")
np.random.seed(42)

pd.options.display.max_rows = 500
pd.options.display.max_columns = 500
pd.options.display.width = 1000
pd.options.display.max_colwidth = None
# Floats are printed with up to 12 decimals, then trailing zeros and a dangling '.' are trimmed.
pd.options.display.float_format = lambda x: ('%.12f' % x).rstrip('0').rstrip('.')

In [2]:
# Project root: $PROJECT_PATH when it is set (e.g. when launched from Airflow),
# otherwise the current working directory for local runs.
PROJECT_PATH = os.getenv('PROJECT_PATH', '.')

# Locations of the raw and processed dataset files, relative to the project root.
RAW_DATA_PATH = os.path.join(
    PROJECT_PATH,
    'data/raw/MK_RAW_DATA.csv',
)
PROCESSED_DATA_PATH = os.path.join(
    PROJECT_PATH,
    'data/processed/MK_PROCESSED_DATA.csv',
)

In [None]:
# Укажем путь к файлам проекта:
# -> $PROJECT_PATH при запуске в Airflow
# -> иначе - текущая директория при локальном запуске
# path = os.environ.get('PROJECT_PATH', '.')

In [23]:
import pandas as pd
import numpy as np

# Функция загрузки сырых данных
def load_raw_data(path):
    return pd.read_csv(path, sep='\t')

# Preprocessing
def preprocess(df):
    """Drop every row whose ``Income`` value is missing.

    Other columns may still contain NaN; only ``Income`` is checked.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw frame that contains an ``Income`` column.

    Returns
    -------
    pandas.DataFrame
        A new frame with the surviving rows. The original index labels are kept.
    """
    with_income = df.dropna(axis='index', subset=['Income'])
    return with_income

# Outlier removal by the IQR rule
def remove_outliers_iqr(data, column, k=1.5):
    """Keep only the rows whose ``column`` value lies inside the IQR fences.

    The fences are ``[Q1 - k * IQR, Q3 + k * IQR]``, inclusive at both ends.
    ``Q1`` and ``Q3`` are the 25th and 75th percentiles of ``data[column]``,
    and ``IQR = Q3 - Q1``.

    Parameters
    ----------
    data : pandas.DataFrame
        Input frame. It is not modified.
    column : str
        Name of the numeric column used for filtering.
    k : float, default 1.5
        Fence multiplier. The default is the conventional 1.5 that used to be
        hard-coded. Larger values remove fewer rows.

    Returns
    -------
    pandas.DataFrame
        An independent copy of the retained rows, with the original index
        preserved. Rows whose ``column`` value is missing are dropped too,
        because NaN falls outside any interval.
    """
    values = data[column]
    q1 = values.quantile(0.25)  # quantile() skips NaN
    q3 = values.quantile(0.75)
    iqr = q3 - q1
    # between() is inclusive on both ends, same as the original >= / <= pair.
    keep = values.between(q1 - k * iqr, q3 + k * iqr)
    return data.loc[keep].copy()

# Feature engineering
def engineer_features(df, reference_year=2025, reference_date=None):
    """Derive the modelling features from the raw customer columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with the raw columns used below: ``Year_Birth``, ``Kidhome``,
        ``Teenhome``, ``Marital_Status``, the ``Mnt*``, ``Num*Purchases`` and
        ``AcceptedCmp*`` columns, ``Dt_Customer``, ``Education`` and ``Income``.
        It is not modified.
    reference_year : int, default 2025
        Year from which ``Age`` is computed (``reference_year - Year_Birth``).
        The default reproduces the value that used to be hard-coded.
    reference_date : str, datetime-like or None, default None
        Moment from which customer tenure (``Customer_Since_*``) is measured.
        ``None`` keeps the previous behaviour of using ``pd.Timestamp.today()``.
        Pass a fixed date to make the output independent of when the notebook
        runs.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df`` with the engineered columns appended. In the copy,
        ``Dt_Customer`` is converted to datetime.

    Raises
    ------
    ValueError
        Propagated from ``pd.qcut`` when the ``Income`` quartile edges are not
        unique, e.g. when there are too few distinct incomes.
    """
    df = df.copy()

    # Demographics
    df['Age'] = reference_year - df['Year_Birth']
    df['HasChildren'] = (df[['Kidhome', 'Teenhome']].sum(axis=1) > 0).astype(int)
    df['MaritalFlag'] = df['Marital_Status'].isin(['Married', 'Together']).astype(int)

    # Total spend across all product categories
    mnt_cols = ['MntWines', 'MntFruits', 'MntMeatProducts', 'MntFishProducts', 'MntSweetProducts', 'MntGoldProds']
    df['TotalSpent'] = df[mnt_cols].sum(axis=1)

    # Customer tenure, measured from a configurable reference moment
    df['Dt_Customer'] = pd.to_datetime(df['Dt_Customer'], dayfirst=True)
    reference = pd.Timestamp.today() if reference_date is None else pd.Timestamp(reference_date)
    df['Customer_Since_Days'] = (reference - df['Dt_Customer']).dt.days
    # astype(int) truncates, giving completed years.
    # NOTE(review): it will fail if any Dt_Customer is missing (NaT) -- confirm the raw data has none.
    df['Customer_Since_Years'] = (df['Customer_Since_Days'] / 365.25).astype(int)

    df['TotalPurchaseActivity'] = df[['NumCatalogPurchases', 'NumWebPurchases', 'NumStorePurchases', 'NumDealsPurchases']].sum(axis=1)

    df['TotalAcceptedCmp'] = df[['AcceptedCmp1', 'AcceptedCmp3', 'AcceptedCmp5', 'AcceptedCmp2', 'AcceptedCmp4']].sum(axis=1)

    higher_edu = ['Graduation', 'Master', 'PhD']
    df['HigherEducation'] = df['Education'].isin(higher_edu).astype(int)

    # Income quartiles, used only to derive IsHighIncome.
    # NOTE(review): the quartile edges come from every row of `df`. If this runs
    # before a train/test split, the threshold is influenced by test rows.
    df['IncomeGroup'] = pd.qcut(df['Income'], q=4, labels=['Q1', 'Q2', 'Q3', 'Q4'])

    # Binary flag for customers in the top income quartile
    df['IsHighIncome'] = (df['IncomeGroup'] == 'Q4').astype(int)

    # log1p-transform the skewed monetary columns for the model
    df['IncomeLog'] = np.log1p(df['Income'])
    df['TotalSpentLog'] = np.log1p(df['TotalSpent'])

    # Did the customer accept at least one campaign?
    df['AcceptedAnyCmp'] = (df['TotalAcceptedCmp'] > 0).astype(int)

    return df

# Final cleaning
def final_cleaning_step(df):
    """Remove duplicate rows and IQR outliers, then drop raw/intermediate columns.

    Steps:
    1. Drop duplicate rows, comparing only the columns that survive step 3, so
       e.g. the per-row ``ID`` does not make every row unique.
    2. Apply ``remove_outliers_iqr`` sequentially to each outlier column present
       in the frame. The number of removed rows is printed for each column.
    3. Drop the columns listed in ``cols_to_drop``; absent ones are ignored.

    Outliers are filtered *before* the column drop. ``'TotalPurchaseActivity'``
    is both an outlier column and a column to drop; previously it was dropped
    first, so its outlier filter was silently skipped.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame produced by the feature-engineering step. It is not modified.

    Returns
    -------
    pandas.DataFrame
        New cleaned frame. The index is not reset.
    """
    # Raw or intermediate columns that must not reach the model
    cols_to_drop = [
        'ID', 'Year_Birth', 'Education', 'Marital_Status', 'Kidhome', 'Teenhome',
        'MntWines', 'MntFruits', 'MntMeatProducts', 'MntFishProducts', 'MntSweetProducts',
        'MntGoldProds', 'NumDealsPurchases', 'NumWebPurchases', 'NumCatalogPurchases', 'NumStorePurchases',
        'AcceptedCmp3', 'AcceptedCmp4', 'AcceptedCmp5', 'AcceptedCmp1', 'AcceptedCmp2',
        'Z_CostContact', 'Z_Revenue', 'Dt_Customer', 'Customer_Since_Days', 'Customer_Since_Years',
        'Complain', 'IncomeGroup', 'TotalAcceptedCmp', 'TotalPurchaseActivity', 'Income', 'TotalSpent'
    ]
    outlier_cols = ['Age', 'TotalSpentLog', 'IncomeLog', 'TotalPurchaseActivity', 'NumWebVisitsMonth']

    # Deduplicate on the retained columns only. This keeps the same rows as
    # dropping the columns first and then calling drop_duplicates().
    kept_cols = [col for col in df.columns if col not in cols_to_drop]
    cleaned = df.drop_duplicates(subset=kept_cols or None)

    # Filter outliers while helper columns such as TotalPurchaseActivity still exist.
    for col in outlier_cols:
        if col in cleaned.columns:
            original_len = len(cleaned)
            cleaned = remove_outliers_iqr(cleaned, col)
            removed = original_len - len(cleaned)
            print(f"Удалено {removed} выбросов из '{col}'")

    return cleaned.drop(columns=cols_to_drop, errors='ignore')

In [24]:
def run_data_preparation_pipeline(raw_data_path):
    """Run the full data-preparation chain on the raw file at ``raw_data_path``.

    The stages run in order: ``load_raw_data`` -> ``preprocess`` ->
    ``engineer_features`` -> ``final_cleaning_step``. Each stage receives the
    previous stage's output.

    Returns
    -------
    pandas.DataFrame
        The frame produced by ``final_cleaning_step``.
    """
    stages = (load_raw_data, preprocess, engineer_features, final_cleaning_step)
    result = raw_data_path
    for stage in stages:
        result = stage(result)
    return result

In [21]:
def run_model_training_pipeline(df):
    """Tune several binary classifiers for the ``Response`` target.

    The data is split 70/15/15 into train/validation/test, stratified on the
    target. For each model, a ``GridSearchCV`` (3-fold, F1 scoring) is fitted on
    the training part, and its F1 score on the validation part is printed. The
    test part is held out and not used inside this function.

    Parameters
    ----------
    df : pandas.DataFrame
        Prepared frame containing ``Response`` and every column in
        ``feature_columns``.

    Returns
    -------
    tuple[dict[str, GridSearchCV], ColumnTransformer]
        The fitted grid searches keyed by model name, and the preprocessor
        template. GridSearchCV fits clones, so the returned preprocessor is
        *unfitted*. For a fitted one, use
        ``trained_models[name].best_estimator_.named_steps['preprocessor']``.
    """
    # All sklearn names are imported here. train_test_split, ColumnTransformer,
    # MinMaxScaler, Pipeline and GridSearchCV were previously used without being
    # imported in any cell, so the function only worked with leftover kernel state.
    from sklearn.base import clone
    from sklearn.compose import ColumnTransformer
    from sklearn.linear_model import LogisticRegression, RidgeClassifier
    from sklearn.model_selection import GridSearchCV, train_test_split
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.svm import SVC
    from catboost import CatBoostClassifier

    # Target and features
    target_column = "Response"
    feature_columns = [
        'Recency', 'NumWebVisitsMonth', 'Age', 'HasChildren',
        'MaritalFlag', 'HigherEducation', 'IsHighIncome',
        'AcceptedAnyCmp', 'TotalSpentLog'
    ]
    # Only these columns are min-max scaled. The remaining features are 0/1 flags
    # and pass through unchanged.
    numeric_columns = ['Recency', 'NumWebVisitsMonth', 'Age', 'TotalSpentLog']

    X = df[feature_columns]
    y = df[target_column]

    # Stratified 70/15/15 split. 0.1765 ≈ 0.15 / 0.85, i.e. 15% of the whole
    # data taken out of the remaining 85%.
    X_trainval, X_test, y_trainval, y_test = train_test_split(
        X, y, test_size=0.15, stratify=y, random_state=42
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_trainval, y_trainval, test_size=0.1765, stratify=y_trainval, random_state=42
    )

    # Preprocessor template
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', MinMaxScaler(), numeric_columns)
        ],
        remainder='passthrough'
    )

    def with_preprocessor(classifier):
        # Give each pipeline its own copy, so no mutable preprocessor instance
        # is shared between the four models.
        return Pipeline([
            ('preprocessor', clone(preprocessor)),
            ('classifier', classifier),
        ])

    # Models and their hyper-parameter grids
    models_with_params = {
        "CatBoost": (
            with_preprocessor(CatBoostClassifier(random_state=42, verbose=0, class_weights=[1, 3])),
            {
                'classifier__iterations': [50, 100],
                'classifier__learning_rate': [0.01, 0.05],
                'classifier__depth': [3, 4],
                'classifier__l2_leaf_reg': [10, 20, 30]
            }
        ),
        "Ridge Classifier": (
            with_preprocessor(RidgeClassifier(random_state=42)),
            {
                'classifier__alpha': [0.1, 1.0, 10.0],
                'classifier__class_weight': [None, 'balanced']
            }
        ),
        "Logistic Regression": (
            with_preprocessor(LogisticRegression(solver="liblinear", random_state=42)),
            {
                'classifier__C': [0.1, 1.0, 10.0],
                'classifier__penalty': ['l1', 'l2'],
                'classifier__class_weight': [None, 'balanced']
            }
        ),
        "SVM - Linear Kernel": (
            with_preprocessor(SVC(kernel="linear", probability=True, random_state=42)),
            {
                'classifier__C': [0.1, 1.0, 10.0],
                'classifier__class_weight': [None, 'balanced']
            }
        )
    }

    # Grid search per model. clf.score uses the F1 scorer on the refitted best estimator.
    trained_models = {}
    for name, (pipeline, params) in models_with_params.items():
        print(f"Обучение модели: {name}")
        clf = GridSearchCV(pipeline, params, cv=3, scoring='f1', verbose=0, n_jobs=-1)
        clf.fit(X_train, y_train)
        score = clf.score(X_val, y_val)
        print(f"{name} F1 на валидации: {score:.4f}")
        trained_models[name] = clf

    return trained_models, preprocessor