### 🔧 Utility Functions
Все вспомогательные функции для загрузки, метрик, визуализаций, фильтров и т.д.

In [7]:
# Библиотеки
import numpy as np
import pandas as pd

from config import TEST_SIZE, RANDOM_STATE
from config import PROCESSED_CSV_PATH, RESULTS_DIR

from shapely.geometry import Polygon
from shapely.ops import unary_union
from shapely.errors import TopologicalError

from IPython.display import HTML
from IPython.display import display


In [8]:
# Функции разделения данных на обучающую и тестовую выборки
from sklearn.model_selection import train_test_split

def split_data(df, x_features, y_features, Keep_Numbers=True, test_size=TEST_SIZE):
    """Split ``df`` into train/test parts and slice out features and targets.

    When ``Keep_Numbers`` is truthy the split is performed over the unique
    values of the ``"number"`` column, so every row sharing a number ends up
    on the same side of the split. Otherwise the rows themselves are split.
    Both paths pass ``RANDOM_STATE`` to ``train_test_split``.

    Args:
        df: Source DataFrame; needs a ``"number"`` column when
            ``Keep_Numbers`` is truthy.
        x_features: Column selection used for the feature frames.
        y_features: Column selection used for the target frames.
        Keep_Numbers: Split by unique ``"number"`` values instead of by row.
        test_size: Forwarded to ``train_test_split``.

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test, X_train_full, X_test_full)``
        where the ``*_full`` frames keep every column of ``df``.
    """
    if not Keep_Numbers:
        # Plain row-wise split.
        train_rows, test_rows = train_test_split(
            df, test_size=test_size, random_state=RANDOM_STATE
        )
    else:
        # Split the distinct numbers, then pull the rows belonging to each side.
        numbers = df["number"]
        ids_train, ids_test = train_test_split(
            numbers.unique(), test_size=test_size, random_state=RANDOM_STATE
        )
        in_train = numbers.isin(ids_train)
        in_test = numbers.isin(ids_test)
        train_rows = df[in_train].copy()
        test_rows = df[in_test].copy()

    targets_train = train_rows[y_features]
    targets_test = test_rows[y_features]
    features_train = train_rows[x_features]
    features_test = test_rows[x_features]

    return (
        features_train,
        features_test,
        targets_train,
        targets_test,
        train_rows,
        test_rows,
    )



