In [82]:
import numpy as np
import pandas as pd
import ctgan
from ctgan import CTGAN

In [83]:
import dataclasses
@dataclasses.dataclass
class Card:
    """Bundle of a real dataset, its CTGAN model and the evaluation results.

    The class defines its own ``__init__``, so ``@dataclass`` does not generate
    one; the decorator only contributes ``__repr__``/``__eq__``. Both of those
    read every declared field, which is why ``__init__`` must assign all of
    them (a missing field makes ``repr(card)`` raise ``AttributeError``).
    """
    name: str                                # identifier of this card
    model: CTGAN                             # generator loaded from ``model_path``
    target: str                              # name of the label column in the data
    schedule_path: str                       # stored as given; not read in __init__
    real_data: pd.DataFrame                  # rows read from ``dataset_path``
    synt_data: pd.DataFrame                  # rows sampled from ``model``
    jensen_shannon_divergence: pd.DataFrame  # filled in after construction
    real_score: float                        # filled in after construction
    synt_score: float                        # filled in after construction

    def __init__(self, name: str, target: str, dataset_path: str, model_path: str, schedule_path: str):
        """Load the model and the real data, then draw a synthetic sample.

        Args:
            name: Identifier of the card.
            target: Name of the label column.
            dataset_path: CSV file with the real data (read via ``pd.read_csv``).
            model_path: Saved CTGAN model (read via ``CTGAN.load``).
            schedule_path: Kept on the instance unchanged.
        """
        self.name = name
        self.target = target
        # NOTE(review): the notebook output shows an UnpicklingError coming from
        # the loading cell, so this presumably unpickles the file - only point
        # it at model files you trust.
        self.model = ctgan.CTGAN.load(model_path)
        self.real_data = pd.read_csv(dataset_path)
        # Synthetic sample has exactly as many rows as the real dataset.
        self.synt_data = self.model.sample(len(self.real_data))
        self.schedule_path = schedule_path
        # Placeholders until the evaluation step overwrites them.
        self.jensen_shannon_divergence = pd.DataFrame()
        self.real_score = 0.0
        # Was misspelled ``sint_score``, leaving the declared field unset.
        self.synt_score = 0.0


In [84]:
from pathlib import Path

def load_cards_from_params(params_dir: str) -> dict[str, Card]:
    """Build one ``Card`` per parameter file found in ``params_dir``.

    Each regular file in the directory is read as UTF-8. Blank lines are
    ignored and the remaining lines are interpreted positionally:

        1. card name
        2. target column
        3. dataset path
        4. model path
        5. schedule path (optional, defaults to ``""``)

    Args:
        params_dir: Directory containing the parameter files.

    Returns:
        Mapping from card name to ``Card``. Empty when ``params_dir`` does not
        exist or is not a directory. If two files declare the same name, the
        one visited later wins.

    Raises:
        ValueError: A non-empty file has fewer than the four required lines.
    """
    required_fields = 4  # name, target, dataset path, model path
    params_path = Path(params_dir)
    cards: dict[str, Card] = {}
    # ``is_dir()`` is already False for a path that does not exist.
    if not params_path.is_dir():
        return cards

    # ``iterdir()`` yields entries in arbitrary order; sorting keeps the
    # resulting dict order reproducible between runs.
    for file in sorted(params_path.iterdir()):
        if not file.is_file():
            continue

        lines = [ln.strip() for ln in file.read_text(encoding='utf-8').splitlines() if ln.strip()]
        if not lines:
            continue
        if len(lines) < required_fields:
            # Fail here with the offending file name instead of handing empty
            # paths to ``Card`` and failing later inside a loader.
            raise ValueError(
                f"{file}: expected at least {required_fields} non-blank lines "
                f"(name, target, dataset path, model path), found {len(lines)}"
            )

        name, target, dataset_path, model_path = lines[:required_fields]
        schedule_path = lines[4] if len(lines) > 4 else ""
        cards[name] = Card(name=name, target=target, dataset_path=dataset_path,
                           model_path=model_path, schedule_path=schedule_path)

    return cards

In [85]:
# Build one Card per parameter file found under ../models/params/.
cards = load_cards_from_params('../models/params/')

UnpicklingError: Memo value not found at index 97

