In [2]:
import pandas as pd

In [3]:
# Первая функция: Замена выбросов медианой
def iqr_filter(df: pd.DataFrame, column: str, lower_bound=True, upper_bound=True, multp=3):
    """
    Гибкая функция для удаления выбросов с использованием настраиваемого порога IQR.
    """
    q1, q3 = np.percentile(df[column], [25, 75])
    iqr = (q3 - q1) * multp
    low_bound = q1 - iqr
    up_bound = q3 + iqr

    median = 0  # Значение медианы для замены
    outlines = 0  # Индексы строк, которые нужно заменить медианой

    # Находим выбросы и вычисляем медиану для ненавязчивых значений
    if lower_bound and upper_bound:
        outlines = df[(df[column] < low_bound) | (df[column] > up_bound)].index
        median = df[(df[column] >= low_bound) & (df[column] <= up_bound)][column].median()
    elif lower_bound:
        outlines = df[df[column] < low_bound].index
        median = df[df[column] >= low_bound][column].median()
    elif upper_bound:
        outlines = df[df[column] > up_bound].index
        median = df[df[column] <= up_bound][column].median()

    # Заменяем выбросы на медиану
    df.loc[outlines, column] = median

    return df


# Вторая функция: Ограничение и винзоризация для выбросов
def remove_outliers_and_handle_skewness(df, columns, threshold=1.5, cap_percentiles=(0.01, 0.99)):
    """
    Удаляет выбросы на основе асимметрии и применяет ограничение или преобразование.
    """
    df_cleaned = df.copy()

    for col in columns:
        # Обрабатываем асимметрию, применяя логарифмическое преобразование (если сильно асимметрично)
        if df[col].skew() > 1:
            df_cleaned[col] = np.log1p(df_cleaned[col])

        # Рассчитываем IQR для обнаружения выбросов
        Q1 = df_cleaned[col].quantile(0.25)
        Q3 = df_cleaned[col].quantile(0.75)
        IQR = Q3 - Q1

        # Рассчитываем нижний и верхний пределы для выбросов
        lower_bound = Q1 - threshold * IQR
        upper_bound = Q3 + threshold * IQR

        # Применяем ограничение для выбросов по IQR и заменяем выбросы на медиану или ограниченные значения
        df_cleaned[col] = df_cleaned[col].clip(lower=lower_bound, upper=upper_bound)

        # В качестве альтернативы, применяем винзоризацию, ограничивая значения на указанных процентилях (если нужно более мягкое ограничение)
        lower_cap = df_cleaned[col].quantile(cap_percentiles[0])
        upper_cap = df_cleaned[col].quantile(cap_percentiles[1])
        df_cleaned[col] = df_cleaned[col].clip(lower=lower_cap, upper=upper_cap)

    return df_cleaned


# Новая гибридная функция, которая решает, применять ли ограничение или замену медианой в зависимости от асимметрии и степени выбросов
def hybrid_outlier_handling(df: pd.DataFrame, columns: list, threshold=1.5, multp=3, cap_percentiles=(0.01, 0.99)):
    """
    Гибридная функция для выбора между ограничением или заменой выбросов медианой,
    в зависимости от асимметрии столбца и степени выбросов.
    """
    df_cleaned = df.copy()

    for col in columns:
        # Обрабатываем асимметрию до обработки выбросов
        if df[col].skew() > 1:  # Если сильно асимметрично, сначала применим преобразование
            df_cleaned[col] = np.log1p(df_cleaned[col])

        # Рассчитываем IQR для обнаружения выбросов
        Q1 = df_cleaned[col].quantile(0.25)
        Q3 = df_cleaned[col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - threshold * IQR
        upper_bound = Q3 + threshold * IQR

        # Проверяем, есть ли экстремальные выбросы (за пределами 3x IQR)
        if any(df_cleaned[col] < (Q1 - 3 * IQR)) or any(df_cleaned[col] > (Q3 + 3 * IQR)):
            # Если есть экстремальные выбросы, заменяем их на медиану с помощью iqr_filter
            df_cleaned = iqr_filter(df_cleaned, col, lower_bound=True, upper_bound=True, multp=multp)
        else:
            # В противном случае ограничиваем выбросы с помощью remove_outliers_and_handle_skewness
            df_cleaned = remove_outliers_and_handle_skewness(df_cleaned, [col], threshold=threshold, cap_percentiles=cap_percentiles)

    return df_cleaned

In [4]:
# Функция для отображения гистограмм для всех колонок
def draw_histograms_for_all_columns(df, highlight_columns=[]):
    columns_to_check = df.columns[1:-1]
    plt.figure(figsize=(20, 30))
    
    for i, col in enumerate(columns_to_check, 1):
        plt.subplot(6, 4, i)
        
        color = "orange" if col in highlight_columns else "blue"
        sns.histplot(df[col], kde=True, bins=30, color=color)
        plt.title(col)
        
    plt.tight_layout()
    plt.show()


# Функция для отображения boxplot для всех колонок
def draw_boxplots_for_all_columns(df, highlight_columns=[]):
    columns_to_check = df.columns[1:-1]
    plt.figure(figsize=(20, 30))
    
    for i, col in enumerate(columns_to_check, 1):
        plt.subplot(6, 4, i)
        
        boxprops = {"facecolor": "orange"} if col in highlight_columns else {}
        sns.boxplot(y=df[col], boxprops=boxprops)
        plt.title(col)
        
    plt.tight_layout()
    plt.show()