In [9]:
# сохранение результатов в файл
def save_model_predictions(y_true, y_pred, model_name, filename=None, X_original=None, lon_factor=None, lat_factor=None):
    """Build a per-row table of true values, predictions and errors.

    The first two columns of ``y_true`` are treated as the two targets; the
    first two columns of ``y_pred`` are their predictions. Errors are reported
    in the targets' own units and, after scaling by ``lon_factor`` /
    ``lat_factor``, in metres.

    Args:
        y_true: DataFrame whose first two columns are the true targets.
        y_pred: 2-D array-like (NumPy array, DataFrame, nested list) with the
            predictions for those two targets in columns 0 and 1.
        model_name: Label stored in the ``"model"`` column and printed.
        filename: If truthy, the table is written there as CSV (no index).
        X_original: Optional DataFrame, row-aligned with ``y_true``. Known
            identifying columns are copied over, and if it carries the
            ``wgs84_centroid_baseline_lon/lat`` columns, ``pred_lon`` /
            ``pred_lat`` are computed as baseline minus prediction.
        lon_factor: Multiplier from first-target units to metres. When
            omitted, ``111320 * cos(mean of the true second target)``.
        lat_factor: Multiplier from second-target units to metres. When
            omitted, ``111000``.

    Returns:
        The assembled DataFrame.

    Raises:
        ValueError: If ``y_true`` is not a DataFrame (or has fewer than two
            columns, via the tuple unpacking).
    """
    if not isinstance(y_true, pd.DataFrame):
        raise ValueError("y_true должен быть DataFrame с названиями колонок")

    col_1, col_2 = y_true.columns[:2]

    # Accept DataFrames and nested lists as well as ndarrays for positional indexing.
    y_pred = np.asarray(y_pred)

    df = pd.DataFrame({
        f"true_{col_1}": y_true[col_1].values,
        f"true_{col_2}": y_true[col_2].values,
        f"pred_{col_1}": y_pred[:, 0],
        f"pred_{col_2}": y_pred[:, 1],
    })

    if X_original is not None:
        # Carry over whichever identifying / source-coordinate columns exist.
        for col in ["sc63_x", "sc63_y", "sc63_centroid_x", "sc63_centroid_y", "number", "vertex_id"]:
            if col in X_original.columns:
                df[col] = X_original[col].values

        # If baseline coordinates are present, reconstruct the predicted
        # position as baseline minus the predicted value.
        if "wgs84_centroid_baseline_lon" in X_original.columns and "wgs84_centroid_baseline_lat" in X_original.columns:
            df["pred_lon"] = X_original["wgs84_centroid_baseline_lon"].values - df[f"pred_{col_1}"]
            df["pred_lat"] = X_original["wgs84_centroid_baseline_lat"].values - df[f"pred_{col_2}"]

    print(f"✅ {model_name} saving corrected predictions")

    df[f"err_{col_1}"] = df[f"pred_{col_1}"] - df[f"true_{col_1}"]
    df[f"err_{col_2}"] = df[f"pred_{col_2}"] - df[f"true_{col_2}"]
    df["err_dist_deg"] = np.sqrt(df[f"err_{col_1}"]**2 + df[f"err_{col_2}"]**2)

    # Fill in only the factor the caller did not supply; previously a single
    # explicit factor was silently overwritten when the other one was None.
    if lon_factor is None:
        # NOTE(review): this uses the mean of the second *target* as the
        # latitude; if targets are offsets rather than absolute latitudes the
        # cosine term is ~1 - confirm against callers.
        mean_lat = df[f"true_{col_2}"].mean()
        lon_factor = 111320 * np.cos(np.radians(mean_lat))
    if lat_factor is None:
        lat_factor = 111000

    df[f"err_{col_1}_m"] = df[f"err_{col_1}"] * lon_factor
    df[f"err_{col_2}_m"] = df[f"err_{col_2}"] * lat_factor
    df["err_dist_m"] = np.sqrt(df[f"err_{col_1}_m"]**2 + df[f"err_{col_2}_m"]**2)

    df["model"] = model_name

    if filename:
        df.to_csv(filename, index=False)
        print(f"✅ Saved: {filename}")

    return df


In [None]:
# Функции расчета метрик модели
from sklearn.metrics import mean_squared_error, mean_absolute_error, root_mean_squared_error, r2_score

# Базовые метрики
def calculate_metrics(y_true, y_pred):
    """Compute MAE, RMSE and R2, both aggregated and per target.

    Args:
        y_true: True target values.
        y_pred: Predicted target values.

    Returns:
        Tuple ``(mae, rmse, mae_per_target, rmse_per_target, r2, r2_per_target)``;
        the ``*_per_target`` items come from ``multioutput='raw_values'``.
    """
    per_target = {"multioutput": "raw_values"}

    overall_mae = mean_absolute_error(y_true, y_pred)
    overall_rmse = root_mean_squared_error(y_true, y_pred)
    overall_r2 = r2_score(y_true, y_pred)

    target_mae = mean_absolute_error(y_true, y_pred, **per_target)
    target_rmse = root_mean_squared_error(y_true, y_pred, **per_target)
    target_r2 = r2_score(y_true, y_pred, **per_target)

    return overall_mae, overall_rmse, target_mae, target_rmse, overall_r2, target_r2