In [79]:
import warnings
from sklearn.exceptions import ConvergenceWarning
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Suppress these warning categories to keep the cell output clean
warnings.filterwarnings("ignore", category=ConvergenceWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

def evaluate_card(card: Card) -> None:
    """Compare classifiers trained on real and on synthetic data.

    Two identical scaler + logistic-regression pipelines are fitted, one on
    half of the real data and one on half of the synthetic data. Both are
    scored by accuracy on the other half of the real data. The results are
    written to ``card.real_score`` and ``card.synt_score``.

    Args:
        card: Card whose ``real_data`` and ``synt_data`` both contain the
            column named by ``card.target``.
    """
    label = card.target
    real_features, real_labels = card.real_data.drop(columns=[label]), card.real_data[label]
    synt_features, synt_labels = card.synt_data.drop(columns=[label]), card.synt_data[label]

    # Stratified 50/50 splits keep the class distribution in each half.
    split_options = dict(test_size=0.5, random_state=42)
    real_fit_X, holdout_X, real_fit_y, holdout_y = train_test_split(
        real_features, real_labels, stratify=real_labels, **split_options
    )
    # Only the training half of the synthetic data is used.
    synt_fit_X, _, synt_fit_y, _ = train_test_split(
        synt_features, synt_labels, stratify=synt_labels, **split_options
    )

    def _holdout_accuracy(fit_X, fit_y):
        # Pipeline: StandardScaler + LogisticRegression (no deprecated
        # ``multi_class`` argument), always scored on the real hold-out half.
        pipeline = make_pipeline(
            StandardScaler(),
            LogisticRegression(solver='saga', max_iter=20000, random_state=42),
        )
        pipeline.fit(fit_X, fit_y)
        return accuracy_score(holdout_y, pipeline.predict(holdout_X))

    real_accuracy = _holdout_accuracy(real_fit_X, real_fit_y)
    synt_accuracy = _holdout_accuracy(synt_fit_X, synt_fit_y)

    # Written only after both fits succeeded.
    card.real_score = real_accuracy
    card.synt_score = synt_accuracy


In [80]:
import numpy as np
import pandas as pd

def calculate_distributions(p_series: pd.Series, q_series: pd.Series):
    """Return the empirical distributions of two columns on a shared support.

    The support is the union of the distinct values of both series. Each
    returned series is indexed by that support and holds the relative
    frequency of every value, with 0 for values the series never takes.

    Returns:
        Tuple ``(p_dist, q_dist)``.
    """
    # Every distinct value seen in either column.
    support = pd.Index(p_series.unique()).union(q_series.unique())

    def _relative_frequencies(series: pd.Series) -> pd.Series:
        # Normalised counts, padded with zeros for values missing from ``series``.
        frequencies = series.value_counts(normalize=True)
        return frequencies.reindex(support, fill_value=0)

    return _relative_frequencies(p_series), _relative_frequencies(q_series)

def calculate_metrics(p_df: pd.DataFrame, q_df: pd.DataFrame) -> pd.DataFrame:
    """Compute the Jensen-Shannon divergence of every column shared by two frames.

    For each column of ``p_df`` that also exists in ``q_df`` the empirical
    distributions P and Q are taken from ``calculate_distributions`` and

        JSD(P || Q) = 0.5 * D_KL(P || M) + 0.5 * D_KL(Q || M),  M = 0.5 * (P + Q)

    is evaluated with base-2 logarithms, so the result lies in [0, 1].

    Args:
        p_df: First frame (e.g. the real data).
        q_df: Second frame (e.g. the synthetic data).

    Returns:
        DataFrame with the columns ``column`` and ``jensen_shannon_divergence``,
        one row per shared column in the column order of ``p_df``. The frame is
        empty (but still has both columns) when nothing is shared.
    """
    result_columns = ['column', 'jensen_shannon_divergence']
    rows = []

    for col in p_df.columns:
        if col not in q_df.columns:
            continue

        p, q = calculate_distributions(p_df[col], q_df[col])

        # Mixture distribution. No smoothing constant is needed: wherever
        # P > 0 or Q > 0 the mixture is strictly positive, and adding one
        # would bias the result (slightly negative JSD for identical inputs).
        m = 0.5 * (p + q)

        # Only non-zero probabilities contribute, because 0 * log(0) = 0.
        p_support = p > 0
        q_support = q > 0

        # D_KL(P || M)
        kl_p_m = np.sum(p[p_support] * np.log2(p[p_support] / m[p_support]))
        # D_KL(Q || M)
        kl_q_m = np.sum(q[q_support] * np.log2(q[q_support] / m[q_support]))

        rows.append({
            'column': col,
            'jensen_shannon_divergence': 0.5 * kl_p_m + 0.5 * kl_q_m,
        })

    # Passing ``columns`` keeps the schema even when ``rows`` is empty;
    # selecting the columns afterwards would raise KeyError in that case.
    return pd.DataFrame(rows, columns=result_columns)

# Example usage with the Card objects
# for card in cards.values():
#     # Compute the metrics
#     metrics_df = calculate_metrics(card.real_data, card.synt_data)
#     # Store the result (for example, in a new attribute)
#     card.metrics = metrics_df
#     print(f"Metrics for {card.name}:")
#     print(card.metrics)
#     print("-" * 30)


In [81]:
# Score every card and attach its per-column Jensen-Shannon divergence table.
for card in cards.values():
    evaluate_card(card)  # sets card.real_score and card.synt_score
    card.jensen_shannon_divergence = calculate_metrics(card.real_data, card.synt_data)