# Печать метрик
def print_metrics(mae, rmse, mae_per_target, rmse_per_target, r2, r2_per_target):
    """Print the metrics produced by ``calculate_metrics``, one per line.

    ``mae`` and ``rmse`` are printed with six decimals; the remaining values
    use their default string form.
    """

    def report_lines():
        # Generated lazily so each line is printed before the next is formatted.
        yield "Metrics:"
        yield f"MAE = {mae:.6f}"
        yield f"RMSE = {rmse:.6f}"
        yield f"MAE per target = {mae_per_target}"
        yield f"RMSE per target = {rmse_per_target}"
        yield f"R2 per target = {r2_per_target}"
        yield f"R2 = {r2}"

    for line in report_lines():
        print(line)

# Расчет дополнительных метрик
def print_advanced_metrics(y_true, y_pred, mae_per_target, rmse_per_target, r2_score, r2_score_per_target, round_digits=2):
    """Assemble, display and return a table of extended metrics.

    The first two columns of ``y_true`` are the targets; columns 0 and 1 of
    ``y_pred`` are their predictions. Per-target MAE is additionally converted
    to metres, and MAPE is computed overall and per target.

    Args:
        y_true: DataFrame with at least two target columns.
        y_pred: 2-D array indexed positionally (``y_pred[:, 0]``, ``y_pred[:, 1]``).
        mae_per_target: Pair of per-target MAE values.
        rmse_per_target: Pair of per-target RMSE values.
        r2_score: Aggregate R2 value. The parameter name shadows the sklearn
            function of the same name inside this function.
        r2_score_per_target: Pair of per-target R2 values.
        round_digits: Decimals used for the metre and percent rows only.

    Returns:
        DataFrame indexed by metric name (index name ``"Metric"``) with a
        single ``"Value"`` column; it is also rendered via ``display(HTML(...))``.

    Note:
        MAPE divides by the true values, so zeros in ``y_true`` produce
        inf/nan entries.
    """
    first, second = y_true.columns[:2]

    # Degree -> metre factors; the longitude factor is scaled by the cosine
    # of the mean of the second target column.
    metres_per_lon = 111320 * np.cos(np.radians(y_true[second].mean()))
    metres_per_lat = 111000

    mae_first, mae_second = mae_per_target
    rmse_first, rmse_second = rmse_per_target
    r2_first, r2_second = r2_score_per_target

    def column_mape(name, position):
        # Mean absolute percentage error of one target column.
        actual = y_true[name]
        return np.mean(np.abs((actual - y_pred[:, position]) / actual)) * 100

    truth = y_true.values
    overall_mape = np.mean(np.abs((truth - y_pred) / truth)) * 100
    mape_first = column_mape(first, 0)
    mape_second = column_mape(second, 1)

    rows = {
        "MAE (°)": np.mean(mae_per_target),
        "RMSE (°)": np.mean(rmse_per_target),
        f"MAE по {first} (°)": mae_first,
        f"MAE по {second} (°)": mae_second,
        f"RMSE по {first} (°)": rmse_first,
        f"RMSE по {second} (°)": rmse_second,
        f"MAE по {first} (м)": mae_first * metres_per_lon,
        f"MAE по {second} (м)": mae_second * metres_per_lat,
        "MAPE (%)": overall_mape,
        f"MAPE по {first} (%)": mape_first,
        f"MAPE по {second} (%)": mape_second,
        f"R2 по {first}": r2_first,
        f"R2 по {second}": r2_second,
        "R2 ": r2_score,
    }

    table = pd.DataFrame(rows, index=[0]).T
    table.columns = ["Value"]

    # Only the metre and percent rows are rounded; degree rows keep full precision.
    to_round = table.index.str.contains(r"\(м\)|\(%\)")
    table.loc[to_round, "Value"] = table.loc[to_round, "Value"].round(round_digits)
    table.index.name = "Metric"

    display(HTML(table.to_html()))
    return table


# Оценка геометрии полигона
def evaluate_polygon_geometry(true_coords, pred_coords):
    """Compare a predicted polygon with the true one.

    Args:
        true_coords: List of ``(x, y)`` vertices of the true polygon.
        pred_coords: List of ``(x, y)`` vertices of the predicted polygon, in
            the same coordinate system as ``true_coords`` (e.g. WGS84).

    Returns:
        On success a dict with:
            ``area_true`` / ``area_pred``: polygon areas;
            ``area_error_pct``: relative area error in percent (2 decimals),
                or ``None`` when the true area is not positive;
            ``iou``: intersection over union (4 decimals), or ``None`` when
                the union area is not positive;
            ``hausdorff``: Hausdorff distance (2 decimals).
        On failure a dict with a single ``"error"`` key: ``"invalid geometry"``
        when either polygon is invalid, otherwise the exception text.
    """
    try:
        poly_true = Polygon(true_coords)
        poly_pred = Polygon(pred_coords)

        if not (poly_true.is_valid and poly_pred.is_valid):
            return {"error": "invalid geometry"}

        area_true = poly_true.area
        area_pred = poly_pred.area
        area_error_pct = abs(area_true - area_pred) / area_true * 100 if area_true > 0 else None

        intersection = poly_true.intersection(poly_pred).area
        union = poly_true.union(poly_pred).area
        iou = intersection / union if union > 0 else None

        hausdorff = poly_true.hausdorff_distance(poly_pred)

        # NOTE(review): hausdorff is rounded to 2 decimals in coordinate
        # units; for degree coordinates small distances become 0.0 - confirm
        # this is intended.
        return {
            "area_true": area_true,
            "area_pred": area_pred,
            # Guard None like "iou" does: round(None, 2) used to raise a
            # TypeError that surfaced as a generic "Exception: ..." result.
            "area_error_pct": round(area_error_pct, 2) if area_error_pct is not None else None,
            "iou": round(iou, 4) if iou is not None else None,
            "hausdorff": round(hausdorff, 2)
        }
    except TopologicalError as e:
        return {"error": f"TopologicalError: {str(e)}"}
    except Exception as e:
        # Deliberate best-effort: any failure is reported per polygon via the
        # "error" key instead of being raised to the caller.
        return {"error": f"Exception: {str(e)}"}
    
# Функция для оценки геометрии полигонов по всем участкам и сохранения результатов в CSV
def save_evaluation_polygon_geometry(df, model, version, path):
    """Evaluate polygon geometry for every parcel number and save the result.

    Rows of ``df`` are grouped by ``"number"``; for each group the true
    polygon is built from ``true_lon``/``true_lat`` and the predicted one from
    ``pred_lon``/``pred_lat`` (in row order), and both are passed to
    ``evaluate_polygon_geometry``.

    Args:
        df: DataFrame with ``number``, ``true_lon``, ``true_lat``,
            ``pred_lon`` and ``pred_lat`` columns.
        model: Model label used in the output file name.
        version: Version label used in the output file name.
        path: Directory for the CSV (``str`` or path-like). A trailing
            separator is optional.

    Returns:
        DataFrame with one row per parcel number holding the metrics (or an
        ``error`` value) plus the ``number`` column. Rows whose ``number`` is
        missing are skipped by the grouping.
    """
    import os  # local import so the block does not depend on file-level imports

    results = []

    # One pass over the frame; sort=False keeps groups in order of first appearance.
    for number, group in df.groupby("number", sort=False):
        true = list(zip(group["true_lon"], group["true_lat"]))
        pred = list(zip(group["pred_lon"], group["pred_lat"]))

        metrics = evaluate_polygon_geometry(true, pred)
        metrics["number"] = number
        results.append(metrics)

    df_geo_metrics = pd.DataFrame(results)
    filename = f"polygon_geometry_metrics_{model}_{version}.csv"
    # os.path.join inserts the separator when `path` lacks one and accepts
    # path-like objects; plain `path + filename` did neither.
    target = os.path.join(path, filename)
    df_geo_metrics.to_csv(target, index=False)
    print(f"✅ Saved: {target}")
    return df_geo_metrics

In [11]:
# Функции визуализации
import matplotlib.pyplot as plt
import seaborn as sns

def plot_predictions_vs_true(y_true, y_pred, title="Качество предсказаний модели"):
    """Draw predicted-vs-true scatter plots for the two targets side by side.

    Args:
        y_true: DataFrame whose first two columns are the true targets.
        y_pred: 2-D array; columns 0 and 1 hold the matching predictions.
        title: Figure-level title.

    Each panel shows the scatter of true against predicted values plus a
    dashed diagonal spanning the range of the true values.
    """
    first, second = y_true.columns[:2]

    # (column name, true values, predicted values, marker colour) per panel.
    panels = (
        (first, y_true[first], y_pred[:, 0], 'blue'),
        (second, y_true[second], y_pred[:, 1], 'green'),
    )

    fig, axs = plt.subplots(1, 2, figsize=(14, 6))

    for ax, (name, actual, predicted, colour) in zip(axs, panels):
        ax.scatter(actual, predicted, alpha=0.5, s=10, color=colour)

        # Reference line y = x over the range of the true values.
        span = [actual.min(), actual.max()]
        ax.plot(span, span, linestyle='--', color='gray')

        ax.set_title(f"{name}: предсказано vs истинно")
        ax.set_xlabel(f"Истинное значение ({name})")
        ax.set_ylabel(f"Предсказанное значение ({name})")
        ax.grid(True)
        ax.axis("equal")

    plt.suptitle(title, fontsize=16)
    plt.tight_layout()
    plt.show()

# Функция для загрузки метрик полигона
def load_polygon_metrics(models):
    """Load the saved polygon-geometry metrics of several models into one frame.

    For every entry of ``models`` the file
    ``polygon_geometry_metrics_{model}.csv`` is read from ``RESULTS_DIR`` and
    tagged with a ``"model"`` column.

    Args:
        models: Iterable of model labels as they appear in the file names.

    Returns:
        A single DataFrame with all rows concatenated and a fresh index.

    Raises:
        ValueError: From ``pd.concat`` when ``models`` is empty.
    """
    import os  # local import so the block does not depend on file-level imports

    frames = []
    for model in models:
        # os.path.join adds the separator if RESULTS_DIR lacks one and also
        # works when RESULTS_DIR is a path-like object.
        csv_path = os.path.join(RESULTS_DIR, f"polygon_geometry_metrics_{model}.csv")
        frame = pd.read_csv(csv_path)
        frame["model"] = model
        frames.append(frame)
    return pd.concat(frames, ignore_index=True)

# 📊 Визуализация метрик формы полигона
def visualisation_polygon_form_metrics(models, highlight_model):
    """Plot polygon-shape metric distributions and list the worst parcels.

    Args:
        models: Model labels passed to ``load_polygon_metrics``.
        highlight_model: Model whose ten worst parcels are tabulated.

    Draws one histogram (50 bins, KDE, split by model) for each of
    ``area_error_pct``, ``iou`` and ``hausdorff``, then displays two tables
    for ``highlight_model``: the 10 rows with the lowest IoU and the 10 rows
    with the largest Hausdorff distance.
    """
    all_metrics = load_polygon_metrics(models)

    # (column, colour, title, x-axis label) for each histogram, in display order.
    histograms = (
        ("area_error_pct", "skyblue", "Относительная ошибка площади (%)", "% ошибки"),
        ("iou", "lightgreen", "IoU (пересечение предсказанного и истинного полигона)", "IoU"),
        ("hausdorff", "salmon", "Hausdorff расстояние между полигонами", "Расстояние"),
    )
    for column, colour, heading, x_label in histograms:
        plt.figure(figsize=(8, 4))
        sns.histplot(data=all_metrics, x=column, bins=50, kde=True, color=colour, hue="model")
        plt.title(heading)
        plt.xlabel(x_label)
        plt.ylabel("Кол-во участков")
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    # The tables below are restricted to the highlighted model.
    selected = all_metrics[all_metrics["model"] == highlight_model]
    table_columns = ["number", "iou", "hausdorff", "area_error_pct"]

    # Ten parcels with the lowest IoU.
    lowest_iou = selected.sort_values("iou").head(10)
    print(f"\n🔴 Топ-10 для модели {highlight_model}  по наименьшему IoU:")
    display(HTML(lowest_iou[table_columns].to_html(index=False)))

    # Ten parcels with the largest Hausdorff distance.
    largest_hausdorff = selected.sort_values("hausdorff", ascending=False).head(10)
    print(f"\n🔴 Топ-10 для модели {highlight_model} по наибольшему Hausdorff:")
    display(HTML(largest_hausdorff[table_columns].to_html(index=False)))

In [12]:
# визуализации, фильтрации, координатные преобразования
import folium
from shapely.geometry import Polygon

def plot_predicted_vs_true(df_points, df_predictions, number, show_sc63=True):
    """Show true and predicted parcel outlines on a map, one map per number.

    - green outline: true coordinates (``wgs84_lat`` / ``wgs84_lon`` of ``df_points``)
    - red outline: predicted coordinates (``pred_lat`` / ``pred_lon`` of ``df_predictions``)

    Args:
        df_points: DataFrame with ``number``, ``wgs84_lat``, ``wgs84_lon``.
        df_predictions: DataFrame with ``number``, ``pred_lat``, ``pred_lon``
            and, when ``show_sc63`` is truthy, ``sc63_x`` / ``sc63_y``.
        number: A single number as a string, or an iterable of them.
        show_sc63: Also draw a matplotlib plot of the ``sc63_x`` / ``sc63_y``
            outline taken from ``df_predictions``.
    """
    numbers = [number] if isinstance(number, str) else number

    for num in numbers:
        true_df = df_points[df_points["number"] == num]
        pred_df = df_predictions[df_predictions["number"] == num]

        true_coords = true_df[["wgs84_lat", "wgs84_lon"]].values
        pred_coords = pred_df[["pred_lat", "pred_lon"]].values

        fig = folium.Figure(width=800, height=400)
        fmap = folium.Map(tiles="OpenStreetMap", zoom_start=12, height='400px', width='800px')
        fig.add_child(fmap)

        # Centre and zoom the map on every vertex of both outlines.
        all_coords = list(true_coords) + list(pred_coords)
        if all_coords:
            lats = [pt[0] for pt in all_coords]
            lons = [pt[1] for pt in all_coords]
            fmap.location = [sum(lats) / len(lats), sum(lons) / len(lons)]
            fmap.fit_bounds([[min(lats), min(lons)], [max(lats), max(lons)]])

        # An outline is drawn only when it has more than two vertices.
        outlines = (
            (true_coords, "green", "Истинный контур"),
            (pred_coords, "red", "Предсказанный контур"),
        )
        for coords, colour, label in outlines:
            if len(coords) > 2:
                folium.Polygon(locations=coords, color=colour, fill=False,
                               tooltip=f"{label}: {num}").add_to(fmap)

        display(fmap)

        if not show_sc63:
            continue

        # --- SC63 outline, taken from the prediction frame
        sc_pred = pred_df[["sc63_x", "sc63_y"]].values

        fig, ax = plt.subplots(figsize=(6, 6))

        if len(sc_pred) > 2:
            xs, ys = Polygon(sc_pred).exterior.xy
            ax.plot(xs, ys, color="red", label="Контур в SC63")

        ax.set_title(f"SC63: {num}")
        ax.set_aspect('equal')
        ax.grid(True)
        ax.legend()
        plt.xticks(rotation=90)
        plt.show()

