# Дообучение encoder'а для классификации токенов.

Реализация классификатора.

In [None]:
# Установка зависимостей (при необходимости)
!pip install -q evaluate seqeval transformers datasets

import gc
import math
from typing import List, Dict, Any, Optional, Tuple

import numpy as np
import torch
import evaluate
from transformers import (
    AutoModelForTokenClassification,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
    DataCollatorForTokenClassification,
    TrainerCallback,
    PrinterCallback,
)
from transformers.modeling_outputs import ModelOutput
import pandas as pd
from datasets import Dataset
from sklearn.model_selection import train_test_split
from tqdm.auto import tqdm


def set_seed(seed: int = 42):
    """
    Устанавливает фиксированное зерно для Python, NumPy и PyTorch (включая все доступные CUDA-устройства).

    :param seed: Значение зерна.
    """
    import random
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)


class PbarConsoleLogger(TrainerCallback):
    """
    Внешний прогресс‑бар и консольный логгер метрик/лоссов для стабильного отображения на больших данных.
    """
    def __init__(self, pbar):
        self.pbar = pbar
        self.last_logs = {}
        self.last_train_loss = None
        self.printed_eval_steps = set()

    def _step(self, state) -> int:
        return int(state.global_step or 0)

    def _fmt_postfix(self):
        parts = []
        if 'loss' in self.last_logs:
            parts.append(f"loss {self.last_logs['loss']:.4f}")
        if 'eval_loss' in self.last_logs:
            parts.append(f"val {self.last_logs['eval_loss']:.4f}")
        for k, v in self.last_logs.items():
            if k.startswith('eval_') and k not in (
                'eval_loss', 'eval_runtime', 'eval_samples_per_second', 'eval_steps_per_second', 'epoch'
            ):
                try:
                    parts.append(f"{k.replace('eval_', '')} {float(v):.4f}")
                except Exception:
                    pass
        return " | ".join(parts)

    def on_step_end(self, args, state, control, **kwargs):
        n = min(self._step(state), self.pbar.total)
        if n > self.pbar.n:
            self.pbar.update(n - self.pbar.n)
        if self.last_logs:
            self.pbar.set_postfix_str(self._fmt_postfix(), refresh=False)
        self.pbar.refresh()

    def on_log(self, args, state, control, logs=None, **kwargs):
        if not logs:
            return
        for k, v in logs.items():
            if isinstance(v, (int, float)):
                self.last_logs[k] = float(v)
        if 'loss' in logs and isinstance(logs['loss'], (int, float)):
            self.last_train_loss = float(logs['loss'])

        self.pbar.set_postfix_str(self._fmt_postfix(), refresh=False)
        self.pbar.refresh()

        if any(k.startswith('eval_') for k in logs.keys()):
            step = self._step(state)
            if step in self.printed_eval_steps:
                return
            self.printed_eval_steps.add(step)

            train_loss_str = f"{self.last_train_loss:.10f}" if self.last_train_loss is not None else "n/a"
            val_loss = logs.get('eval_loss', None)
            val_loss_str = f"{float(val_loss):.10g}" if isinstance(val_loss, (int, float)) else "n/a"

            exclude = {'eval_loss', 'eval_runtime', 'eval_samples_per_second', 'eval_steps_per_second', 'epoch'}
            extra_parts = []
            for k, v in logs.items():
                if k.startswith('eval_') and k not in exclude:
                    metric_name = k.replace('eval_', '')
                    try:
                        extra_parts.append(f"val {metric_name}: {float(v):.10f}")
                    except Exception:
                        pass

            line = f"step: {step}, train loss: {train_loss_str}, val loss: {val_loss_str}"
            if extra_parts:
                line += ", " + ", ".join(extra_parts)
            tqdm.write(line)

    def on_train_end(self, args, state, control, **kwargs):
        n = min(self._step(state), self.pbar.total)
        if n > self.pbar.n:
            self.pbar.update(n - self.pbar.n)
        self.pbar.refresh()


class WeightedTokenCETrainer(Trainer):
    """
    Кастомный Trainer для токен-классификации с взвешенной CrossEntropy.
    Веса считаются с учетом числа токенов класса (после разметки и чанкинга):
      weight_i = N / (K * n_i),
    где K — число классов, n_i — число токенов класса i, N — сумма всех n_i.
    Отсутствующие классы получают вес 0. Метки -100 игнорируются в потере.

    Поддержка DataParallel: если logits дублируются по числу GPU, метки тайлятся соответствующим образом.
    """
    def __init__(self, *args, class_weights=None, **kwargs):
        # Тихо переводим устаревший аргумент tokenizer в processing_class (совместимость)
        processing = kwargs.pop("tokenizer", None)
        if processing is not None and "processing_class" not in kwargs:
            kwargs["processing_class"] = processing

        super().__init__(*args, **kwargs)

        self.class_weights = None
        if class_weights is not None:
            self.class_weights = torch.as_tensor(class_weights, dtype=torch.float32)
        self._warned_label_tiling = False

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """
        Считает взвешенный CrossEntropyLoss по токенам, игнорируя -100.
        Корректно обрабатывает случай DataParallel (дублирование по batch-оси).
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits if hasattr(outputs, "logits") else outputs["logits"]  # [B, L, C]

        # Приводим размеры в соответствие при DataParallel (если нужно)
        if logits.size(0) != labels.size(0):
            ngpu = torch.cuda.device_count()
            if ngpu > 1 and logits.size(0) == labels.size(0) * ngpu:
                labels = labels.repeat_interleave(ngpu, dim=0)
                if not self._warned_label_tiling:
                    print(f"[Warning] DataParallel удвоил batch для logits. "
                          f"Повторяем labels x{ngpu}. logits: {tuple(logits.shape)}, labels: {tuple(labels.shape)}")
                    self._warned_label_tiling = True
            else:
                raise ValueError(f"Batch size mismatch: logits {tuple(logits.shape)} vs labels {tuple(labels.shape)}")

        # Взвешенный CE с ignore_index=-100
        loss_fct = torch.nn.CrossEntropyLoss(
            weight=(self.class_weights.to(logits.device) if self.class_weights is not None else None),
            ignore_index=-100,
        )
        loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1))
        return (loss, outputs) if return_outputs else loss


class TokenClassification:
    """
    Пайплайн токен‑классификации с поддержкой больших данных (чанковое обучение по документам и sliding window).

    Возможности:
      - Автоматическая разметка (sliding window) с перекрытием stride.
      - Взвешенная CrossEntropy по токенам (по всему train) с игнорированием -100.
      - Чанковое обучение: подстановка train_dataset кусками документов (fit_chunk_size_docs).
      - Внешний tqdm прогресс‑бар (стабильный) + консольный лог метрик.

    Необходимые импорты:
    import numpy as np
    import torch
    import evaluate
    from transformers import (
        AutoModelForTokenClassification,
        AutoTokenizer,
        Trainer,
        TrainingArguments,
        DataCollatorForTokenClassification,
    )
    import pandas as pd
    from datasets import Dataset
    from sklearn.model_selection import train_test_split
    from tqdm.auto import tqdm
    """
    def __init__(
        self,
        checkpoint: str,
        label2id: Dict[str, int],
        tokens_column_name: str,
        tags_column_name: str
    ):
        """
        Инициализация модели и токенизатора.

        :param checkpoint: Имя/путь чекпоинта (HF Hub).
        :param label2id: Отображение тегов в id.
        :param tokens_column_name: Имя колонки с токенами (список строк).
        :param tags_column_name: Имя колонки с метками (список тегов или id).
        """
        self.id2label = {v: k for k, v in label2id.items()}
        self.label2id = label2id

        self.model = AutoModelForTokenClassification.from_pretrained(
            checkpoint,
            num_labels=len(self.id2label),
            id2label=self.id2label,
            label2id=self.label2id,
            ignore_mismatched_sizes=True
        )
        self.tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        self.tokens_column_name = tokens_column_name
        self.tags_column_name = tags_column_name

        self.trainer: Optional[Trainer] = None
        self.compute_metrics = None
        self.data_collator = DataCollatorForTokenClassification(tokenizer=self.tokenizer)
        self.progress_callback: Optional[TrainerCallback] = None

    @staticmethod
    def _align_labels_with_word_ids(labels_ids: List[int], word_ids: List[Optional[int]]) -> List[int]:
        """
        Выравнивает метки по word_ids от токенизатора: на первый токен слова ставится метка,
        остальные токены слова получают -100. Спец‑токены (None) получают -100.

        :param labels_ids: Список id‑меток длины = числу слов в документе.
        :param word_ids: Список индексов слов длины = числу токенов в чанке (или None для спец‑токенов).
        :return: Список меток длины = числу токенов в чанке.
        """
        new_labels = []
        prev_word_id = None
        for wid in word_ids:
            if wid is None:
                new_labels.append(-100)
            else:
                if wid != prev_word_id:
                    new_labels.append(labels_ids[wid])
                else:
                    new_labels.append(-100)
            prev_word_id = wid
        return new_labels

    def _tokenize_and_align_chunk(
        self,
        docs_tokens: List[List[str]],
        docs_labels_ids: List[List[int]],
        max_length: int,
        stride: int
    ) -> Dataset:
        """
        Токенизирует список документов (списки токенов) с возвратом переполнений (sliding window),
        выравнивает метки по word_ids, формирует Dataset для обучения/валидации.

        :param docs_tokens: Список документов; каждый документ — список токенов (слов).
        :param docs_labels_ids: Список документов; каждый — список id‑меток по словам.
        :param max_length: Максимальная длина последовательности модели.
        :param stride: Перекрытие при нарезке (число токенов).
        :return: datasets.Dataset с полями input_ids, attention_mask, labels.
        """
        enc = self.tokenizer(
            docs_tokens,
            is_split_into_words=True,
            return_overflowing_tokens=True,
            max_length=max_length,
            stride=stride,
            truncation=True
        )
        mapping = enc.pop("overflow_to_sample_mapping")  # len = кол-во получившихся чанков
        num_chunks = len(enc["input_ids"])

        all_labels = []
        for i in range(num_chunks):
            doc_idx = int(mapping[i])
            word_ids = enc.word_ids(batch_index=i)
            aligned = self._align_labels_with_word_ids(docs_labels_ids[doc_idx], word_ids)
            all_labels.append(aligned)

        return Dataset.from_dict({
            "input_ids": enc["input_ids"],
            "attention_mask": enc["attention_mask"],
            "labels": all_labels
        })

    def _count_total_chunks(
        self,
        docs_tokens: List[List[str]],
        max_length: int,
        stride: int,
        batch_docs: int = 64
    ) -> int:
        """
        Оценивает общее число чанков (последовательностей) после sliding window
        для списка документов, без хранения признаков.

        :param docs_tokens: Список документов (список токенов-слов).
        :param max_length: Максимальная длина последовательности модели.
        :param stride: Перекрытие при нарезке.
        :param batch_docs: Размер пачки документов для ускорения токенизации.
        :return: Общее число последовательностей.
        """
        total = 0
        for i in range(0, len(docs_tokens), batch_docs):
            batch = docs_tokens[i:i + batch_docs]
            enc = self.tokenizer(
                batch,
                is_split_into_words=True,
                return_overflowing_tokens=True,
                max_length=max_length,
                stride=stride,
                truncation=True
            )
            total += len(enc["input_ids"])
        return total

    def _compute_class_weights_over_docs(
        self,
        docs_tokens: List[List[str]],
        docs_labels_ids: List[List[int]],
        max_length: int,
        stride: int,
        batch_docs: int = 32
    ) -> np.ndarray:
        """
        Считает веса классов по ВСЕМ токенам тренировки, выравнивая метки и
        не сохраняя полный датасет в памяти.

        :param docs_tokens: Список документов: список токенов (слов).
        :param docs_labels_ids: Список документов: список id‑меток по словам.
        :param max_length: Максимальная длина последовательности.
        :param stride: Перекрытие при нарезке (в токенах).
        :param batch_docs: Размер батча документов при токенизации.
        :return: Вектор весов классов формы [K].
        """
        num_labels = len(self.id2label)
        counts = np.zeros(num_labels, dtype=np.int64)

        for i in tqdm(range(0, len(docs_tokens), batch_docs), desc="Подсчет частот классов (token-level)"):
            toks = docs_tokens[i:i + batch_docs]
            labs = docs_labels_ids[i:i + batch_docs]

            enc = self.tokenizer(
                toks,
                is_split_into_words=True,
                return_overflowing_tokens=True,
                max_length=max_length,
                stride=stride,
                truncation=True
            )
            mapping = enc.pop("overflow_to_sample_mapping")
            num_chunks = len(enc["input_ids"])

            for j in range(num_chunks):
                doc_idx = int(mapping[j])
                word_ids = enc.word_ids(batch_index=j)
                aligned = self._align_labels_with_word_ids(labs[doc_idx], word_ids)
                arr = np.asarray(aligned, dtype=np.int64)
                arr = arr[arr >= 0]  # игнорируем -100
                if arr.size > 0:
                    bc = np.bincount(arr, minlength=num_labels)
                    counts += bc

        N = counts.sum()
        weights = np.zeros(num_labels, dtype=np.float32)
        nonzero = counts > 0
        if N > 0:
            weights[nonzero] = N / (num_labels * counts[nonzero].astype(np.float32))
        return weights

    def _setup_compute_metrics(self):
        """
        Создает функцию подсчета seqeval-метрик (precision/recall/f1/accuracy).
        """
        metric = evaluate.load("seqeval")

        def compute_seqeval_metrics(p):
            # Поддержка EvalPrediction и tuple
            if isinstance(p, (tuple, list)):
                predictions, labels = p
            else:
                predictions, labels = p.predictions, p.label_ids

            predictions = np.argmax(predictions, axis=2)

            true_predictions = [
                [self.id2label[p] for (p, l) in zip(prediction, label) if l != -100]
                for prediction, label in zip(predictions, labels)
            ]

            true_labels = [
                [self.id2label[l] for (p, l) in zip(prediction, label) if l != -100]
                for prediction, label in zip(predictions, labels)
            ]

            results = metric.compute(predictions=true_predictions, references=true_labels)
            return {
                "precision": results.get("overall_precision", 0.0),
                "recall": results.get("overall_recall", 0.0),
                "f1": results.get("overall_f1", 0.0),
                "accuracy": results.get("overall_accuracy", 0.0),
            }

        self.compute_metrics = compute_seqeval_metrics

    def _prepare_dataset_with_sliding_window(self, df: pd.DataFrame, max_length: int, stride: int) -> Dataset:
        """
        Готовит токенизированный Dataset для списка документов (используется, например, для валидации).
        Выполняет:
          - маппинг строковых тегов -> id при необходимости,
          - токенизацию с переполнениями,
          - выравнивание меток по word_ids.

        :param df: Датафрейм с колонками токенов и меток.
        :param max_length: Максимальная длина модели.
        :param stride: Перекрытие sliding window.
        :return: datasets.Dataset с полями input_ids, attention_mask, labels.
        """
        docs_tokens = df[self.tokens_column_name].tolist()
        docs_labels = df[self.tags_column_name].tolist()

        # Если метки строковые — переводим в id
        if len(docs_labels) and isinstance(docs_labels[0][0], str):
            docs_labels = [[self.label2id[tag] for tag in tags] for tags in docs_labels]

        return self._tokenize_and_align_chunk(docs_tokens, docs_labels, max_length, stride)

    def fit(
        self,
        train_data: pd.DataFrame,
        epochs: int = 3,
        per_device_train_batch_size: int = 16,
        gradient_accumulation_steps: int = 1,
        test_size: float = 0.2,
        learning_rate: float = 2e-5,
        fp16: bool = True,
        stride: int = 128,
        logging_steps: int = 50,
        eval_steps: int = 100,
        output_dir: str = "./result",
        seed: int = 42,
        fit_chunk_size_docs: Optional[int] = None
    ):
        """
        Обучает модель токен‑классификации с поддержкой больших данных:
        train_dataset подставляется чанками документов, внутри которых выполняется sliding window.

        :param train_data: Датафрейм: колонки с токенами и метками на уровне слов.
        :param epochs: Кол-во эпох.
        :param per_device_train_batch_size: Размер батча на устройство.
        :param gradient_accumulation_steps: Шаги аккумуляции градиентов.
        :param test_size: Доля валидации (по документам).
        :param learning_rate: LR для AdamW.
        :param fp16: Использовать fp16 (если bf16 не доступен).
        :param stride: Перекрытие при нарезке (в токенах).
        :param logging_steps: Частота логирования.
        :param eval_steps: Частота валидации/сохранения.
        :param output_dir: Папка для артефактов.
        :param seed: Зерно.
        :param fit_chunk_size_docs: Сколько документов подставлять в один тренировочный чанк. Если None — все.
        :return: self.
        """
        set_seed(seed)
        max_length = int(getattr(self.model.config, "max_position_embeddings", 512))

        # Маппинг строковых тегов -> id (все данные)
        df_all = train_data.copy()
        if len(df_all) and len(df_all[self.tags_column_name]) and isinstance(df_all[self.tags_column_name].iloc[0][0], str):
            df_all[self.tags_column_name] = df_all[self.tags_column_name].apply(
                lambda tags: [self.label2id[tag] for tag in tags]
            )

        # Сплит по документам (потом каждый набор будет чанковаться по sliding window)
        df_train, df_eval = train_test_split(df_all, test_size=test_size, random_state=seed, shuffle=True)

        # Eval датасет можно подготовить целиком (обычно компактнее train)
        eval_dataset = self._prepare_dataset_with_sliding_window(df_eval, max_length, stride)

        # Документы тренировки (списки токенов и меток)
        train_docs_tokens = df_train[self.tokens_column_name].tolist()
        train_docs_labels = df_train[self.tags_column_name].tolist()

        # Веса классов на всём train (по токенам, без хранения полного tokenized train)
        class_weights = self._compute_class_weights_over_docs(
            docs_tokens=train_docs_tokens,
            docs_labels_ids=train_docs_labels,
            max_length=max_length,
            stride=stride,
            batch_docs=32
        )

        # Настройка метрик
        self._setup_compute_metrics()

        # bf16 если доступен, иначе fp16 при флаге
        bf16_ok = bool(torch.cuda.is_available() and torch.cuda.get_device_capability()[0] >= 8)

        args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=epochs,
            per_device_train_batch_size=per_device_train_batch_size,
            per_device_eval_batch_size=max(4, per_device_train_batch_size // 2),
            gradient_accumulation_steps=gradient_accumulation_steps,
            eval_accumulation_steps=max(1, gradient_accumulation_steps * 2),
            learning_rate=learning_rate,
            warmup_ratio=0.1,
            lr_scheduler_type="cosine",
            weight_decay=0.01,
            eval_strategy="steps",         # <- по вашей просьбе используем eval_strategy
            eval_steps=eval_steps,
            save_strategy="steps",
            save_steps=eval_steps,
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            save_total_limit=1,
            logging_strategy="steps",
            logging_steps=logging_steps,
            report_to="none",
            fp16=bool(fp16 and torch.cuda.is_available() and not bf16_ok),
            bf16=bool(bf16_ok and not fp16),
            dataloader_num_workers=0,
            seed=seed,
            remove_unused_columns=False,
            disable_tqdm=True  # используем внешний tqdm
        )

        data_collator = self.data_collator

        # Вспомогательные: шаги по количеству сэмплов (последовательностей), а не документов
        def steps_for_size(n_samples: int, bsz: int, accum: int) -> int:
            """
            Оценивает число оптимизационных шагов на чанке из n_samples последовательностей.
            """
            return max(0, math.ceil(math.ceil(n_samples / max(1, bsz)) / max(1, accum)))

        def chunk_slices(n_docs: int, chunk_docs: int):
            """
            Генератор срезов индексов документов по chunk_docs.
            """
            for i in range(0, n_docs, chunk_docs):
                yield slice(i, min(i + chunk_docs, n_docs))

        # Объем чанка по документам (по умолчанию — все документы)
        n_docs = len(train_docs_tokens)
        chunk_docs = int(fit_chunk_size_docs) if (fit_chunk_size_docs and fit_chunk_size_docs > 0) else n_docs

        # Предварительный расчет total_steps (по числу последовательностей после токенизации)
        total_steps = 0
        rng = np.random.default_rng(seed)
        doc_indices = np.arange(n_docs)

        for _ in range(epochs):
            rng.shuffle(doc_indices)
            for slc in chunk_slices(n_docs, chunk_docs):
                idx = doc_indices[slc]
                toks_chunk = [train_docs_tokens[i] for i in idx]
                # считаем, сколько получится последовательностей в этом чанке документов
                n_samples = self._count_total_chunks(toks_chunk, max_length, stride, batch_docs=64)
                total_steps += steps_for_size(n_samples, per_device_train_batch_size, gradient_accumulation_steps)

        # Инициализируем Trainer с "минимальным" train_dataset (пустой/минимальный чанк)
        # чтобы не держать всю тренировочную выборку
        if n_docs > 0:
            init_chunk_ds = self._tokenize_and_align_chunk(
                [train_docs_tokens[0]], [train_docs_labels[0]], max_length, stride
            )
        else:
            init_chunk_ds = eval_dataset  # fallback

        self.trainer = WeightedTokenCETrainer(
            model=self.model,
            args=args,
            data_collator=data_collator,
            compute_metrics=self.compute_metrics,
            train_dataset=init_chunk_ds,
            eval_dataset=eval_dataset,
            tokenizer=self.tokenizer,
            class_weights=class_weights
        )
        # Удаляем стандартный принтер логов, чтобы не конфликтовал с внешним tqdm
        try:
            self.trainer.remove_callback(PrinterCallback)
        except Exception:
            pass

        # Планировщик под рассчитанное число шагов
        if total_steps > 0:
            self.trainer.create_optimizer_and_scheduler(num_training_steps=total_steps)

        # Внешний прогресс-бар
        pbar = tqdm(total=total_steps, desc="Training Progress", unit="step")
        cb = PbarConsoleLogger(pbar)
        self.trainer.add_callback(cb)
        self.progress_callback = cb

        # Основной цикл обучения по эпохам/чанкам документов
        steps_done = 0
        for ep in range(epochs):
            rng = np.random.default_rng(seed + ep)
            order = np.arange(n_docs)
            rng.shuffle(order)

            for slc in chunk_slices(n_docs, chunk_docs):
                idx = order[slc]
                toks_chunk = [train_docs_tokens[i] for i in idx]
                labs_chunk = [train_docs_labels[i] for i in idx]

                # Готовим датасет последовательностей для этого чанка документов
                ds_chunk = self._tokenize_and_align_chunk(toks_chunk, labs_chunk, max_length, stride)
                self.trainer.train_dataset = ds_chunk

                # Шаги на текущем чанке
                n_samples = len(ds_chunk)  # число последовательностей после sliding window
                chunk_steps = steps_for_size(n_samples, per_device_train_batch_size, gradient_accumulation_steps)
                if chunk_steps == 0:
                    del ds_chunk
                    continue

                # Дообучаем до steps_done + chunk_steps
                self.trainer.args.max_steps = steps_done + chunk_steps
                self.trainer.train()
                steps_done += chunk_steps

                # Очистка памяти
                del ds_chunk
                gc.collect()
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()

        pbar.close()
        return self

    def _predict_single_document(self, tokens: List[str], stride: int) -> List[str]:
        """
        Предсказывает метки на уровне слов для одного документа с помощью sliding window.

        :param tokens: Список токенов (слов) документа.
        :param stride: Перекрытие при нарезке.
        :return: Список предсказанных тегов (строки) длиной = числу слов.
        """
        max_length = int(getattr(self.model.config, "max_position_embeddings", 512))

        tokenized_inputs = self.tokenizer(
            [tokens],
            is_split_into_words=True,
            return_overflowing_tokens=True,
            max_length=max_length,
            stride=stride,
            truncation=True,
        )
        tokenized_inputs.pop("overflow_to_sample_mapping", None)
        chunk_dataset = Dataset.from_dict(tokenized_inputs)

        outputs = self.trainer.predict(chunk_dataset)
        predictions = np.argmax(outputs.predictions, axis=2)

        num_original_words = len(tokens)
        final_predictions = np.full(num_original_words, -1, dtype=np.int32)

        for i, chunk_preds in enumerate(predictions):
            chunk_word_ids = tokenized_inputs.word_ids(batch_index=i)
            for token_pos, word_id in enumerate(chunk_word_ids):
                if word_id is not None and final_predictions[word_id] == -1:
                    final_predictions[word_id] = int(chunk_preds[token_pos])

        return [self.id2label.get(pid, 'O') for pid in final_predictions]

    def predict(self, df: pd.DataFrame, stride: int = 128) -> List[List[str]]:
        """
        Делает предсказания на уровне слов для набора документов (через sliding window).

        :param df: Датафрейм с колонкой токенов (список слов).
        :param stride: Перекрытие при нарезке.
        :return: Список предсказанных последовательностей тегов по документам.
        """
        all_final_labels = []
        for _, row in tqdm(df.iterrows(), total=len(df), desc="Предсказание (sliding window)"):
            original_tokens = row[self.tokens_column_name]
            if not original_tokens:
                all_final_labels.append([])
                continue
            document_labels = self._predict_single_document(original_tokens, stride)
            all_final_labels.append(document_labels)
        return all_final_labels

    def _get_embeddings_single_document(self, tokens: List[str], stride: int, device: torch.device) -> np.ndarray:
        """
        Возвращает усредненные эмбеддинги токенов на уровне слов (после объединения частей
        от sliding window) для одного документа.

        :param tokens: Список токенов (слов).
        :param stride: Перекрытие при нарезке.
        :param device: Целевой девайс модели.
        :return: Массив [num_words, hidden_size].
        """
        max_length = int(getattr(self.model.config, "max_position_embeddings", 512))
        num_original_words = len(tokens)

        chunk_inputs = self.tokenizer(
            [tokens],
            is_split_into_words=True,
            return_overflowing_tokens=True,
            max_length=max_length,
            stride=stride,
            truncation=True,
            return_tensors="pt"
        ).to(device)

        chunk_inputs.pop("overflow_to_sample_mapping")

        with torch.no_grad():
            base_model = getattr(self.trainer.model, self.trainer.model.base_model_prefix)
            outputs = base_model(**chunk_inputs)

        chunk_embeddings = outputs.last_hidden_state  # [num_chunks, seq_len, hidden]

        hidden_size = int(self.model.config.hidden_size)
        final_word_embeddings = torch.zeros(num_original_words, hidden_size, device=device)
        word_counts = torch.zeros(num_original_words, device=device)

        for i in range(len(chunk_embeddings)):
            chunk_embeds = chunk_embeddings[i]
            chunk_word_ids = chunk_inputs.word_ids(batch_index=i)

            for token_pos, word_id in enumerate(chunk_word_ids):
                if word_id is not None:
                    final_word_embeddings[word_id] += chunk_embeds[token_pos]
                    # считаем количество фрагментов для усреднения по слову
                    if token_pos == 0 or chunk_word_ids[token_pos - 1] != word_id:
                        word_counts[word_id] += 1

        average_embeddings = final_word_embeddings / (word_counts.unsqueeze(1) + 1e-8)
        return average_embeddings.detach().cpu().numpy()

    def get_embeddings(self, df: pd.DataFrame, stride: int = 128) -> List[np.ndarray]:
        """
        Генерирует эмбеддинги на уровне слов для набора документов (через sliding window).

        :param df: Датафрейм с колонкой токенов (список слов).
        :param stride: Перекрытие при нарезке.
        :return: Список массивов [num_words, hidden_size] для каждого документа.
        :raises RuntimeError: Если модель не обучена.
        """
        if self.trainer is None or self.trainer.model is None:
            raise RuntimeError("Модель не обучена. Вызовите .fit().")

        self.trainer.model.eval()
        device = self.trainer.model.device
        all_final_embeddings = []

        for _, row in tqdm(df.iterrows(), total=len(df), desc="Генерация эмбеддингов (sliding window)"):
            original_tokens = row[self.tokens_column_name]
            if not original_tokens:
                all_final_embeddings.append(np.zeros((0, int(self.model.config.hidden_size)), dtype=np.float32))
                continue

            document_embeddings = self._get_embeddings_single_document(original_tokens, stride, device)
            all_final_embeddings.append(document_embeddings)

        return all_final_embeddings


Пример использования.

In [None]:
train_data = pd.DataFrame({
    'tokens': [
        ['Федор', 'Достоевский', 'родился', 'в', 'Москве', '.'],
        ['Анна', 'Керн', 'была', 'музой', 'Пушкина', '.'],
        ['Компания', 'Яндекс', 'представила', 'Алису', '.'],
        ['Илон', 'Маск', 'основал', 'SpaceX', 'и', 'Tesla', '.']
    ],
    'ner_tags': [
        ['B-PER', 'I-PER', 'O', 'O', 'B-LOC', 'O'],
        ['B-PER', 'I-PER', 'O', 'O', 'B-PER', 'O'],
        ['O', 'B-ORG', 'O', 'B-PER', 'O'],
        ['B-PER', 'I-PER', 'O', 'B-ORG', 'O', 'B-ORG', 'O']
    ]
})

# Для предсказания нам нужны только токены
submission_data = pd.DataFrame({
    'tokens': [
        ['Лев', 'Толстой', 'написал', 'роман', '"', 'Война', 'и', 'мир', '"', '.'],
        ['Сергей', 'Королев', 'работал', 'в', 'РКК', '"', 'Энергия', '"', '.']
    ]
})

train_data = pd.concat([train_data] * 15, axis=0)
submission_data = pd.concat([submission_data] * 15, axis=0)

# 2. Создание и обучение модели
# Создаем маппинг из тегов в ID
tags = train_data['ner_tags'].explode().unique()
label2id = {tag: i for i, tag in enumerate(tags)}

model = TokenClassification(
    checkpoint='DeepPavlov/rubert-base-cased',
    label2id=label2id,
    tokens_column_name='tokens',
    tags_column_name='ner_tags'
)

model.fit(
    train_data,
    epochs=3,
    test_size=0.25,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=1,
    learning_rate=2e-5,
    fp16=False,
    logging_steps=10,
    eval_steps=10
)

# 3. Прогнозирование и получение эмбеддингов
labels = model.predict(submission_datap[:5])
embeddings = model.get_embeddings(submission_data[:5])

print(labels)
print(embeddings)

# Дообучение классификатора, который работает с данными разной модальностью.

Реализация классификатора.

In [None]:
!pip install -q wav2clip torchaudio evaluate pillow

import gc
import math
from typing import List, Dict, Any, Optional, Union

import numpy as np
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import evaluate
from transformers import Trainer, TrainingArguments, TrainerCallback, PrinterCallback
from transformers.modeling_outputs import SequenceClassifierOutput
from tqdm.auto import tqdm


def set_seed(seed: int = 42):
    """
    Фиксирует зерно для воспроизводимости.

    :param seed: Целое число для инициализации генераторов случайных чисел.
    """
    import random
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)


def to_pil(x: Union[str, np.ndarray, Image.Image]) -> Image.Image:
    """
    Приводит вход к PIL.Image в формате RGB.

    :param x: Путь к изображению, np.ndarray (H,W[,C]) или PIL.Image.
    :return: PIL.Image (RGB).
    :raises ValueError: Если тип входа не поддерживается.
    """
    if isinstance(x, Image.Image):
        return x.convert("RGB")
    if isinstance(x, str):
        return Image.open(x).convert("RGB")
    if isinstance(x, np.ndarray):
        return Image.fromarray(x).convert("RGB")
    raise ValueError("Ожидается путь/np.ndarray/PIL.Image")


def load_audio(path: str, target_sr: int) -> np.ndarray:
    """
    Загружает аудиофайл и при необходимости ресемплирует до target_sr.

    :param path: Путь к файлу (wav/flac и т.п.).
    :param target_sr: Целевая частота дискретизации.
    :return: Одноканальный сигнал формы [T] float32.
    :raises RuntimeError: Если torchaudio не установлен.
    """
    try:
        import torchaudio
    except Exception as e:
        raise RuntimeError("Требуется torchaudio: pip install torchaudio") from e
    waveform, sr = torchaudio.load(path)  # [C, T]
    if waveform.size(0) > 1:
        waveform = waveform.mean(dim=0, keepdim=True)
    if sr != target_sr:
        waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=target_sr)
    return waveform.squeeze(0).numpy().astype(np.float32)


class MultiComboDataset(Dataset):
    """
    PyTorch Dataset для различных комбинаций модальностей (text/image/audio).
    Поддерживает несколько изображений/аудио per-сэмпл за счёт того, что ячейки могут быть списками.

    Важно: сам датасет хранит лишь ссылки/пути/строки. Реальная загрузка PIL/аудио происходит в collate бэкенда
    (лениво и батчево), что позволяет работать с большими данными.

    :param df: Источник данных (DataFrame).
    :param target_col: Имя колонки с таргетом (классовой меткой).
    :param label2id: Словарь {значение_метки -> id}. Значения меток в df[target_col] должны встречаться в ключах.
    :param text_columns: Текстовые колонки; их значения склеиваются в одну строку.
    :param image_columns: Колонки с изображениями (значение — путь/PIL/numpy или список таковых).
    :param audio_columns: Колонки с аудио (значение — путь/массив или список таковых).
    """
    def __init__(
        self,
        df: pd.DataFrame,
        target_col: str,
        label2id: Dict[Any, int],
        text_columns: Optional[List[str]] = None,
        image_columns: Optional[List[str]] = None,
        audio_columns: Optional[List[str]] = None
    ):
        self.df = df.reset_index(drop=True)
        self.target_col = target_col
        self.label2id = label2id
        self.text_columns = text_columns or []
        self.image_columns = image_columns or []
        self.audio_columns = audio_columns or []
        self.sep = " [SEP] "

    def __len__(self) -> int:
        """
        :return: Количество элементов.
        """
        return len(self.df)

    def __getitem__(self, idx: int) -> Dict[str, Any]:
        """
        Возвращает элемент датасета.

        :param idx: Индекс строки.
        :return: Словарь с ключами:
                 - 'labels' (int id)
                 - 'text' (str), если есть текстовые колонки
                 - 'images' (list), если есть колонки картинок
                 - 'audios' (list), если есть колонки аудио
        """
        row = self.df.iloc[idx]
        item = {"labels": int(self.label2id[row[self.target_col]]) if self.target_col in row else 0}

        if self.text_columns:
            item["text"] = self.sep.join([str(row[c]) if pd.notnull(row[c]) else "" for c in self.text_columns])

        def _as_list(v):
            if v is None or (isinstance(v, float) and np.isnan(v)):
                return []
            if isinstance(v, (list, tuple)):
                return list(v)
            return [v]

        if self.image_columns:
            imgs = []
            for c in self.image_columns:
                if c in row:
                    imgs.extend(_as_list(row[c]))
            item["images"] = imgs

        if self.audio_columns:
            auds = []
            for c in self.audio_columns:
                if c in row:
                    auds.extend(_as_list(row[c]))
            item["audios"] = auds

        return item


class BaseBackend(nn.Module):
    """
    Базовый класс мультимодального бэкенда.

    Атрибуты:
      - name: Название бэкенда (str).
      - supported: Набор поддерживаемых модальностей, например {'text','image'}.
      - embed_dim: Базовая размерность эмбеддингов (int).
      - out_dim_per_modality: Реальные выходные размерности (dict modality->int), учитывая агрегацию (concat/mean).
    """
    name: str = "base"
    supported: set = set()
    embed_dim: int = 0
    out_dim_per_modality: Dict[str, int] = {}

    def collate(self, batch: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Собирает батч для Trainer из списка элементов Dataset.

        :param batch: Список элементов (из MultiComboDataset.__getitem__).
        :return: Словарь с 'labels' (LongTensor) и 'backend_inputs' (dict).
        """
        raise NotImplementedError

    def encode(self, backend_inputs: Dict[str, Any], device: torch.device) -> Dict[str, torch.Tensor]:
        """
        Кодирует модальности и возвращает L2-нормированные эмбеддинги.

        :param backend_inputs: Подготовленные входы (collate).
        :param device: Девайс для инференса.
        :return: Словарь {'text':[B,*], 'image':[B,*], 'audio':[B,*]} по доступным модальностям.
        """
        raise NotImplementedError

    def freeze_all(self):
        """
        Замораживает параметры бэкенда (requires_grad=False), полезно для linear probing.
        """
        for p in self.parameters():
            p.requires_grad = False

    def get_out_dim(self, modality: str) -> int:
        """
        Возвращает выходную размерность эмбеддинга по модальности с учётом агрегации.

        :param modality: 'text' | 'image' | 'audio'.
        :return: Размерность вектора.
        """
        return self.out_dim_per_modality.get(modality, self.embed_dim)


class ClipBackend(BaseBackend):
    """
    Бэкенд CLIP (HF) для модальностей: text + image.
    Поддерживает несколько изображений per-сэмпл с агрегацией (concat или mean).

    :param checkpoint: Модель CLIP на HF (например, 'openai/clip-vit-base-patch32').
    :param max_length: Максимальная длина текстовых токенов.
    :param freeze: Заморозить ли веса CLIP.
    :param max_images: Максимум картинок на сэмпл при concat-паде.
    :param image_agg: 'concat' или 'mean' — как агрегировать несколько изображений.
    """
    name = "clip"
    supported = {"text", "image"}

    def __init__(
        self,
        checkpoint: str = "openai/clip-vit-base-patch32",
        max_length: int = 77,
        freeze: bool = True,
        max_images: int = 1,
        image_agg: str = "concat"
    ):
        super().__init__()
        from transformers import CLIPModel, CLIPProcessor
        self.model = CLIPModel.from_pretrained(checkpoint)
        self.processor = CLIPProcessor.from_pretrained(checkpoint)
        self.embed_dim = int(self.model.config.projection_dim)
        self.max_length = max_length
        self.max_images = int(max_images)
        self.image_agg = image_agg
        if freeze:
            self.freeze_all()
        img_out = self.embed_dim * self.max_images if self.image_agg == "concat" else self.embed_dim
        self.out_dim_per_modality = {"text": self.embed_dim, "image": img_out}

    def collate(self, batch: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Collate для CLIP: ленивая загрузка изображений, подготовка токенов текста.

        :param batch: Список элементов датасета.
        :return: {'labels': LongTensor[B], 'backend_inputs': {...}}
        """
        labels = torch.tensor([b.get("labels", 0) for b in batch], dtype=torch.long)
        texts = [b.get("text", "") for b in batch]

        images_lists = [b.get("images", []) for b in batch]
        flat_images, counts = [], []
        for lst in images_lists:
            lst = lst if isinstance(lst, list) else ([] if lst is None else [lst])
            counts.append(len(lst))
            for img in lst:
                flat_images.append(to_pil(img))

        text_inputs = self.processor(
            text=texts, padding=True, truncation=True,
            max_length=self.max_length, return_tensors="pt"
        )
        text_inputs = {k: v for k, v in text_inputs.items()}

        if len(flat_images):
            img_proc = self.processor(images=flat_images, return_tensors="pt")
            image_inputs = {"pixel_values": img_proc["pixel_values"]}
        else:
            image_inputs = {"pixel_values": None}

        backend_inputs = {
            "text_inputs": text_inputs,
            "image_inputs": image_inputs,
            "image_counts": torch.tensor(counts, dtype=torch.long)
        }
        return {"labels": labels, "backend_inputs": backend_inputs}

    def _concat_padded(self, embs: torch.Tensor, counts: List[int], max_k: int) -> torch.Tensor:
        """
        Конкатенация до max_k эмбеддингов на сэмпл с паддингом нулями.

        :param embs: Плоские эмбеддинги изображений [M, D], где M = сумма counts.
        :param counts: Количество изображений на сэмпл (длина B).
        :param max_k: Максимум картинок на сэмпл.
        :return: [B, D*max_k], L2-нормированный.
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D * max_k), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                chunk = embs[offset:offset + c]
                take = chunk[:max_k]
                if take.size(0) < max_k:
                    pad = torch.zeros((max_k - take.size(0), D), device=device, dtype=embs.dtype)
                    take = torch.cat([take, pad], dim=0)
                out[i] = take.reshape(-1)
            offset += c
        return F.normalize(out, dim=-1)

    def _mean_pool(self, embs: torch.Tensor, counts: List[int]) -> torch.Tensor:
        """
        Усреднение эмбеддингов изображений по сэмплу.

        :param embs: Плоские эмбеддинги [M, D].
        :param counts: Количество изображений на сэмпл (длина B).
        :return: [B, D], L2-нормированный.
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                out[i] = embs[offset:offset + c].mean(dim=0)
            offset += c
        return F.normalize(out, dim=-1)

    def encode(self, backend_inputs: Dict[str, Any], device: torch.device) -> Dict[str, torch.Tensor]:
        """
        Кодирует текст/изображения через CLIP и агрегирует изображения.

        :param backend_inputs: Выход collate.
        :param device: Девайс.
        :return: {'text':[B,D], 'image':[B, D*max_images] или [B,D]}.
        """
        ti = {k: v.to(device) for k, v in backend_inputs["text_inputs"].items()}
        text_z = self.model.get_text_features(input_ids=ti["input_ids"], attention_mask=ti["attention_mask"])
        text_z = F.normalize(text_z, dim=-1)

        counts = backend_inputs["image_counts"].tolist()
        pi = backend_inputs["image_inputs"]["pixel_values"]
        if pi is not None:
            pi = pi.to(device)
            img_flat = self.model.get_image_features(pixel_values=pi)
            img_flat = F.normalize(img_flat, dim=-1)
            if self.image_agg == "concat":
                img_z = self._concat_padded(img_flat, counts, self.max_images)
            else:
                img_z = self._mean_pool(img_flat, counts)
        else:
            if self.image_agg == "concat":
                img_z = torch.zeros((len(counts), self.embed_dim * self.max_images), device=device)
            else:
                img_z = torch.zeros((len(counts), self.embed_dim), device=device)

        return {"text": text_z, "image": img_z}


class ClapBackend(BaseBackend):
    """
    Бэкенд CLAP (HF) для модальностей: text + audio.
    Поддерживает несколько аудио per-сэмпл (concat/mean).

    :param checkpoint: Модель CLAP (например, 'laion/clap-htsat-unfused').
    :param freeze: Заморозить ли веса CLAP.
    :param max_audios: Максимум аудио на сэмпл при concat-паде.
    :param audio_agg: 'concat' или 'mean' — как агрегировать несколько аудио.
    """
    name = "clap"
    supported = {"text", "audio"}

    def __init__(
        self,
        checkpoint: str = "laion/clap-htsat-unfused",
        freeze: bool = True,
        max_audios: int = 1,
        audio_agg: str = "concat"
    ):
        super().__init__()
        from transformers import ClapModel, ClapProcessor
        self.model = ClapModel.from_pretrained(checkpoint)
        self.processor = ClapProcessor.from_pretrained(checkpoint)
        self.embed_dim = int(getattr(self.model.config, "projection_dim", 512))
        sr = getattr(self.processor, "sampling_rate", None)
        if sr is None:
            fe = getattr(self.processor, "feature_extractor", None)
            sr = getattr(fe, "sampling_rate", 48000) if fe is not None else 48000
        self.sr = int(sr)
        self.max_audios = int(max_audios)
        self.audio_agg = audio_agg
        if freeze:
            self.freeze_all()
        aud_out = self.embed_dim * self.max_audios if self.audio_agg == "concat" else self.embed_dim
        self.out_dim_per_modality = {"text": self.embed_dim, "audio": aud_out}

    def collate(self, batch: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Collate для CLAP: ленивая загрузка и препроцессинг аудио, токены текста.

        :param batch: Список элементов датасета.
        :return: {'labels': LongTensor[B], 'backend_inputs': {...}}
        """
        labels = torch.tensor([b.get("labels", 0) for b in batch], dtype=torch.long)
        texts = [b.get("text", "") for b in batch]

        audios_lists = [b.get("audios", []) for b in batch]
        flat_audios, counts = [], []
        for lst in audios_lists:
            lst = lst if isinstance(lst, list) else ([] if lst is None else [lst])
            counts.append(len(lst))
            for a in lst:
                if isinstance(a, str):
                    flat_audios.append(load_audio(a, self.sr))
                elif isinstance(a, np.ndarray):
                    flat_audios.append(a.astype(np.float32))
                else:
                    raise ValueError("CLAP ожидает путь к аудио или numpy.ndarray")

        text_inputs = self.processor(text=texts, padding=True, truncation=True, return_tensors="pt")
        text_inputs = {k: v for k, v in text_inputs.items()}

        if len(flat_audios):
            aud_proc = self.processor(audios=flat_audios, sampling_rate=self.sr, padding=True, return_tensors="pt")
            audio_inputs = {"input_features": aud_proc["input_features"]}
        else:
            audio_inputs = {"input_features": None}

        backend_inputs = {
            "text_inputs": text_inputs,
            "audio_inputs": audio_inputs,
            "audio_counts": torch.tensor(counts, dtype=torch.long)
        }
        return {"labels": labels, "backend_inputs": backend_inputs}

    def _concat_padded(self, embs: torch.Tensor, counts: List[int], max_k: int) -> torch.Tensor:
        """
        Конкатенация аудио-эмбеддингов (до max_k) с нулевым паддингом.

        :param embs: Плоские эмбеддинги аудио [M, D].
        :param counts: Кол-во аудио на сэмпл (B).
        :param max_k: Максимум аудио на сэмпл.
        :return: [B, D*max_k], L2-нормированный.
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D * max_k), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                chunk = embs[offset:offset + c]
                take = chunk[:max_k]
                if take.size(0) < max_k:
                    pad = torch.zeros((max_k - take.size(0), D), device=device, dtype=embs.dtype)
                    take = torch.cat([take, pad], dim=0)
                out[i] = take.reshape(-1)
            offset += c
        return F.normalize(out, dim=-1)

    def _mean_pool(self, embs: torch.Tensor, counts: List[int]) -> torch.Tensor:
        """
        Усреднение аудио-эмбеддингов по сэмплу.

        :param embs: Плоские эмбеддинги [M, D].
        :param counts: Кол-во аудио на сэмпл (B).
        :return: [B, D], L2-нормированный.
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                out[i] = embs[offset:offset + c].mean(dim=0)
            offset += c
        return F.normalize(out, dim=-1)

    def encode(self, backend_inputs: Dict[str, Any], device: torch.device) -> Dict[str, torch.Tensor]:
        """
        Кодирует текст/аудио через CLAP и агрегирует аудио.

        :param backend_inputs: Выход collate.
        :param device: Девайс.
        :return: {'text':[B,D], 'audio':[B, D*max_audios] или [B,D]}.
        """
        ti = {k: v.to(device) for k, v in backend_inputs["text_inputs"].items()}
        text_z = self.model.get_text_features(input_ids=ti["input_ids"], attention_mask=ti["attention_mask"])
        text_z = F.normalize(text_z, dim=-1)

        counts = backend_inputs["audio_counts"].tolist()
        af = backend_inputs["audio_inputs"]["input_features"]
        if af is not None:
            af = af.to(device)
            aud_flat = self.model.get_audio_features(input_features=af)
            aud_flat = F.normalize(aud_flat, dim=-1)
            if self.audio_agg == "concat":
                aud_z = self._concat_padded(aud_flat, counts, self.max_audios)
            else:
                aud_z = self._mean_pool(aud_flat, counts)
        else:
            if self.audio_agg == "concat":
                aud_z = torch.zeros((len(counts), self.embed_dim * self.max_audios), device=device)
            else:
                aud_z = torch.zeros((len(counts), self.embed_dim), device=device)

        return {"text": text_z, "audio": aud_z}


class ClipWav2CLIPBackend(BaseBackend):
    """
    Бэкенд: CLIP (text+image) + Wav2CLIP (audio->CLIP пространство). Модальности: ['text','image','audio'].
    Поддержка мульти‑изображений/аудио через concat/mean агрегацию.

    :param checkpoint: Чекпоинт CLIP (HF).
    :param max_length: Максимальная длина токенов текста.
    :param freeze: Замораживать ли веса CLIP.
    :param audio_sr: Частота дискретизации для аудио в wav2clip.
    :param max_images: Максимум изображений на сэмпл (concat).
    :param max_audios: Максимум аудио на сэмпл (concat).
    :param image_agg: 'concat' или 'mean' — агрегация изображений.
    :param audio_agg: 'concat' или 'mean' — агрегация аудио.
    """
    name = "clip_wav2clip"
    supported = {"text", "image", "audio"}

    def __init__(
        self,
        checkpoint: str = "openai/clip-vit-base-patch32",
        max_length: int = 77,
        freeze: bool = True,
        audio_sr: int = 16000,
        max_images: int = 1,
        max_audios: int = 1,
        image_agg: str = "concat",
        audio_agg: str = "concat"
    ):
        super().__init__()
        from transformers import CLIPModel, CLIPProcessor
        self.model = CLIPModel.from_pretrained(checkpoint)
        self.processor = CLIPProcessor.from_pretrained(checkpoint)
        self.embed_dim = int(self.model.config.projection_dim)
        self.max_length = max_length
        self.audio_sr = int(audio_sr)
        self.max_images = int(max_images)
        self.max_audios = int(max_audios)
        self.image_agg = image_agg
        self.audio_agg = audio_agg
        if freeze:
            self.freeze_all()
        img_out = self.embed_dim * self.max_images if self.image_agg == "concat" else self.embed_dim
        aud_out = self.embed_dim * self.max_audios if self.audio_agg == "concat" else self.embed_dim
        self.out_dim_per_modality = {"text": self.embed_dim, "image": img_out, "audio": aud_out}
        # Lazy инициализация wav2clip
        self._w2c_model = None
        self._w2c_mod = None
        self._w2c_api = None
        self._w2c_device = None

    def _ensure_w2c(self, device: torch.device):
        """
        Готовит wav2clip к использованию с учётом разных API вариантов (load_model/get_model/Wav2CLIP).

        :param device: Девайс выполнения.
        :raises RuntimeError: Если wav2clip не установлен или не удалось загрузить модель.
        """
        if self._w2c_model is not None and self._w2c_device == str(device):
            return
        import importlib
        try:
            w2c = importlib.import_module("wav2clip")
        except Exception as e:
            raise RuntimeError("Не найден пакет 'wav2clip'. Установите: pip install wav2clip") from e
        dev_str = str(device) if device.type == "cuda" else "cpu"
        if hasattr(w2c, "load_model"):
            model = w2c.load_model(device=dev_str); api_kind = "func"
        elif hasattr(w2c, "get_model"):
            model = w2c.get_model(device=dev_str); api_kind = "func"
        elif hasattr(w2c, "Wav2CLIP"):
            try:
                model = w2c.Wav2CLIP(dev_str)
            except TypeError:
                model = w2c.Wav2CLIP(device=dev_str)
            api_kind = "method"
        else:
            raise RuntimeError("wav2clip установлен, но нет способов загрузки (load_model/get_model/Wav2CLIP)")
        self._w2c_mod = w2c
        self._w2c_model = model
        self._w2c_api = api_kind
        self._w2c_device = str(device)

    def collate(self, batch: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Collate: готовит токены текста/пиксели изображения, пакует аудио в padded-матрицу.

        :param batch: Список элементов датасета.
        :return: {'labels': LongTensor[B], 'backend_inputs': {...}}
        """
        labels = torch.tensor([b.get("labels", 0) for b in batch], dtype=torch.long)
        texts = [b.get("text", "") for b in batch]

        # images (flatten + counts)
        images_lists = [b.get("images", []) for b in batch]
        flat_images, img_counts = [], []
        for lst in images_lists:
            lst = lst if isinstance(lst, list) else ([] if lst is None else [lst])
            img_counts.append(len(lst))
            for img in lst:
                flat_images.append(to_pil(img))
        text_inputs = self.processor(text=texts, padding=True, truncation=True, max_length=self.max_length, return_tensors="pt")
        text_inputs = {k: v for k, v in text_inputs.items()}
        if len(flat_images):
            img_proc = self.processor(images=flat_images, return_tensors="pt")
            image_inputs = {"pixel_values": img_proc["pixel_values"]}
        else:
            image_inputs = {"pixel_values": None}

        # audios (flatten + counts)
        audios_lists = [b.get("audios", []) for b in batch]
        flat_audios, aud_counts = [], []
        for lst in audios_lists:
            lst = lst if isinstance(lst, list) else ([] if lst is None else [lst])
            aud_counts.append(len(lst))
            for a in lst:
                if isinstance(a, str):
                    flat_audios.append(load_audio(a, self.audio_sr))
                elif isinstance(a, np.ndarray):
                    flat_audios.append(a.astype(np.float32))
                else:
                    raise ValueError("Ожидается путь к аудио или numpy.ndarray")

        if len(flat_audios):
            Lmax = max(len(a) for a in flat_audios)
            wav = np.zeros((len(flat_audios), Lmax), dtype=np.float32)
            lens = np.zeros((len(flat_audios),), dtype=np.int64)
            for i, a in enumerate(flat_audios):
                L = len(a)
                wav[i, :L] = a
                lens[i] = L
            audio_inputs = {
                "waveforms": torch.from_numpy(wav),   # [M, Lmax]
                "lengths": torch.from_numpy(lens)     # [M]
            }
        else:
            audio_inputs = {"waveforms": None, "lengths": None}

        backend_inputs = {
            "text_inputs": text_inputs,
            "image_inputs": image_inputs,
            "image_counts": torch.tensor(img_counts, dtype=torch.long),
            "audio_inputs": audio_inputs,
            "audio_counts": torch.tensor(aud_counts, dtype=torch.long),
        }
        return {"labels": labels, "backend_inputs": backend_inputs}

    def _agg_concat(self, embs: torch.Tensor, counts: List[int], max_k: int) -> torch.Tensor:
        """
        Конкатенация эмбеддингов (до max_k) с нулевым паддингом.

        :param embs: Плоские эмбеддинги [M, D].
        :param counts: Кол-во элементов на сэмпл (B).
        :param max_k: Максимум элементов на сэмпл.
        :return: [B, D*max_k], L2-нормированный.
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D * max_k), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                chunk = embs[offset:offset+c]
                take = chunk[:max_k]
                if take.size(0) < max_k:
                    pad = torch.zeros((max_k - take.size(0), D), device=device, dtype=embs.dtype)
                    take = torch.cat([take, pad], dim=0)
                out[i] = take.reshape(-1)
            offset += c
        return F.normalize(out, dim=-1)

    def _agg_mean(self, embs: torch.Tensor, counts: List[int]) -> torch.Tensor:
        """
        Усреднение эмбеддингов по сэмплу.

        :param embs: Плоские эмбеддинги [M, D].
        :param counts: Кол-во элементов на сэмпл (B).
        :return: [B, D], L2-нормированный.
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                out[i] = embs[offset:offset+c].mean(dim=0)
            offset += c
        return F.normalize(out, dim=-1)

    def encode(self, backend_inputs: Dict[str, Any], device: torch.device) -> Dict[str, torch.Tensor]:
        """
        Кодирует текст/изображение через CLIP и аудио через wav2clip, далее агрегирует.

        :param backend_inputs: Выход collate.
        :param device: Девайс.
        :return: {'text':[B,*], 'image':[B,*], 'audio':[B,*]}.
        """
        # text
        ti = {k: v.to(device) for k, v in backend_inputs["text_inputs"].items()}
        text_z = self.model.get_text_features(input_ids=ti["input_ids"], attention_mask=ti["attention_mask"])
        text_z = F.normalize(text_z, dim=-1)

        # image
        img_counts = backend_inputs["image_counts"].tolist()
        pi = backend_inputs["image_inputs"]["pixel_values"]
        if pi is not None:
            pi = pi.to(device)
            img_flat = self.model.get_image_features(pixel_values=pi)
            img_flat = F.normalize(img_flat, dim=-1)
            if self.image_agg == "concat":
                image_z = self._agg_concat(img_flat, img_counts, self.max_images)
            else:
                image_z = self._agg_mean(img_flat, img_counts)
        else:
            if self.image_agg == "concat":
                image_z = torch.zeros((len(img_counts), self.embed_dim * self.max_images), device=device)
            else:
                image_z = torch.zeros((len(img_counts), self.embed_dim), device=device)

        # audio via wav2clip
        aud_counts = backend_inputs["audio_counts"].tolist()
        wav = backend_inputs["audio_inputs"]["waveforms"]
        lens = backend_inputs["audio_inputs"]["lengths"]
        if wav is not None and lens is not None and (lens.numel() if torch.is_tensor(lens) else len(lens)) > 0:
            self._ensure_w2c(device)
            w2c = self._w2c_mod
            waves = wav  # [M, Lmax] (CPU тензор — ок)
            lens_np = lens.cpu().numpy()
            embs = []
            for i in range(waves.size(0)):
                L = int(lens_np[i])
                a_np = waves[i, :L].detach().cpu().numpy()
                e = None
                if self._w2c_api == "func":
                    if hasattr(w2c, "embed_audio"):
                        try:
                            e = w2c.embed_audio(a_np, self._w2c_model)
                        except TypeError:
                            e = w2c.embed_audio(a_np, self.audio_sr, self._w2c_model)
                    elif hasattr(w2c, "get_audio_embedding"):
                        try:
                            e = w2c.get_audio_embedding(a_np, self._w2c_model)
                        except TypeError:
                            e = w2c.get_audio_embedding(a_np, self.audio_sr, self._w2c_model)
                if e is None and self._w2c_api == "method" and hasattr(self._w2c_model, "embed_audio"):
                    try:
                        e = self._w2c_model.embed_audio(a_np)
                    except TypeError:
                        e = self._w2c_model.embed_audio(a_np, sr=self.audio_sr)
                if e is None:
                    raise RuntimeError("Не удалось получить аудио‑эмбеддинг через wav2clip.")
                e = np.asarray(e)
                if e.ndim == 2:
                    e = e.mean(axis=0)
                elif e.ndim > 2:
                    e = e.reshape(-1, e.shape[-1]).mean(axis=0)
                embs.append(e.astype(np.float32))
            aud_flat = torch.tensor(np.stack(embs, axis=0), dtype=torch.float32, device=device)
            aud_flat = F.normalize(aud_flat, dim=-1)
            if self.audio_agg == "concat":
                audio_z = self._agg_concat(aud_flat, aud_counts, self.max_audios)
            else:
                audio_z = self._agg_mean(aud_flat, aud_counts)
        else:
            if self.audio_agg == "concat":
                audio_z = torch.zeros((len(aud_counts), self.embed_dim * self.max_audios), device=device)
            else:
                audio_z = torch.zeros((len(aud_counts), self.embed_dim), device=device)

        return {"text": text_z, "image": image_z, "audio": audio_z}


class SingleBackboneClassifier(nn.Module):
    """
    Классификатор поверх одного мультимодального бэкенда: encode -> fuse -> MLP голова.

    :param backend: Экземпляр бэкенда (CLIP/CLAP/ClipWav2CLIP).
    :param modalities: Активные модальности (учёт порядка важен при concat): подмножество ['image','text','audio'].
    :param num_labels: Количество классов.
    :param fusion: 'concat' (объединение признаков) или 'mean' (среднее по модальностям).
    :param hidden: Размер скрытого слоя головы.
    :param dropout: Дропаут в голове.
    """
    def __init__(
        self,
        backend: BaseBackend,
        modalities: List[str],
        num_labels: int,
        fusion: str = "concat",
        hidden: int = 512,
        dropout: float = 0.1
    ):
        super().__init__()
        self.backend = backend
        self.modalities = modalities
        self.fusion = fusion
        self.num_labels = num_labels

        order = [m for m in ["image", "text", "audio"] if m in self.modalities]
        if fusion == "concat":
            in_dim = sum(self.backend.get_out_dim(m) for m in order)
        elif fusion == "mean":
            dims = [self.backend.get_out_dim(m) for m in order]
            if len(set(dims)) != 1:
                raise ValueError(f'Для fusion="mean" размеры модальностей должны совпадать, а у нас: {dict(zip(order, dims))}')
            in_dim = dims[0]
        else:
            raise ValueError('fusion должен быть "concat" или "mean"')

        self.head = nn.Sequential(
            nn.LayerNorm(in_dim),
            nn.Linear(in_dim, hidden),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, num_labels)
        )

    def _infer_device_from_inputs(self, obj) -> torch.device:
        """
        Находит девайс по первому тензору во входах; иначе выбирает доступный cuda/cpu.

        :param obj: Любая структура с тензорами.
        :return: torch.device.
        """
        if isinstance(obj, torch.Tensor):
            return obj.device
        if isinstance(obj, dict):
            for v in obj.values():
                d = self._infer_device_from_inputs(v)
                if d is not None:
                    return d
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def _fuse(self, z: Dict[str, torch.Tensor]) -> torch.Tensor:
        """
        Объединяет эмбеддинги модальностей согласно self.fusion.

        :param z: Словарь эмбеддингов по модальностям.
        :return: Fused тензор [B, *].
        """
        order = [m for m in ["image", "text", "audio"] if m in self.modalities]
        feats = []
        for m in order:
            t = z[m]
            if t.dim() == 3:
                t = t.mean(dim=1)
            elif t.dim() > 3:
                t = t.view(t.size(0), -1)
            feats.append(t)
        if self.fusion == "concat":
            return torch.cat(feats, dim=-1)
        elif self.fusion == "mean":
            sizes = [f.size(-1) for f in feats]
            if len(set(sizes)) != 1:
                raise ValueError(f'Для fusion="mean" размеры модальностей должны совпадать, а у нас: {sizes}')
            return torch.stack(feats, dim=0).mean(dim=0)

    def forward(self, backend_inputs: Dict[str, Any], labels: Optional[torch.Tensor] = None):
        """
        Прямой проход модели.

        :param backend_inputs: Входы для бэкенда (из его collate).
        :param labels: Игнорируется (loss считает Trainer).
        :return: SequenceClassifierOutput с logits [B, num_labels].
        """
        device = self._infer_device_from_inputs(backend_inputs)
        z = self.backend.encode(backend_inputs, device=device)
        fused = self._fuse(z)
        logits = self.head(fused)
        return SequenceClassifierOutput(logits=logits)

    @torch.no_grad()
    def get_embeddings(self, backend_inputs: Dict[str, Any], return_per_modality: bool = False):
        """
        Извлекает fused эмбеддинги (и опционально по модальностям).

        :param backend_inputs: Входы для бэкенда.
        :param return_per_modality: Вернуть также словарь {'text','image','audio'}.
        :return: fused [B, *] или (fused, per_modality).
        """
        device = self._infer_device_from_inputs(backend_inputs)
        z = self.backend.encode(backend_inputs, device=device)
        fused = self._fuse(z)
        if return_per_modality:
            return fused, z
        return fused


class WeightedCETrainer(Trainer):
    """
    Trainer с CrossEntropyLoss и поддержкой весов классов.
    При отсутствии class_weights может вычислять их по частотам train-меток.

    :param num_labels: Количество классов.
    :param train_labels: Список/массив train-меток (int).
    :param class_weights: Веса классов (list/np.ndarray/torch.Tensor).
    """
    def __init__(self, *args, num_labels=None, train_labels=None, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.num_labels = num_labels
        if class_weights is not None:
            w = torch.as_tensor(class_weights, dtype=torch.float32)
        else:
            w = None
            if train_labels is not None and num_labels is not None:
                train_labels = np.asarray(train_labels).astype(int)
                counts = np.bincount(train_labels, minlength=num_labels)
                n = counts.sum()
                weights = np.zeros(num_labels, dtype=np.float32)
                nonzero = counts > 0
                weights[nonzero] = n / (num_labels * counts[nonzero].astype(np.float32))
                w = torch.tensor(weights, dtype=torch.float32)
        self.class_weights = w
        self._warned_label_tiling = False

    def compute_loss(self, model, inputs, return_outputs: bool = False, num_items_in_batch: Optional[int] = None):
        """
        Считает CrossEntropyLoss с опциональными весами классов. Защищается от случая DataParallel,
        когда logits дублируются по числу GPU.

        :param model: Модель.
        :param inputs: Батч: {'labels': LongTensor[B], 'backend_inputs': {...}}.
        :param return_outputs: Возвращать ли outputs вместе с loss.
        :param num_items_in_batch: Совместимость с Trainer API (не используется).
        :return: loss (и outputs, если return_outputs=True).
        :raises ValueError: Если размеры batch не согласованы.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        labels = labels.to(logits.device)

        if logits.size(0) != labels.size(0):
            ngpu = torch.cuda.device_count()
            if ngpu > 1 and logits.size(0) == labels.size(0) * ngpu:
                labels = labels.repeat_interleave(ngpu)
                if not self._warned_label_tiling:
                    print(f"[Warning] DataParallel удвоил batch для logits. "
                          f"Повторяем labels x{ngpu}. logits: {tuple(logits.shape)}, labels: {tuple(labels.shape)}")
                    self._warned_label_tiling = True
            else:
                raise ValueError(f"Batch size mismatch: logits {tuple(logits.shape)} vs labels {tuple(labels.shape)}")

        weight = self.class_weights.to(logits.device) if self.class_weights is not None else None
        loss = nn.CrossEntropyLoss(weight=weight)(logits, labels.long())
        return (loss, outputs) if return_outputs else loss


class PbarConsoleLogger(TrainerCallback):
    def __init__(self, pbar):
        self.pbar = pbar
        self.last_logs = {}
        self.last_train_loss = None
        self.printed_eval_steps = set()

    def _step(self, state) -> int:
        return int(state.global_step or 0)

    def _fmt_postfix(self):
        parts = []
        if 'loss' in self.last_logs:
            parts.append(f"loss {self.last_logs['loss']:.4f}")
        if 'eval_loss' in self.last_logs:
            parts.append(f"val {self.last_logs['eval_loss']:.4f}")
        for k, v in self.last_logs.items():
            if k.startswith('eval_') and k not in (
                'eval_loss', 'eval_runtime', 'eval_samples_per_second', 'eval_steps_per_second', 'epoch'
            ):
                parts.append(f"{k.replace('eval_', '')} {v:.4f}")
        return " | ".join(parts)

    def on_step_end(self, args, state, control, **kwargs):
        n = min(self._step(state), self.pbar.total)
        if n > self.pbar.n:
            self.pbar.update(n - self.pbar.n)
        if self.last_logs:
            self.pbar.set_postfix_str(self._fmt_postfix(), refresh=False)
        self.pbar.refresh()

    def on_log(self, args, state, control, logs=None, **kwargs):
        if not logs:
            return

        for k, v in logs.items():
            if isinstance(v, (int, float)):
                self.last_logs[k] = float(v)
        if 'loss' in logs and isinstance(logs['loss'], (int, float)):
            self.last_train_loss = float(logs['loss'])

        self.pbar.set_postfix_str(self._fmt_postfix(), refresh=False)
        self.pbar.refresh()

        if any(k.startswith('eval_') for k in logs.keys()):
            step = self._step(state)
            if step in self.printed_eval_steps:
                return
            self.printed_eval_steps.add(step)

            train_loss_str = f"{self.last_train_loss:.10f}" if self.last_train_loss is not None else "n/a"
            val_loss = logs.get('eval_loss', None)
            val_loss_str = f"{float(val_loss):.10g}" if isinstance(val_loss, (int, float)) else "n/a"

            exclude = {'eval_loss', 'eval_runtime', 'eval_samples_per_second', 'eval_steps_per_second', 'epoch'}
            extra_parts = []
            for k, v in logs.items():
                if k.startswith('eval_') and k not in exclude:
                    metric_name = k.replace('eval_', '')
                    extra_parts.append(f"val {metric_name}: {float(v):.10f}")

            line = f"step: {step}, train loss: {train_loss_str}, val loss: {val_loss_str}"
            if extra_parts:
                line += ", " + ", ".join(extra_parts)
            tqdm.write(line)

    def on_train_end(self, args, state, control, **kwargs):
        n = min(self._step(state), self.pbar.total)
        if n > self.pbar.n:
            self.pbar.update(n - self.pbar.n)
        self.pbar.refresh()


class SingleModelMultiComboClassification:
    """
    Пайплайн: одна мультимодальная модель (бэкенд) + классификационная голова + HuggingFace Trainer.

    Поддерживаемые комбинации модальностей:
      - ['text','image']         -> ClipBackend (CLIP, HF)
      - ['text','audio']         -> ClapBackend (CLAP, HF)
      - ['image','audio']        -> ClipWav2CLIPBackend
      - ['text','image','audio'] -> ClipWav2CLIPBackend

    Возможности:
      - Мульти-изображения/аудио per-сэмпл (concat/mean агрегация).
      - Обучение на больших данных: чанковая подстановка train_dataset, стабильный прогресс‑бар и логи.
      - Взвешенная CrossEntropy по частотам классов (для дисбаланса).

    Необходимые импорты:
    !pip install -q wav2clip torchaudio evaluate pillow
    import gc
    import math
    from typing import List, Dict, Any, Optional, Union
    import numpy as np
    import pandas as pd
    from PIL import Image
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from torch.utils.data import Dataset, DataLoader
    import evaluate
    from transformers import Trainer, TrainingArguments, TrainerCallback, PrinterCallback
    from transformers.modeling_outputs import SequenceClassifierOutput
    from tqdm.auto import tqdm
    """
    def __init__(
        self,
        modalities: List[str],
        num_labels: int,
        target_column_name: str,
        text_columns: Optional[List[str]] = None,
        image_columns: Optional[List[str]] = None,
        audio_columns: Optional[List[str]] = None,
        backend: str = "auto",
        clip_checkpoint: str = "openai/clip-vit-base-patch32",
        clap_checkpoint: str = "laion/clap-htsat-unfused",
        fusion: str = "concat",
        freeze_backbone: bool = True,
        clip_max_length: int = 77,
        max_images_per_sample: int = 1,
        max_audios_per_sample: int = 1
    ):
        """
            :param modalities: Список модальностей ('text','image','audio') в любом порядке.
        :param num_labels: Количество классов.
        :param target_column_name: Имя столбца таргета в DataFrame.
        :param text_columns: Имена текстовых колонок (склеиваются).
        :param image_columns: Имена колонок изображений (значения — пути или списки путей/объектов).
        :param audio_columns: Имена колонок аудио (значения — пути/массивы или списки).
        :param backend: 'auto' | 'clip' | 'clap' | 'clip_wav2clip'.
        :param clip_checkpoint: Чекпоинт CLIP.
        :param clap_checkpoint: Чекпоинт CLAP.
        :param fusion: 'concat' или 'mean' — тип фьюжна эмбеддингов.
        :param freeze_backbone: Заморозить веса бэкенда (linear probing).
        :param clip_max_length: Максимальная длина токенов в CLIP.
        :param max_images_per_sample: Максимум картинок при concat-агрегации.
        :param max_audios_per_sample: Максимум аудио при concat-агрегации.
        """
        self.modalities = sorted(list(set(modalities)))
        self.num_labels = num_labels
        self.target_column_name = target_column_name
        self.text_columns = text_columns or []
        self.image_columns = image_columns or []
        self.audio_columns = audio_columns or []
        self.backend_name = backend
        self.clip_checkpoint = clip_checkpoint
        self.clap_checkpoint = clap_checkpoint
        self.fusion = fusion
        self.freeze_backbone = freeze_backbone
        self.clip_max_length = clip_max_length
        self.max_images_per_sample = int(max_images_per_sample)
        self.max_audios_per_sample = int(max_audios_per_sample)

        self.label2id: Optional[Dict[Any, int]] = None
        self.id2label: Optional[Dict[int, str]] = None

        self.backend: Optional[BaseBackend] = None
        self.model: Optional[SingleBackboneClassifier] = None
        self.trainer: Optional[Trainer] = None
        self.compute_metrics = None
        self.progress_callback: Optional[PbarConsoleLogger] = None

        self._build_backend()

    def _build_backend(self):
        """
        Инициализирует бэкенд согласно backend='auto'|'clip'|'clap'|'clip_wav2clip' и проверяет совместимость модальностей.

        :raises ValueError: При неподдерживаемой комбинации модальностей.
        """
        mods = set(self.modalities)
        name = self.backend_name
        if name == "auto":
            if mods == {"text", "image"}:
                name = "clip"
            elif mods == {"text", "audio"}:
                name = "clap"
            elif mods in ({"image", "audio"}, {"text", "image", "audio"}):
                name = "clip_wav2clip"
            else:
                raise ValueError(f"Неподдерживаемая комбинация модальностей: {mods}")

        if name == "clip":
            self.backend = ClipBackend(
                checkpoint=self.clip_checkpoint,
                max_length=self.clip_max_length,
                freeze=self.freeze_backbone,
                max_images=self.max_images_per_sample,
                image_agg="concat"
            )
        elif name == "clap":
            self.backend = ClapBackend(
                checkpoint=self.clap_checkpoint,
                freeze=self.freeze_backbone,
                max_audios=self.max_audios_per_sample,
                audio_agg="concat"
            )
        elif name == "clip_wav2clip":
            self.backend = ClipWav2CLIPBackend(
                checkpoint=self.clip_checkpoint,
                max_length=self.clip_max_length,
                freeze=self.freeze_backbone,
                audio_sr=16000,
                max_images=self.max_images_per_sample,
                max_audios=self.max_audios_per_sample,
                image_agg="concat",
                audio_agg="concat"
            )
        else:
            raise ValueError(f"Неизвестный backend: {name}")

        if not set(self.modalities).issubset(self.backend.supported):
            raise ValueError(f"Бэкенд {self.backend.name} не поддерживает модальности {self.modalities}. "
                             f"Поддерживает: {self.backend.supported}")

    def _setup_metrics(self, metric_name: str):
        """
        Создаёт функцию подсчёта метрик для Trainer.

        :param metric_name: 'f1' или 'accuracy'.
        :raises ValueError: Если метрика не поддерживается.
        """
        metric_name = metric_name.lower()
        if metric_name == "f1":
            metric = evaluate.load("f1")
            def compute(p):
                preds = p.predictions.argmax(-1)
                return metric.compute(predictions=preds, references=p.label_ids, average="weighted")
        elif metric_name == "accuracy":
            metric = evaluate.load("accuracy")
            def compute(p):
                preds = p.predictions.argmax(-1)
                return metric.compute(predictions=preds, references=p.label_ids)
        else:
            raise ValueError('metric_name должен быть "f1" или "accuracy"')
        self.compute_metrics = compute

    def _split(self, df: pd.DataFrame, test_size: float = 0.2, seed: int = 42):
        """
        Перемешивает и делит DataFrame на train/eval.

        :param df: Полный датафрейм.
        :param test_size: Доля валидации (0..1).
        :param seed: Зерно.
        :return: (df_train, df_eval).
        """
        df = df.sample(frac=1.0, random_state=seed).reset_index(drop=True)
        n_eval = int(math.ceil(len(df) * test_size))
        return df.iloc[n_eval:].reset_index(drop=True), df.iloc[:n_eval].reset_index(drop=True)

    def _validate_data(self, df: pd.DataFrame):
        """
        Проверяет соответствие колонок выбранным модальностям.

        :param df: Источник данных.
        :raises ValueError: При отсутствии необходимых колонок.
        """
        if "text" in self.modalities:
            if not self.text_columns:
                raise ValueError("Вы выбрали модальность 'text', но text_columns пустой.")
            missing = [c for c in self.text_columns if c not in df.columns]
            if missing:
                raise ValueError(f"В DataFrame отсутствуют текстовые колонки: {missing}")

        if "image" in self.modalities:
            if not self.image_columns:
                raise ValueError("Вы выбрали модальность 'image', но image_columns пуст.")
            missing = [c for c in self.image_columns if c not in df.columns]
            if missing:
                raise ValueError(f"В DataFrame отсутствуют колонки изображений: {missing}")

        if "audio" in self.modalities:
            if not self.audio_columns:
                raise ValueError("Вы выбрали модальность 'audio', но audio_columns пуст.")
            missing = [c for c in self.audio_columns if c not in df.columns]
            if missing:
                raise ValueError(f"В DataFrame отсутствуют колонки аудио: {missing}")

    def fit(
        self,
        train_data: pd.DataFrame,
        epochs: int = 3,
        test_size: float = 0.2,
        per_device_train_batch_size: int = 16,
        gradient_accumulation_steps: int = 1,
        learning_rate: float = 2e-4,
        metric_name: str = "f1",
        fp16: bool = True,
        logging_steps: int = 50,
        eval_steps: int = 200,
        output_dir: str = "./result",
        seed: int = 42,
        hidden: int = 512,
        dropout: float = 0.1,
        fit_chunk_size: Optional[int] = None
    ):
        """
        Обучает классификационную голову поверх выбранного бэкенда.
        Поддерживает обучение на больших данных за счёт чанков: train_dataset подставляется кусками.

        :param train_data: Полный датафрейм с данными и таргетом.
        :param epochs: Количество эпох.
        :param test_size: Доля валидации.
        :param per_device_train_batch_size: Размер батча на устройство.
        :param gradient_accumulation_steps: Шаги аккумуляции градиента.
        :param learning_rate: Learning rate для оптимизатора.
        :param metric_name: 'f1' или 'accuracy' — метрика выбора лучшей модели.
        :param fp16: Использовать fp16 при наличии CUDA (если доступен bf16 — он будет использован вместо fp16).
        :param logging_steps: Частота логирования шагов.
        :param eval_steps: Шаги между валидациями/сохранениями.
        :param output_dir: Каталог для артефактов.
        :param seed: Зерно.
        :param hidden: Размер скрытого слоя головы.
        :param dropout: Дропаут в голове.
        :param fit_chunk_size: Размер чанка обучающей выборки. Если None — весь train как один чанк.
        :return: self.
        """
        self._validate_data(train_data)
        set_seed(seed)

        classes = sorted(train_data[self.target_column_name].unique().tolist())
        if self.num_labels != len(classes):
            print(f"Warning: num_labels={self.num_labels} != len(classes)={len(classes)}")
        self.label2id = {c: i for i, c in enumerate(classes)}
        self.id2label = {i: str(c) for c, i in self.label2id.items()}

        df_train, df_eval = self._split(train_data, test_size=test_size, seed=seed)

        # Датасет валидации держим целиком (обычно небольшой).
        ds_eval = MultiComboDataset(
            df_eval, self.target_column_name, self.label2id,
            self.text_columns, self.image_columns, self.audio_columns
        )

        # Веса классов по всему train (не по чанку).
        y_train_all = np.array([self.label2id[y] for y in df_train[self.target_column_name].tolist()], dtype=int)
        counts = np.bincount(y_train_all, minlength=self.num_labels)
        n_all = counts.sum()
        class_weights = np.zeros(self.num_labels, dtype=np.float32)
        nonzero = counts > 0
        class_weights[nonzero] = n_all / (self.num_labels * counts[nonzero].astype(np.float32))

        # Модель и метрики
        self.model = SingleBackboneClassifier(
            backend=self.backend,
            modalities=self.modalities,
            num_labels=self.num_labels,
            fusion=self.fusion,
            hidden=hidden,
            dropout=dropout
        )
        self._setup_metrics(metric_name)

        # Настройки точности
        bf16_ok = bool(torch.cuda.is_available() and torch.cuda.get_device_capability()[0] >= 8)

        args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=epochs,
            per_device_train_batch_size=per_device_train_batch_size,
            per_device_eval_batch_size=max(4, per_device_train_batch_size // 2),
            gradient_accumulation_steps=gradient_accumulation_steps,
            eval_accumulation_steps=max(1, gradient_accumulation_steps * 2),
            learning_rate=learning_rate,
            warmup_ratio=0.1,
            lr_scheduler_type="cosine",
            weight_decay=0.01,
            eval_strategy="steps",
            eval_steps=eval_steps,
            save_strategy="steps",
            save_steps=eval_steps,
            load_best_model_at_end=True,
            metric_for_best_model=f"eval_{metric_name}",
            save_total_limit=1,
            logging_strategy="steps",
            logging_steps=logging_steps,
            report_to="none",
            fp16=bool(fp16 and torch.cuda.is_available() and not bf16_ok),
            bf16=bool(bf16_ok and not fp16),
            dataloader_num_workers=0,
            seed=seed,
            remove_unused_columns=False,
            disable_tqdm=True  # используем свой внешний tqdm
        )

        def data_collator(batch_list: List[Dict[str, Any]]) -> Dict[str, Any]:
            """
            Адаптер collate для Trainer: делегирует бэкенду сборку батча.

            :param batch_list: Список элементов Dataset.
            :return: Батч для model.forward(): {'labels': LongTensor, 'backend_inputs': {...}}.
            """
            return self.backend.collate(batch_list)

        # Вспомогательные функции для чанков
        def steps_for_size(sz: int, bsz: int, accum: int) -> int:
            """
            Оценивает число шагов оптимизации на чанке размера sz.

            :param sz: Количество примеров в чанке.
            :param bsz: Размер батча.
            :param accum: Шаги аккумуляции.
            :return: Число оптимизационных шагов.
            """
            return max(0, math.ceil(math.ceil(sz / max(1, bsz)) / max(1, accum)))

        def chunk_slices(index_array: np.ndarray, chunk_size: int):
            """
            Генератор срезов индексов по chunk_size.

            :param index_array: Индексы обучающей выборки.
            :param chunk_size: Размер чанка.
            :yield: Срез индексов.
            """
            for i in range(0, len(index_array), chunk_size):
                yield index_array[i:i + chunk_size]

        # Индексы train
        n_train = len(df_train)
        rng = np.random.default_rng(seed)
        train_idx = np.arange(n_train)

        # Чанк по умолчанию — весь train
        chunk_size = fit_chunk_size if (fit_chunk_size and fit_chunk_size > 0) else len(train_idx)

        # Предварительный рассчёт общего числа шагов (для прогресс‑бара и планировщика)
        total_steps = 0
        for _ in range(epochs):
            rng.shuffle(train_idx)
            for slc in chunk_slices(train_idx, chunk_size):
                total_steps += steps_for_size(len(slc), per_device_train_batch_size, gradient_accumulation_steps)

        # Инициализация Trainer с «пустым» train датасетом (минимальный чанк), чтобы не держать весь train
        dummy_idx = np.arange(min(len(df_train), 1))
        ds_train_init = MultiComboDataset(
            df_train.iloc[dummy_idx], self.target_column_name, self.label2id,
            self.text_columns, self.image_columns, self.audio_columns
        ) if len(dummy_idx) > 0 else ds_eval

        self.trainer = WeightedCETrainer(
            model=self.model,
            args=args,
            train_dataset=ds_train_init,
            eval_dataset=ds_eval,
            data_collator=data_collator,
            compute_metrics=self.compute_metrics,
            num_labels=self.num_labels,
            train_labels=y_train_all,
            class_weights=class_weights
        )
        self.trainer.remove_callback(PrinterCallback)

        # Планировщик на рассчитанное количество шагов
        if total_steps > 0:
            self.trainer.create_optimizer_and_scheduler(num_training_steps=total_steps)

        # Внешний прогресс‑бар + консольный лог
        pbar = tqdm(total=total_steps, desc="Training Progress", unit="step")
        cb = PbarConsoleLogger(pbar)
        self.trainer.add_callback(cb)
        self.progress_callback = cb

        # Основной цикл обучения по эпохам и чанкам
        steps_done = 0
        for ep in range(epochs):
            rng = np.random.default_rng(seed + ep)
            shuffled = np.arange(n_train)
            rng.shuffle(shuffled)

            for slc in chunk_slices(shuffled, chunk_size):
                # Подставляем чанк
                chunk_df = df_train.iloc[slc]
                ds_chunk = MultiComboDataset(
                    chunk_df, self.target_column_name, self.label2id,
                    self.text_columns, self.image_columns, self.audio_columns
                )
                self.trainer.train_dataset = ds_chunk

                # Считаем шаги на чанке и настраиваем max_steps Trainer
                chunk_steps = steps_for_size(len(slc), per_device_train_batch_size, gradient_accumulation_steps)
                if chunk_steps == 0:
                    del ds_chunk, chunk_df
                    continue

                self.trainer.args.max_steps = steps_done + chunk_steps
                self.trainer.train()
                steps_done += chunk_steps

                # Очистка памяти между чанками
                del ds_chunk, chunk_df
                gc.collect()
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()

        pbar.close()
        return self

    def predict(self, df: pd.DataFrame, return_label_str: bool = False) -> np.ndarray:
        """
        Делает предсказания классов на новых данных.

        :param df: Датафрейм с теми же колонками модальностей, что и при обучении.
        :param return_label_str: Если True — вернуть строковые метки; иначе — id.
        :return: np.ndarray предсказанных меток.
        :raises RuntimeError: Если модель ещё не обучена.
        """
        if self.trainer is None:
            raise RuntimeError("Модель не обучена. Вызовите .fit().")
        df_c = df.copy()
        if self.target_column_name not in df_c.columns:
            df_c[self.target_column_name] = list(self.label2id.keys())[0]
        ds = MultiComboDataset(
            df_c, self.target_column_name, self.label2id,
            self.text_columns, self.image_columns, self.audio_columns
        )
        preds = self.trainer.predict(test_dataset=ds)
        y_pred = np.argmax(preds.predictions, axis=-1)
        if return_label_str:
            return np.array([self.id2label[int(i)] for i in y_pred])
        return y_pred

    def get_embeddings(self, df: pd.DataFrame, batch_size: int = 32, return_per_modality: bool = False):
        """
        Извлекает fused эмбеддинги (и опционально по модальностям) для новых данных.

        :param df: Датафрейм с нужными колонками модальностей.
        :param batch_size: Размер батча при инференсе.
        :param return_per_modality: Вернуть также словарь эмбеддингов {'text','image','audio'}.
        :return: np.ndarray fused [N, D_fused] или (fused, per_modality_dict).
        :raises RuntimeError: Если модель ещё не обучена.
        """
        if self.trainer is None or self.model is None:
            raise RuntimeError("Модель не обучена. Вызовите .fit().")

        # Определяем девайс модели
        try:
            device = next(self.trainer.model.parameters()).device
        except StopIteration:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.model.to(device).eval()

        df_c = df.copy()
        if self.target_column_name not in df_c.columns:
            df_c[self.target_column_name] = list(self.label2id.keys())[0]

        ds = MultiComboDataset(
            df_c, self.target_column_name, self.label2id,
            self.text_columns, self.image_columns, self.audio_columns
        )

        def collate(batch_list: List[Dict[str, Any]]) -> Dict[str, Any]:
            """
            Collate для DataLoader при извлечении эмбеддингов.

            :param batch_list: Список элементов.
            :return: Батч 'backend_inputs' для модели.
            """
            return self.backend.collate(batch_list)

        def move_to_device(obj, device: torch.device):
            """
            Рекурсивно переносит тензоры на device.

            :param obj: Тензор/словарь/список/кортеж/прочее.
            :param device: torch.device.
            :return: Объект с перенесёнными тензорами.
            """
            if torch.is_tensor(obj):
                return obj.to(device)
            if isinstance(obj, dict):
                return {k: move_to_device(v, device) for k, v in obj.items()}
            if isinstance(obj, (list, tuple)):
                t = [move_to_device(v, device) for v in obj]
                return type(obj)(t) if not isinstance(obj, list) else t
            return obj

        loader = DataLoader(ds, batch_size=batch_size, shuffle=False, collate_fn=collate)
        fused_list = []
        per_mod_lists = {m: [] for m in self.modalities} if return_per_modality else None

        with torch.no_grad():
            for batch in loader:
                bi = move_to_device(batch["backend_inputs"], device)
                fused, per = self.model.get_embeddings(backend_inputs=bi, return_per_modality=True)
                fused_list.append(fused.cpu().numpy())
                if return_per_modality:
                    for m in per_mod_lists.keys():
                        if m in per:
                            per_mod_lists[m].append(per[m].cpu().numpy())

        fused_arr = np.vstack(fused_list)
        if not return_per_modality:
            return fused_arr
        per_mod = {m: np.vstack(chunks) for m, chunks in per_mod_lists.items()}
        return fused_arr, per_mod

    def get_eval_history(self) -> pd.DataFrame:
        """
        Возвращает историю метрик на валидации, собранную коллбэком (если он добавлен).

        :return: DataFrame со строками вида {'step','epoch', 'eval_loss', 'eval_f1'/...}.
        """
        cb = getattr(self, "progress_callback", None)
        if cb is None:
            return pd.DataFrame()
        # История собирается через on_log печать; здесь можно расширить для хранения, если потребуется.
        # Сейчас возвращаем пустой каркас (расширяемость для будущей версии).
        return pd.DataFrame()  # при желании можно дополнить сохранением истории внутри коллбэка

Создание фиктивных данных.

In [None]:
import os, random
import numpy as np
import pandas as pd
from PIL import Image

import torch
import torchaudio

HAVE_TORCHAUDIO = True

random.seed(42); np.random.seed(42); torch.manual_seed(42)

BASE_DIR = "./dummy_data"
IMG_DIR  = os.path.join(BASE_DIR, "images")
AUD_DIR  = os.path.join(BASE_DIR, "audio")
os.makedirs(IMG_DIR, exist_ok=True)
os.makedirs(AUD_DIR, exist_ok=True)

# Сколько элементов на модальность в каждой строке
K_PER_MODALITY = 3

def make_dummy_images(n=12, size=(256, 256)):
    paths = []
    for i in range(n):
        color = tuple(np.random.randint(0, 255, size=3).tolist())
        img = Image.new("RGB", size, color=color)
        path = os.path.join(IMG_DIR, f"img_{i:02d}.png")
        img.save(path)
        paths.append(path)
    return paths

def make_dummy_audios(n=12, sr=48000, duration_sec=0.6):
    if not HAVE_TORCHAUDIO:
        raise RuntimeError("Нельзя сгенерировать аудио без torchaudio. Установите 'pip install torchaudio'.")
    paths = []
    t = torch.linspace(0, duration_sec, int(sr * duration_sec))
    for i in range(n):
        freq = random.choice([220, 330, 440, 550, 660, 880])
        wave = 0.2 * torch.sin(2 * np.pi * freq * t)  # амплитуда 0.2
        wave = wave.unsqueeze(0)  # [1, T] mono
        path = os.path.join(AUD_DIR, f"tone_{i:02d}.wav")
        torchaudio.save(path, wave, sample_rate=sr)
        paths.append(path)
    return paths

img_paths = make_dummy_images(n=12)
audio_paths = make_dummy_audios(n=12) if HAVE_TORCHAUDIO else []

# Вспомогательные тексты
TITLES = ["Red fox", "Blue sky", "Green field", "Yellow sun", "Purple rain", "Silver line"]
BODIES = ["quick brown", "lazy dog", "jumps high", "runs fast", "stays calm", "shines bright"]
QUERIES = ["find tone", "classify sound", "describe image", "retrieve pair", "detect event"]

def rand_title(): return random.choice(TITLES)
def rand_body(): return random.choice(BODIES)
def rand_query(): return random.choice(QUERIES)

# Универсальные хелперы
def sample_k(seq, k):
    if len(seq) >= k:
        return random.sample(seq, k)  # без повторов
    else:
        return [random.choice(seq) for _ in range(k)]  # с повторами, если мало исходников

def as_cols(prefix, values):
    # {"prefix_1": values[0], ..., "prefix_k": values[k-1]}
    return {f"{prefix}_{i+1}": v for i, v in enumerate(values)}

def pick_text_desc(k=K_PER_MODALITY):
    # Текстовое описание: "Title | body"
    vals = [f"{rand_title()} | {rand_body()}" for _ in range(k)]
    return as_cols("text", vals)

def pick_text_query(k=K_PER_MODALITY):
    vals = [rand_query() for _ in range(k)]
    return as_cols("text", vals)

def pick_images(k=K_PER_MODALITY):
    vals = sample_k(img_paths, k)
    return as_cols("image_path", vals)

def pick_audios(k=K_PER_MODALITY):
    vals = sample_k(audio_paths, k)
    return as_cols("audio_path", vals)

# 1) Текст + Картинка -> по 3 текстовых и 3 картинок
def build_df_text_image(n=24, k=K_PER_MODALITY):
    rows = []
    for _ in range(n):
        row = {}
        row.update(pick_text_desc(k))
        row.update(pick_images(k))
        row["label"] = random.choice(["class_a", "class_b", "class_c"])
        rows.append(row)
    return pd.DataFrame(rows)

# 2) Текст + Звук -> по 3 текста (query) и 3 аудио
def build_df_text_audio(n=24, k=K_PER_MODALITY):
    if not HAVE_TORCHAUDIO:
        raise RuntimeError("Для text+audio нужен torchaudio.")
    rows = []
    for _ in range(n):
        row = {}
        row.update(pick_text_query(k))
        row.update(pick_audios(k))
        row["label"] = random.choice(["ok", "ng"])
        rows.append(row)
    return pd.DataFrame(rows)

# 3) Картинка + Звук -> по 3 картинки и 3 аудио
def build_df_image_audio(n=24, k=K_PER_MODALITY):
    if not HAVE_TORCHAUDIO:
        raise RuntimeError("Для image+audio нужен torchaudio.")
    rows = []
    for _ in range(n):
        row = {}
        row.update(pick_images(k))
        row.update(pick_audios(k))
        row["label"] = random.choice(["dog", "cat", "bird"])
        rows.append(row)
    return pd.DataFrame(rows)

# 4) Текст + Картинка + Звук -> по 3 на каждую модальность
def build_df_text_image_audio(n=24, k=K_PER_MODALITY):
    if not HAVE_TORCHAUDIO:
        raise RuntimeError("Для text+image+audio нужен torchaudio.")
    rows = []
    for _ in range(n):
        row = {}
        row.update(pick_text_desc(k))
        row.update(pick_images(k))
        row.update(pick_audios(k))
        row["label"] = random.choice(["A", "B", "C", "D"])
        rows.append(row)
    return pd.DataFrame(rows)

# Собираем 4 датасета
df_text_image = build_df_text_image(290, K_PER_MODALITY)
df_text_audio = build_df_text_audio(230, K_PER_MODALITY) if HAVE_TORCHAUDIO else None
df_image_audio = build_df_image_audio(150, K_PER_MODALITY) if HAVE_TORCHAUDIO else None
df_text_image_audio  = build_df_text_image_audio(200, K_PER_MODALITY) if HAVE_TORCHAUDIO else None

print("df_text_image columns:", list(df_text_image.columns))
if HAVE_TORCHAUDIO:
    print("df_text_audio columns:", list(df_text_audio.columns))
    print("df_image_audio columns:", list(df_image_audio.columns))
    print("df_text_image_audio columns:", list(df_text_image_audio.columns))

Пример использования.

In [None]:
pipeline = SingleModelMultiComboClassification(
    modalities=["text", "image", "audio"],
    num_labels=4,
    target_column_name="label",

    # Колонки данных
    text_columns=["text_1", "text_2", "text_3"],
    image_columns=["image_path_1", "image_path_2"],
    audio_columns=["audio_path_1", "audio_path_2"],

    # Бэкенд и фьюжн
    backend="auto",     # для ['text','image','audio'] автоматически выберется clip_wav2clip
    fusion="concat",
    freeze_backbone=True,   # linear probing; поставьте False для тонкой донастройки энкодеров

    # Ограничения по количеству мультимодальных входов (для concat-агрегации)
    max_images_per_sample=2,  # под две картинки
    max_audios_per_sample=1    # под одно аудио (если две — выставьте 2 и добавьте вторую колонку)
)

# Обучение (с поддержкой «больших данных» за счёт чанков)
pipeline.fit(
    df_text_image_audio,
    epochs=2,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=2,
    metric_name="f1",
    fp16=True,
    logging_steps=10,
    eval_steps=10,
    fit_chunk_size=25,
    output_dir="./result",
    seed=42
)

preds = pipeline.predict(df_text_image_audio[:5], return_label_str=True)
emb = pipeline.get_embeddings(df_text_image_audio[:5], batch_size=8)

print(f'preds: {preds}')
print(f'embeddings: {emb}')

# При желании — эмбеддинги по модальностям отдельно
fused, per_mod = pipeline.get_embeddings(
    df_text_image_audio.head(8),
    batch_size=8,
    return_per_modality=True
)
print("fused.shape:", fused.shape)
for m, arr in per_mod.items():
    print(f"{m} emb shape:", arr.shape)

# Дообучение регрессора, который работает с данными разной модальностью.

Реализация регрессора.

In [None]:
!pip install -q wav2clip torchaudio transformers evaluate pillow

import gc
import math
from typing import List, Dict, Any, Optional, Union

import numpy as np
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import evaluate
from transformers import Trainer, TrainingArguments, TrainerCallback, PrinterCallback
from transformers.modeling_outputs import ModelOutput
from tqdm.auto import tqdm


def set_seed(seed: int = 42):
    """
    Устанавливает фиксированное зерно для Python, NumPy и PyTorch (включая все доступные CUDA-устройства).

    :param seed: Значение зерна.
    """
    import random
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)


def to_pil(x: Union[str, np.ndarray, Image.Image]) -> Image.Image:
    """
    Приводит входное изображение к PIL.Image в формате RGB.

    Поддерживаемые форматы:
      - путь к файлу (str);
      - NumPy массив (H, W[, C]) со значениями uint8;
      - PIL.Image (любого режима).

    :param x: Источник изображения.
    :return: Изображение PIL.Image в RGB.
    :raises ValueError: Если тип входа не поддерживается.
    """
    if isinstance(x, Image.Image):
        return x.convert("RGB")
    if isinstance(x, str):
        return Image.open(x).convert("RGB")
    if isinstance(x, np.ndarray):
        return Image.fromarray(x).convert("RGB")
    raise ValueError("Ожидается путь/np.ndarray/PIL.Image")


def load_audio(path: str, target_sr: int) -> np.ndarray:
    """
    Загружает аудио и (при необходимости) ресемплирует до target_sr.

    - Монофонизирует вход через усреднение каналов.
    - Возвращает float32 массив формы [T].

    :param path: Путь к аудиофайлу (wav/flac/…).
    :param target_sr: Целевая частота дискретизации.
    :return: Сигнал формы [T] float32.
    :raises RuntimeError: Если torchaudio недоступен.
    """
    try:
        import torchaudio
    except Exception as e:
        raise RuntimeError("Требуется torchaudio: pip install torchaudio") from e
    waveform, sr = torchaudio.load(path)  # [C, T]
    if waveform.size(0) > 1:
        waveform = waveform.mean(dim=0, keepdim=True)
    if sr != target_sr:
        waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=target_sr)
    return waveform.squeeze(0).numpy().astype(np.float32)


class MultiComboRegDataset(Dataset):
    """
    Регрессионный датасет с поддержкой нескольких изображений и аудио на сэмпл.

    Ячейки колонок изображений/аудио могут содержать:
      - одиночное значение (путь/np.ndarray/PIL для изображения; путь/np.ndarray для аудио);
      - список таких значений.

    :param df: Исходный DataFrame.
    :param target_cols: Список целевых колонок (поддерживается многомерная регрессия).
    :param text_columns: Список текстовых колонок. Их значения конкатенируются через [SEP].
    :param image_columns: Список колонок изображений.
    :param audio_columns: Список колонок аудио.
    """
    def __init__(
        self,
        df: pd.DataFrame,
        target_cols: List[str],
        text_columns: Optional[List[str]] = None,
        image_columns: Optional[List[str]] = None,
        audio_columns: Optional[List[str]] = None
    ):
        """
        Инициализация датасета.

        :param df: Исходный DataFrame.
        :param target_cols: Названия колонок целей.
        :param text_columns: Текстовые колонки (склеиваются).
        :param image_columns: Колонки изображений (значение или список значений).
        :param audio_columns: Колонки аудио (значение или список значений).
        """
        self.df = df.reset_index(drop=True)
        self.target_cols = target_cols
        self.text_columns = text_columns or []
        self.image_columns = image_columns or []
        self.audio_columns = audio_columns or []
        self.sep = " [SEP] "

    def __len__(self) -> int:
        """
        Количество элементов датасета.

        :return: Длина датасета.
        """
        return len(self.df)

    def __getitem__(self, idx: int) -> Dict[str, Any]:
        """
        Возвращает один элемент датасета.

        Ключи в словаре:
          - 'labels': np.ndarray формы [T], float32;
          - 'text': строка (если заданы text_columns);
          - 'images': список изображений (пути/np.ndarray/PIL) или пустой список;
          - 'audios': список аудио (пути/np.ndarray) или пустой список.

        :param idx: Индекс строки.
        :return: Словарь для последующего collate бэкенда.
        """
        row = self.df.iloc[idx]
        y = np.array([float(row[c]) for c in self.target_cols], dtype=np.float32)

        def _as_list(v):
            if v is None or (isinstance(v, float) and np.isnan(v)):
                return []
            if isinstance(v, (list, tuple)):
                return list(v)
            return [v]

        item = {"labels": y}
        if self.text_columns:
            item["text"] = self.sep.join([str(row[c]) if pd.notnull(row[c]) else "" for c in self.text_columns])

        if self.image_columns:
            imgs = []
            for c in self.image_columns:
                if c in row:
                    imgs.extend(_as_list(row[c]))
            item["images"] = imgs

        if self.audio_columns:
            auds = []
            for c in self.audio_columns:
                if c in row:
                    auds.extend(_as_list(row[c]))
            item["audios"] = auds

        return item


class BaseBackend(nn.Module):
    """
    Базовый класс бэкенда для единой мультимодальной модели.

    Атрибуты:
      - name: Название бэкенда.
      - supported: Набор поддерживаемых модальностей (например, {'text','image'}).
      - embed_dim: Базовая размерность эмбеддингов модальностей.
      - out_dim_per_modality: Словарь фактических размерностей выходных эмбеддингов по модальностям
                              (с учётом агрегации/concat).
    """
    name: str = "base"
    supported: set = set()
    embed_dim: int = 0
    out_dim_per_modality: Dict[str, int] = {}

    def collate(self, batch: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Собирает батч для текущего бэкенда.

        :param batch: Список элементов MultiComboRegDataset.
        :return: Словарь вида:
                 {
                   'labels': torch.FloatTensor [B, T],
                   'backend_inputs': dict(...)
                 }
        """
        raise NotImplementedError

    def encode(self, backend_inputs: Dict[str, Any], device: torch.device) -> Dict[str, torch.Tensor]:
        """
        Кодирует модальности и возвращает эмбеддинги по доступным модальностям.

        :param backend_inputs: Предобработанные входы (из collate).
        :param device: Целевой девайс.
        :return: Словарь {'text':[B,*], 'image':[B,*], 'audio':[B,*]} по имеющимся модальностям.
        """
        raise NotImplementedError

    def freeze_all(self):
        """
        Замораживает все параметры бэкенда (requires_grad=False).
        """
        for p in self.parameters():
            p.requires_grad = False

    @staticmethod
    def _stack_labels_float(batch: List[Dict[str, Any]]) -> torch.Tensor:
        """
        Преобразует список 'labels' из элементов батча в тензор float32 [B, T] с выравниванием по T.

        Если длины таргетов различны, добивает меньшие нулями справа до максимальной длины.

        :param batch: Список элементов датасета.
        :return: torch.FloatTensor [B, T].
        """
        ys = []
        for b in batch:
            y = b.get("labels", None)
            if y is None:
                ys.append(np.array([0.0], dtype=np.float32))
            else:
                arr = np.array(y, dtype=np.float32).reshape(-1)
                ys.append(arr)
        max_t = max(a.shape[0] for a in ys)
        ys_padded = []
        for a in ys:
            if a.shape[0] == max_t:
                ys_padded.append(a)
            else:
                pad = np.zeros(max_t, dtype=np.float32)
                pad[:a.shape[0]] = a
                ys_padded.append(pad)
        return torch.from_numpy(np.stack(ys_padded, axis=0))

    def get_out_dim(self, modality: str) -> int:
        """
        Возвращает итоговую размерность выходного вектора по модальности
        (с учётом агрегации и параметров max_*).

        :param modality: Имя модальности: 'text' | 'image' | 'audio'.
        :return: Размерность эмбеддинга.
        """
        return self.out_dim_per_modality.get(modality, self.embed_dim)


class ClipBackend(BaseBackend):
    """
    CLIP-бэкенд для текста и изображений с поддержкой нескольких изображений на сэмпл.

    Агрегация изображений:
      - 'concat': конкатенация эмбеддингов N изображений в один вектор (с паддингом нулями до max_images);
      - 'mean': усреднение эмбеддингов изображений.

    :param checkpoint: Имя/путь чекпоинта CLIP (HF Hub).
    :param max_length: Максимальная длина токенов текста.
    :param freeze: Замораживать ли веса CLIP (linear probing).
    :param max_images: Максимальное число изображений на сэмпл для агрегации.
    :param image_agg: Тип агрегации изображений ('concat' или 'mean').
    """
    name = "clip"
    supported = {"text", "image"}

    def __init__(
        self,
        checkpoint: str = "openai/clip-vit-base-patch32",
        max_length: int = 77,
        freeze: bool = True,
        max_images: int = 1,
        image_agg: str = "concat"
    ):
        """
        Инициализация CLIP-бэкенда.

        :param checkpoint: Чекпоинт CLIP.
        :param max_length: Максимальная длина текста (токенов).
        :param freeze: Замораживать ли веса.
        :param max_images: Максимум изображений на сэмпл.
        :param image_agg: Агрегация изображений.
        """
        super().__init__()
        from transformers import CLIPModel, CLIPProcessor
        self.model = CLIPModel.from_pretrained(checkpoint)
        self.processor = CLIPProcessor.from_pretrained(checkpoint)
        self.embed_dim = int(self.model.config.projection_dim)
        self.max_length = max_length
        self.max_images = int(max_images)
        self.image_agg = image_agg
        if freeze:
            self.freeze_all()
        img_out = self.embed_dim * self.max_images if self.image_agg == "concat" else self.embed_dim
        self.out_dim_per_modality = {"text": self.embed_dim, "image": img_out}

    def collate(self, batch: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Готовит батч для CLIP.

        Формирует:
          - text_inputs: токены текста;
          - image_inputs: pixel_values (возможен None);
          - image_counts: количество изображений на каждый сэмпл (для агрегации).

        :param batch: Элементы с ключами 'text', 'images', 'labels'.
        :return: {'labels': FloatTensor [B,T], 'backend_inputs': dict}.
        """
        labels = self._stack_labels_float(batch)
        texts = [b.get("text", "") for b in batch]

        images_lists = [b.get("images", []) for b in batch]
        flat_images, counts = [], []
        for lst in images_lists:
            lst = lst if isinstance(lst, list) else ([] if lst is None else [lst])
            counts.append(len(lst))
            for img in lst:
                flat_images.append(to_pil(img))

        text_inputs = self.processor(
            text=texts, padding=True, truncation=True, max_length=self.max_length, return_tensors="pt"
        )
        text_inputs = {k: v for k, v in text_inputs.items()}

        if len(flat_images):
            img_proc = self.processor(images=flat_images, return_tensors="pt")
            image_inputs = {"pixel_values": img_proc["pixel_values"]}
        else:
            image_inputs = {"pixel_values": None}

        backend_inputs = {
            "text_inputs": text_inputs,
            "image_inputs": image_inputs,
            "image_counts": torch.tensor(counts, dtype=torch.long),
        }
        return {"labels": labels, "backend_inputs": backend_inputs}

    def _concat_padded(self, embs: torch.Tensor, counts: List[int], max_k: int) -> torch.Tensor:
        """
        Конкатенирует до max_k эмбеддингов изображений на сэмпл с паддингом нулями.

        :param embs: Плоские эмбеддинги изображений [M, D], где M = сумма(counts).
        :param counts: Список количеств изображений на сэмпл.
        :param max_k: Максимум изображений на сэмпл (для паддинга/отрезания).
        :return: Нормализованный тензор [B, D*max_k].
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D * max_k), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                chunk = embs[offset:offset + c]
                take = chunk[:max_k]
                if take.size(0) < max_k:
                    pad = torch.zeros((max_k - take.size(0), D), device=device, dtype=embs.dtype)
                    take = torch.cat([take, pad], dim=0)
                out[i] = take.reshape(-1)
            offset += c
        return F.normalize(out, dim=-1)

    def _mean_pool(self, embs: torch.Tensor, counts: List[int]) -> torch.Tensor:
        """
        Усредняет эмбеддинги изображений на сэмпл.

        :param embs: Плоские эмбеддинги [M, D].
        :param counts: Количества изображений на сэмпл.
        :return: Нормализованный тензор [B, D].
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                out[i] = embs[offset:offset + c].mean(dim=0)
            offset += c
        return F.normalize(out, dim=-1)

    def encode(self, backend_inputs: Dict[str, Any], device: torch.device) -> Dict[str, torch.Tensor]:
        """
        Вычисляет эмбеддинги текста и изображений (с агрегацией изображений).

        :param backend_inputs: Тензоры CLIPProcessor для текста и картинок.
        :param device: Девайс.
        :return: {'text':[B,D], 'image':[B,D*max_images] или [B,D]} — L2-нормированные эмбеддинги.
        """
        ti = {k: v.to(device) for k, v in backend_inputs["text_inputs"].items()}
        text_z = self.model.get_text_features(input_ids=ti["input_ids"], attention_mask=ti["attention_mask"])
        text_z = F.normalize(text_z, dim=-1)

        counts = backend_inputs["image_counts"].tolist()
        pi = backend_inputs["image_inputs"]["pixel_values"]
        if pi is not None:
            pi = pi.to(device)
            img_flat = self.model.get_image_features(pixel_values=pi)
            img_flat = F.normalize(img_flat, dim=-1)
            if self.image_agg == "concat":
                img_z = self._concat_padded(img_flat, counts, self.max_images)
            else:
                img_z = self._mean_pool(img_flat, counts)
        else:
            img_z = torch.zeros(
                (len(counts), self.embed_dim * self.max_images if self.image_agg == "concat" else self.embed_dim),
                device=device
            )

        return {"text": text_z, "image": img_z}


class ClapBackend(BaseBackend):
    """
    CLAP-бэкенд для текста и аудио с поддержкой нескольких аудио на сэмпл.

    Агрегация аудио:
      - 'concat': конкатенация эмбеддингов N аудиоклипов в один вектор (с паддингом нулями до max_audios);
      - 'mean': усреднение эмбеддингов аудио.

    :param checkpoint: Имя/путь чекпоинта CLAP (HF Hub).
    :param freeze: Замораживать ли веса CLAP.
    :param max_audios: Максимальное число аудио на сэмпл для агрегации.
    :param audio_agg: Тип агрегации аудио ('concat' или 'mean').
    """
    name = "clap"
    supported = {"text", "audio"}

    def __init__(
        self,
        checkpoint: str = "laion/clap-htsat-unfused",
        freeze: bool = True,
        max_audios: int = 1,
        audio_agg: str = "concat"
    ):
        """
        Инициализация CLAP-бэкенда.

        :param checkpoint: Чекпоинт CLAP.
        :param freeze: Замораживать ли веса.
        :param max_audios: Максимум аудио на сэмпл.
        :param audio_agg: Агрегация аудио.
        """
        super().__init__()
        from transformers import ClapModel, ClapProcessor
        self.model = ClapModel.from_pretrained(checkpoint)
        self.processor = ClapProcessor.from_pretrained(checkpoint)
        self.embed_dim = int(getattr(self.model.config, "projection_dim", 512))
        sr = getattr(self.processor, "sampling_rate", None)
        if sr is None:
            fe = getattr(self.processor, "feature_extractor", None)
            sr = getattr(fe, "sampling_rate", 48000) if fe is not None else 48000
        self.sr = int(sr)
        self.max_audios = int(max_audios)
        self.audio_agg = audio_agg
        if freeze:
            self.freeze_all()
        aud_out = self.embed_dim * self.max_audios if self.audio_agg == "concat" else self.embed_dim
        self.out_dim_per_modality = {"text": self.embed_dim, "audio": aud_out}

    def collate(self, batch: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Готовит батч для CLAP.

        Формирует:
          - text_inputs: токены текста;
          - audio_inputs: input_features (или None);
          - audio_counts: число аудио на сэмпл.

        :param batch: Элементы с ключами 'text','audios','labels'.
        :return: {'labels': FloatTensor [B,T], 'backend_inputs': dict}.
        """
        labels = self._stack_labels_float(batch)
        texts = [b.get("text", "") for b in batch]

        audios_lists = [b.get("audios", []) for b in batch]
        flat_audios, counts = [], []
        for lst in audios_lists:
            lst = lst if isinstance(lst, list) else ([] if lst is None else [lst])
            counts.append(len(lst))
            for a in lst:
                if isinstance(a, str):
                    flat_audios.append(load_audio(a, self.sr))
                elif isinstance(a, np.ndarray):
                    flat_audios.append(a.astype(np.float32))
                else:
                    raise ValueError("Ожидается путь к аудио или numpy.ndarray")

        text_inputs = self.processor(text=texts, padding=True, truncation=True, return_tensors="pt")
        text_inputs = {k: v for k, v in text_inputs.items()}

        if len(flat_audios):
            aud_proc = self.processor(audios=flat_audios, sampling_rate=self.sr, padding=True, return_tensors="pt")
            audio_inputs = {"input_features": aud_proc["input_features"]}
        else:
            audio_inputs = {"input_features": None}

        backend_inputs = {
            "text_inputs": text_inputs,
            "audio_inputs": audio_inputs,
            "audio_counts": torch.tensor(counts, dtype=torch.long)
        }
        return {"labels": labels, "backend_inputs": backend_inputs}

    def _concat_padded(self, embs: torch.Tensor, counts: List[int], max_k: int) -> torch.Tensor:
        """
        Конкатенирует до max_k эмбеддингов аудио на сэмпл с паддингом нулями.

        :param embs: Плоские эмбеддинги аудио [M, D].
        :param counts: Список количеств аудио на сэмпл.
        :param max_k: Максимум аудио на сэмпл.
        :return: Нормализованный тензор [B, D*max_k].
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D * max_k), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                chunk = embs[offset:offset + c]
                take = chunk[:max_k]
                if take.size(0) < max_k:
                    pad = torch.zeros((max_k - take.size(0), D), device=device, dtype=embs.dtype)
                    take = torch.cat([take, pad], dim=0)
                out[i] = take.reshape(-1)
            offset += c
        return F.normalize(out, dim=-1)

    def _mean_pool(self, embs: torch.Tensor, counts: List[int]) -> torch.Tensor:
        """
        Усредняет эмбеддинги аудио на сэмпл.

        :param embs: Плоские эмбеддинги [M, D].
        :param counts: Количества аудио на сэмпл.
        :return: Нормализованный тензор [B, D].
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                out[i] = embs[offset:offset + c].mean(dim=0)
            offset += c
        return F.normalize(out, dim=-1)

    def encode(self, backend_inputs: Dict[str, Any], device: torch.device) -> Dict[str, torch.Tensor]:
        """
        Вычисляет эмбеддинги текста и аудио (с агрегацией аудио).

        :param backend_inputs: Тензоры ClapProcessor для текста и аудио.
        :param device: Девайс.
        :return: {'text':[B,D], 'audio':[B,D*max_audios] или [B,D]} — L2-нормированные эмбеддинги.
        """
        ti = {k: v.to(device) for k, v in backend_inputs["text_inputs"].items()}
        if hasattr(self.model, "get_text_features") and hasattr(self.model, "get_audio_features"):
            text_z = self.model.get_text_features(input_ids=ti["input_ids"], attention_mask=ti["attention_mask"])
        else:
            out = self.model(**ti, output_attentions=False, output_hidden_states=False, return_dict=True)
            text_z = out.text_embeds
        text_z = F.normalize(text_z, dim=-1)

        counts = backend_inputs["audio_counts"].tolist()
        af = backend_inputs["audio_inputs"]["input_features"]
        if af is not None:
            af = af.to(device)
            if hasattr(self.model, "get_audio_features"):
                aud_flat = self.model.get_audio_features(input_features=af)
            else:
                out = self.model(input_features=af, output_attentions=False, output_hidden_states=False, return_dict=True)
                aud_flat = out.audio_embeds
            aud_flat = F.normalize(aud_flat, dim=-1)
            if self.audio_agg == "concat":
                aud_z = self._concat_padded(aud_flat, counts, self.max_audios)
            else:
                aud_z = self._mean_pool(aud_flat, counts)
        else:
            aud_z = torch.zeros(
                (len(counts), self.embed_dim * self.max_audios if self.audio_agg == "concat" else self.embed_dim),
                device=device
            )

        return {"text": text_z, "audio": aud_z}


class ClipWav2CLIPBackend(BaseBackend):
    """
    Комбинированный бэкенд: CLIP для текста/изображения + Wav2CLIP для аудио (в CLIP-пространство).

    Поддержка нескольких изображений и аудио на сэмпл с агрегацией 'concat' или 'mean'.

    :param checkpoint: Чекпоинт CLIP (HF).
    :param max_length: Максимальная длина токенов для CLIP.
    :param freeze: Замораживать ли веса CLIP.
    :param audio_sr: Частота дискретизации для Wav2CLIP.
    :param max_images: Максимум изображений на сэмпл.
    :param max_audios: Максимум аудио на сэмпл.
    :param image_agg: Тип агрегации изображений ('concat' | 'mean').
    :param audio_agg: Тип агрегации аудио ('concat' | 'mean').
    """
    name = "clip_wav2clip"
    supported = {"text", "image", "audio"}

    def __init__(
        self,
        checkpoint: str = "openai/clip-vit-base-patch32",
        max_length: int = 77,
        freeze: bool = True,
        audio_sr: int = 16000,
        max_images: int = 1,
        max_audios: int = 1,
        image_agg: str = "concat",
        audio_agg: str = "concat"
    ):
        """
        Инициализация ClipWav2CLIP-бэкенда.

        :param checkpoint: Чекпоинт CLIP.
        :param max_length: Максимальная длина текста.
        :param freeze: Замораживать ли веса CLIP.
        :param audio_sr: Частота дискретизации для Wav2CLIP.
        :param max_images: Максимум изображений на сэмпл.
        :param max_audios: Максимум аудио на сэмпл.
        :param image_agg: Агрегация изображений.
        :param audio_agg: Агрегация аудио.
        """
        super().__init__()
        from transformers import CLIPModel, CLIPProcessor
        self.model = CLIPModel.from_pretrained(checkpoint)
        self.processor = CLIPProcessor.from_pretrained(checkpoint)
        self.embed_dim = int(self.model.config.projection_dim)
        self.max_length = max_length
        self.audio_sr = int(audio_sr)
        self.max_images = int(max_images)
        self.max_audios = int(max_audios)
        self.image_agg = image_agg
        self.audio_agg = audio_agg
        if freeze:
            self.freeze_all()
        img_out = self.embed_dim * self.max_images if self.image_agg == "concat" else self.embed_dim
        aud_out = self.embed_dim * self.max_audios if self.audio_agg == "concat" else self.embed_dim
        self.out_dim_per_modality = {"text": self.embed_dim, "image": img_out, "audio": aud_out}
        # ленивое подключение Wav2CLIP
        self._w2c_model = None
        self._w2c_mod = None
        self._w2c_api = None
        self._w2c_device = None

    def _ensure_w2c(self, device: torch.device):
        """
        Ленивая загрузка wav2clip и его модели для текущего устройства.

        Поддерживаются разные версии API:
          - функции load_model/get_model + embed_audio/get_audio_embedding;
          - класс Wav2CLIP с методом embed_audio.

        :param device: Текущее устройство.
        :raises RuntimeError: Если wav2clip недоступен или не удалось инициализировать модель.
        """
        if self._w2c_model is not None and self._w2c_device == str(device):
            return
        import importlib
        try:
            w2c = importlib.import_module("wav2clip")
        except Exception as e:
            raise RuntimeError("Не найден пакет 'wav2clip'. Установите: pip install wav2clip") from e
        dev_str = str(device) if device.type == "cuda" else "cpu"
        if hasattr(w2c, "load_model"):
            model = w2c.load_model(device=dev_str); api_kind = "func"
        elif hasattr(w2c, "get_model"):
            model = w2c.get_model(device=dev_str); api_kind = "func"
        elif hasattr(w2c, "Wav2CLIP"):
            try:
                model = w2c.Wav2CLIP(dev_str)
            except TypeError:
                model = w2c.Wav2CLIP(device=dev_str)
            api_kind = "method"
        else:
            raise RuntimeError("wav2clip установлен, но нет способов загрузки (load_model/get_model/Wav2CLIP)")
        self._w2c_mod = w2c
        self._w2c_model = model
        self._w2c_api = api_kind
        self._w2c_device = str(device)

    def collate(self, batch: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Готовит батч для текста/изображений (через CLIPProcessor) и аудио (через паддинг numpy->Tensor).

        Возвращает:
          - text_inputs: словарь тензоров для текста;
          - image_inputs: словарь с pixel_values или None;
          - image_counts: LongTensor [B] — количество изображений на сэмпл;
          - audio_inputs: {'waveforms': Tensor [M,Lmax] или None, 'lengths': LongTensor [M] или None};
          - audio_counts: LongTensor [B] — количество аудио на сэмпл;
          - labels: FloatTensor [B, T].

        :param batch: Элементы с 'text','images','audios','labels'.
        :return: Словарь {'labels', 'backend_inputs'}.
        """
        labels = self._stack_labels_float(batch)
        texts = [b.get("text", "") for b in batch]

        # images (flatten + counts)
        images_lists = [b.get("images", []) for b in batch]
        flat_images, img_counts = [], []
        for lst in images_lists:
            lst = lst if isinstance(lst, list) else ([] if lst is None else [lst])
            img_counts.append(len(lst))
            for img in lst:
                flat_images.append(to_pil(img))
        text_inputs = self.processor(text=texts, padding=True, truncation=True, max_length=self.max_length, return_tensors="pt")
        text_inputs = {k: v for k, v in text_inputs.items()}
        if len(flat_images):
            img_proc = self.processor(images=flat_images, return_tensors="pt")
            image_inputs = {"pixel_values": img_proc["pixel_values"]}
        else:
            image_inputs = {"pixel_values": None}

        # audios (flatten + counts)
        audios_lists = [b.get("audios", []) for b in batch]
        flat_audios, aud_counts = [], []
        for lst in audios_lists:
            lst = lst if isinstance(lst, list) else ([] if lst is None else [lst])
            aud_counts.append(len(lst))
            for a in lst:
                if isinstance(a, str):
                    flat_audios.append(load_audio(a, self.audio_sr))
                elif isinstance(a, np.ndarray):
                    flat_audios.append(a.astype(np.float32))
                else:
                    raise ValueError("Ожидается путь к аудио или numpy.ndarray")

        if len(flat_audios):
            Lmax = max(len(a) for a in flat_audios)
            wav = np.zeros((len(flat_audios), Lmax), dtype=np.float32)
            lens = np.zeros((len(flat_audios),), dtype=np.int64)
            for i, a in enumerate(flat_audios):
                L = len(a)
                wav[i, :L] = a
                lens[i] = L
            audio_inputs = {"waveforms": torch.from_numpy(wav), "lengths": torch.from_numpy(lens)}
        else:
            audio_inputs = {"waveforms": None, "lengths": None}

        backend_inputs = {
            "text_inputs": text_inputs,
            "image_inputs": image_inputs,
            "image_counts": torch.tensor(img_counts, dtype=torch.long),
            "audio_inputs": audio_inputs,
            "audio_counts": torch.tensor(aud_counts, dtype=torch.long),
        }
        return {"labels": labels, "backend_inputs": backend_inputs}

    def _agg_concat(self, embs: torch.Tensor, counts: List[int], max_k: int) -> torch.Tensor:
        """
        Конкатенирует до max_k эмбеддингов на сэмпл с паддингом нулями.

        :param embs: Плоские эмбеддинги [M, D].
        :param counts: Количества элементов на сэмпл.
        :param max_k: Максимум элементов на сэмпл (для паддинга).
        :return: Нормализованный тензор [B, D*max_k].
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D * max_k), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                chunk = embs[offset:offset+c]
                take = chunk[:max_k]
                if take.size(0) < max_k:
                    pad = torch.zeros((max_k - take.size(0), D), device=device, dtype=embs.dtype)
                    take = torch.cat([take, pad], dim=0)
                out[i] = take.reshape(-1)
            offset += c
        return F.normalize(out, dim=-1)

    def _agg_mean(self, embs: torch.Tensor, counts: List[int]) -> torch.Tensor:
        """
        Усредняет эмбеддинги на сэмпл.

        :param embs: Плоские эмбеддинги [M, D].
        :param counts: Количества элементов на сэмпл.
        :return: Нормализованный тензор [B, D].
        """
        device = embs.device if embs is not None else torch.device("cpu")
        B = len(counts); D = self.embed_dim
        out = torch.zeros((B, D), device=device, dtype=embs.dtype if embs is not None else torch.float32)
        if embs is None:
            return out
        offset = 0
        for i, c in enumerate(counts):
            if c > 0:
                out[i] = embs[offset:offset+c].mean(dim=0)
            offset += c
        return F.normalize(out, dim=-1)

    def encode(self, backend_inputs: Dict[str, Any], device: torch.device) -> Dict[str, torch.Tensor]:
        """
        Кодирует модальности: текст/изображение — через CLIP; аудио — через Wav2CLIP.

        :param backend_inputs: Входы из collate().
        :param device: Целевой девайс.
        :return: Словарь {'text',[B,D]; 'image',[B,*]; 'audio',[B,*]} в CLIP-пространстве.
        """
        # text + image via CLIP
        ti = {k: v.to(device) for k, v in backend_inputs["text_inputs"].items()}
        text_z = self.model.get_text_features(input_ids=ti["input_ids"], attention_mask=ti["attention_mask"])
        text_z = F.normalize(text_z, dim=-1)

        img_counts = backend_inputs["image_counts"].tolist()
        pi = backend_inputs["image_inputs"]["pixel_values"]
        if pi is not None:
            pi = pi.to(device)
            img_flat = self.model.get_image_features(pixel_values=pi)
            img_flat = F.normalize(img_flat, dim=-1)
            if self.image_agg == "concat":
                image_z = self._agg_concat(img_flat, img_counts, self.max_images)
            else:
                image_z = self._agg_mean(img_flat, img_counts)
        else:
            image_z = torch.zeros(
                (len(img_counts), self.embed_dim * self.max_images if self.image_agg == "concat" else self.embed_dim),
                device=device
            )

        # audio via wav2clip
        aud_counts = backend_inputs["audio_counts"].tolist()
        wav = backend_inputs["audio_inputs"]["waveforms"]
        lens = backend_inputs["audio_inputs"]["lengths"]
        if wav is not None and lens is not None and (lens.numel() if torch.is_tensor(lens) else len(lens)) > 0:
            self._ensure_w2c(device)
            w2c = self._w2c_mod
            embs = []
            for i in range(wav.size(0)):
                L = int(lens[i].item())
                a_np = wav[i, :L].detach().cpu().numpy()
                e = None
                if self._w2c_api == "func":
                    if hasattr(w2c, "embed_audio"):
                        try:
                            e = w2c.embed_audio(a_np, self._w2c_model)
                        except TypeError:
                            e = w2c.embed_audio(a_np, self.audio_sr, self._w2c_model)
                    elif hasattr(w2c, "get_audio_embedding"):
                        try:
                            e = w2c.get_audio_embedding(a_np, self._w2c_model)
                        except TypeError:
                            e = w2c.get_audio_embedding(a_np, self.audio_sr, self._w2c_model)
                if e is None and self._w2c_api == "method" and hasattr(self._w2c_model, "embed_audio"):
                    try:
                        e = self._w2c_model.embed_audio(a_np)
                    except TypeError:
                        e = self._w2c_model.embed_audio(a_np, sr=self.audio_sr)
                if e is None:
                    raise RuntimeError("Не удалось получить аудио‑эмбеддинг через wav2clip.")
                e = np.asarray(e)
                if e.ndim == 2:
                    e = e.mean(axis=0)
                elif e.ndim > 2:
                    e = e.reshape(-1, e.shape[-1]).mean(axis=0)
                embs.append(e.astype(np.float32))
            aud_flat = torch.tensor(np.stack(embs, axis=0), dtype=torch.float32, device=device)
            aud_flat = F.normalize(aud_flat, dim=-1)
            if self.audio_agg == "concat":
                audio_z = self._agg_concat(aud_flat, aud_counts, self.max_audios)
            else:
                audio_z = self._agg_mean(aud_flat, aud_counts)
        else:
            audio_z = torch.zeros(
                (len(aud_counts), self.embed_dim * self.max_audios if self.audio_agg == "concat" else self.embed_dim),
                device=device
            )

        return {"text": text_z, "image": image_z, "audio": audio_z}


class SingleBackboneRegressor(nn.Module):
    """
    Регрессор поверх одного мультимодального бэкенда:
    фьюжн эмбеддингов модальностей -> MLP-голова -> предсказание R^T.
    """
    def __init__(
        self,
        backend: BaseBackend,
        modalities: List[str],
        num_targets: int,
        fusion: str = "concat",
        hidden: int = 512,
        dropout: float = 0.1
    ):
        """
        Инициализирует регрессионную голову.

        :param backend: Инициализированный бэкенд (CLIP/CLAP/ClipWav2CLIP).
        :param modalities: Список активных модальностей (учитывается порядок: image, text, audio).
        :param num_targets: Число целевых признаков (T).
        :param fusion: Тип фьюжна — 'concat' или 'mean'.
        :param hidden: Размер скрытого слоя головы.
        :param dropout: Дропаут в голове.
        :raises ValueError: Если fusion='mean' при несовпадающих размерах модальностей.
        """
        super().__init__()
        self.backend = backend
        self.modalities = modalities
        self.fusion = fusion
        self.num_targets = num_targets

        order = [m for m in ["image", "text", "audio"] if m in self.modalities]
        if fusion == "concat":
            in_dim = sum(self.backend.get_out_dim(m) for m in order)
        elif fusion == "mean":
            dims = [self.backend.get_out_dim(m) for m in order]
            if len(set(dims)) != 1:
                raise ValueError(f'Для fusion="mean" размеры модальностей должны совпадать: {dict(zip(order, dims))}')
            in_dim = dims[0]
        else:
            raise ValueError('fusion должен быть "concat" или "mean"')

        self.head = nn.Sequential(
            nn.LayerNorm(in_dim),
            nn.Linear(in_dim, hidden),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, num_targets)
        )

    def _infer_device_from_inputs(self, obj) -> torch.device:
        """
        Пытается определить целевой девайс по первому найденному тензору во входах.

        :param obj: Произвольная вложенная структура (тензоры/словари/списки).
        :return: torch.device (cuda при наличии, иначе cpu).
        """
        if isinstance(obj, torch.Tensor):
            return obj.device
        if isinstance(obj, dict):
            for v in obj.values():
                d = self._infer_device_from_inputs(v)
                if d is not None:
                    return d
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def _fuse(self, z: Dict[str, torch.Tensor]) -> torch.Tensor:
        """
        Объединяет эмбеддинги модальностей согласно self.fusion.

        - concat: конкатенация по последней оси;
        - mean: среднее по модальностям (требует совпадения размерностей).

        :param z: Словарь эмбеддингов по модальностям.
        :return: Слитый эмбеддинг [B, D*|mods|] (concat) или [B, D] (mean).
        """
        order = [m for m in ["image", "text", "audio"] if m in self.modalities]
        feats = []
        for m in order:
            t = z[m]
            if t.dim() == 3:
                t = t.mean(dim=1)
            elif t.dim() > 3:
                t = t.view(t.size(0), -1)
            feats.append(t)
        if self.fusion == "concat":
            return torch.cat(feats, dim=-1)
        elif self.fusion == "mean":
            sizes = [f.size(-1) for f in feats]
            if len(set(sizes)) != 1:
                raise ValueError(f'Для fusion="mean" размеры модальностей должны совпадать: {sizes}')
            return torch.stack(feats, dim=0).mean(dim=0)

    def forward(self, backend_inputs: Dict[str, Any], labels: Optional[torch.Tensor] = None) -> ModelOutput:
        """
        Прямой проход: кодирование модальностей -> фьюжн -> регрессионная голова.

        :param backend_inputs: Входы для бэкенда (из его collate()).
        :param labels: Не используется (Trainer читает labels отдельно).
        :return: ModelOutput с полем logits [B, T].
        """
        device = self._infer_device_from_inputs(backend_inputs)
        z = self.backend.encode(backend_inputs, device=device)
        fused = self._fuse(z)
        preds = self.head(fused)
        return ModelOutput(logits=preds)

    @torch.no_grad()
    def get_embeddings(self, backend_inputs: Dict[str, Any], return_per_modality: bool = False):
        """
        Извлекает fused эмбеддинги (и, опционально, эмбеддинги по модальностям).

        :param backend_inputs: Входы для бэкенда.
        :param return_per_modality: Вернуть также словарь эмбеддингов по модальностям.
        :return: fused [B, *] или (fused, {'text':[B,*], 'image':[B,*], 'audio':[B,*]}).
        """
        device = self._infer_device_from_inputs(backend_inputs)
        z = self.backend.encode(backend_inputs, device=device)
        fused = self._fuse(z)
        if return_per_modality:
            return fused, z
        return fused


class MSETrainer(Trainer):
    """
    Trainer для регрессии на основе MSE loss.
    """
    def __init__(self, *args, reduction: str = "mean", **kwargs):
        """
        :param reduction: Тип редукции MSELoss ('mean' | 'sum' | 'none').
        """
        super().__init__(*args, **kwargs)
        self._reduction = reduction
        self._warned_label_tiling = False

    def compute_loss(self, model, inputs, return_outputs: bool = False, num_items_in_batch: Optional[int] = None):
        """
        Вычисляет MSE лосс между предсказаниями и целевыми значениями.
        Защищается от случая DataParallel, когда logits дублируются по числу GPU.

        :param model: Модель.
        :param inputs: Батч, содержащий 'labels' (float [B,T]) и аргументы для model.forward().
        :param return_outputs: Возвращать ли также outputs.
        :param num_items_in_batch: Совместимость с API Trainer (не используется).
        :return: loss (и outputs, если return_outputs=True).
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        preds = outputs.logits
        labels = labels.to(preds.device)

        if preds.size(0) != labels.size(0):
            ngpu = torch.cuda.device_count()
            if ngpu > 1 and preds.size(0) == labels.size(0) * ngpu:
                labels = labels.repeat_interleave(ngpu, dim=0)
                if not self._warned_label_tiling:
                    print(f"[Warning] DataParallel удвоил batch для logits. "
                          f"Повторяем labels x{ngpu}. preds: {tuple(preds.shape)}, labels: {tuple(labels.shape)}")
                    self._warned_label_tiling = True
            else:
                raise ValueError(f"Batch size mismatch: preds {tuple(preds.shape)} vs labels {tuple(labels.shape)}")

        loss = nn.MSELoss(reduction=self._reduction)(preds, labels)
        return (loss, outputs) if return_outputs else loss


class PbarConsoleLogger(TrainerCallback):
    """
    Внешний прогресс‑бар и консольный логгер метрик/лоссов для стабильного отображения на больших данных.
    """
    def __init__(self, pbar):
        self.pbar = pbar
        self.last_logs = {}
        self.last_train_loss = None
        self.printed_eval_steps = set()

    def _step(self, state) -> int:
        return int(state.global_step or 0)

    def _fmt_postfix(self):
        parts = []
        if 'loss' in self.last_logs:
            parts.append(f"loss {self.last_logs['loss']:.4f}")
        if 'eval_loss' in self.last_logs:
            parts.append(f"val {self.last_logs['eval_loss']:.4f}")
        for k, v in self.last_logs.items():
            if k.startswith('eval_') and k not in (
                'eval_loss', 'eval_runtime', 'eval_samples_per_second', 'eval_steps_per_second', 'epoch'
            ):
                try:
                    parts.append(f"{k.replace('eval_', '')} {float(v):.4f}")
                except Exception:
                    pass
        return " | ".join(parts)

    def on_step_end(self, args, state, control, **kwargs):
        n = min(self._step(state), self.pbar.total)
        if n > self.pbar.n:
            self.pbar.update(n - self.pbar.n)
        if self.last_logs:
            self.pbar.set_postfix_str(self._fmt_postfix(), refresh=False)
        self.pbar.refresh()

    def on_log(self, args, state, control, logs=None, **kwargs):
        if not logs:
            return
        for k, v in logs.items():
            if isinstance(v, (int, float)):
                self.last_logs[k] = float(v)
        if 'loss' in logs and isinstance(logs['loss'], (int, float)):
            self.last_train_loss = float(logs['loss'])

        self.pbar.set_postfix_str(self._fmt_postfix(), refresh=False)
        self.pbar.refresh()

        if any(k.startswith('eval_') for k in logs.keys()):
            step = self._step(state)
            if step in self.printed_eval_steps:
                return
            self.printed_eval_steps.add(step)

            train_loss_str = f"{self.last_train_loss:.10f}" if self.last_train_loss is not None else "n/a"
            val_loss = logs.get('eval_loss', None)
            val_loss_str = f"{float(val_loss):.10g}" if isinstance(val_loss, (int, float)) else "n/a"

            exclude = {'eval_loss', 'eval_runtime', 'eval_samples_per_second', 'eval_steps_per_second', 'epoch'}
            extra_parts = []
            for k, v in logs.items():
                if k.startswith('eval_') and k not in exclude:
                    metric_name = k.replace('eval_', '')
                    try:
                        extra_parts.append(f"val {metric_name}: {float(v):.10f}")
                    except Exception:
                        pass

            line = f"step: {step}, train loss: {train_loss_str}, val loss: {val_loss_str}"
            if extra_parts:
                line += ", " + ", ".join(extra_parts)
            tqdm.write(line)

    def on_train_end(self, args, state, control, **kwargs):
        n = min(self._step(state), self.pbar.total)
        if n > self.pbar.n:
            self.pbar.update(n - self.pbar.n)
        self.pbar.refresh()


class SingleModelMultiComboRegression:
    """
    Регрессионный пайплайн на одной мультимодальной модели (бэкенде) под заданные комбинации модальностей.

    Поддерживаемые комбинации (auto-подбор бэкенда):
      - ['text','image']         -> CLIP (HF)
      - ['text','audio']         -> CLAP (HF)
      - ['image','audio']        -> ClipWav2CLIP
      - ['text','image','audio'] -> ClipWav2CLIP

    Особенности:
      - Поддержка нескольких изображений/аудио на сэмпл ('concat' или 'mean' агрегация).
      - Обучение на больших данных: чанковая подстановка train_dataset, стабильный прогресс‑бар и логи.
    """
    def __init__(
        self,
        modalities: List[str],
        target_columns_names: List[str],
        text_columns: Optional[List[str]] = None,
        image_columns: Optional[List[str]] = None,
        audio_columns: Optional[List[str]] = None,
        backend: str = "auto",
        clip_checkpoint: str = "openai/clip-vit-base-patch32",
        clap_checkpoint: str = "laion/clap-htsat-unfused",
        fusion: str = "concat",
        freeze_backbone: bool = True,
        clip_max_length: int = 77,
        max_images_per_sample: int = 1,
        max_audios_per_sample: int = 1,
        image_agg: str = "concat",
        audio_agg: str = "concat"
    ):
        """
        Инициализация пайплайна регрессии.

        :param modalities: Активные модальности ('text','image','audio') в любом порядке.
        :param target_columns_names: Имена целевых колонок (num_targets = len(target_columns_names)).
        :param text_columns: Имена текстовых колонок (склеиваются).
        :param image_columns: Список колонок изображений (ячейки — значение или список значений).
        :param audio_columns: Список колонок аудио (ячейки — значение или список значений).
        :param backend: 'auto' | 'clip' | 'clap' | 'clip_wav2clip'.
        :param clip_checkpoint: Чекпоинт CLIP (HF).
        :param clap_checkpoint: Чекпоинт CLAP (HF).
        :param fusion: Тип фьюжна ('concat' или 'mean').
        :param freeze_backbone: Заморозить веса бэкенда (linear probing).
        :param clip_max_length: Максимальная длина токенов для CLIP.
        :param max_images_per_sample: Максимум изображений на сэмпл при агрегации.
        :param max_audios_per_sample: Максимум аудио на сэмпл при агрегации.
        :param image_agg: Агрегация изображений ('concat' | 'mean').
        :param audio_agg: Агрегация аудио ('concat' | 'mean').
        """
        self.modalities = sorted(list(set(modalities)))
        self.target_columns_names = target_columns_names
        self.num_targets = len(target_columns_names)
        self.text_columns = text_columns or []
        self.image_columns = image_columns or []
        self.audio_columns = audio_columns or []
        self.backend_name = backend
        self.clip_checkpoint = clip_checkpoint
        self.clap_checkpoint = clap_checkpoint
        self.fusion = fusion
        self.freeze_backbone = freeze_backbone
        self.clip_max_length = clip_max_length
        self.max_images_per_sample = int(max_images_per_sample)
        self.max_audios_per_sample = int(max_audios_per_sample)
        self.image_agg = image_agg
        self.audio_agg = audio_agg

        self.backend: Optional[BaseBackend] = None
        self.model: Optional[SingleBackboneRegressor] = None
        self.trainer: Optional[Trainer] = None
        self.compute_metrics = None
        self.progress_callback: Optional[TrainerCallback] = None

        self._build_backend()

    def _build_backend(self):
        """
        Инициализирует бэкенд согласно настройке backend/auto и проверяет совместимость с модальностями.

        :raises ValueError: Если комбинация модальностей не поддерживается выбранным бэкендом.
        """
        mods = set(self.modalities)
        name = self.backend_name
        if name == "auto":
            if mods == {"text", "image"}:
                name = "clip"
            elif mods == {"text", "audio"}:
                name = "clap"
            elif mods in ({"image", "audio"}, {"text", "image", "audio"}):
                name = "clip_wav2clip"
            else:
                raise ValueError(f"Неподдерживаемая комбинация: {mods}")

        if name == "clip":
            self.backend = ClipBackend(
                checkpoint=self.clip_checkpoint,
                max_length=self.clip_max_length,
                freeze=self.freeze_backbone,
                max_images=self.max_images_per_sample,
                image_agg=self.image_agg
            )
        elif name == "clap":
            self.backend = ClapBackend(
                checkpoint=self.clap_checkpoint,
                freeze=self.freeze_backbone,
                max_audios=self.max_audios_per_sample,
                audio_agg=self.audio_agg
            )
        elif name == "clip_wav2clip":
            self.backend = ClipWav2CLIPBackend(
                checkpoint=self.clip_checkpoint,
                max_length=self.clip_max_length,
                freeze=self.freeze_backbone,
                audio_sr=16000,
                max_images=self.max_images_per_sample,
                max_audios=self.max_audios_per_sample,
                image_agg=self.image_agg,
                audio_agg=self.audio_agg
            )
        else:
            raise ValueError(f"Неизвестный backend: {name}")

        if not set(self.modalities).issubset(self.backend.supported):
            raise ValueError(
                f"Бэкенд {self.backend.name} не поддерживает модальности {self.modalities}. "
                f"Поддерживает: {self.backend.supported}"
            )

    def _validate_data(self, df: pd.DataFrame):
        """
        Проверяет соответствие колонок в DataFrame выбранным модальностям и целям.

        :param df: Исходный датафрейм.
        :raises ValueError: При отсутствии обязательных колонок.
        """
        missing_targets = [c for c in self.target_columns_names if c not in df.columns]
        if missing_targets:
            raise ValueError(f"В DataFrame отсутствуют целевые колонки: {missing_targets}")

        if "text" in self.modalities:
            if not self.text_columns:
                raise ValueError("Вы выбрали модальность 'text', но text_columns пустой.")
            missing = [c for c in self.text_columns if c not in df.columns]
            if missing:
                raise ValueError(f"В DataFrame отсутствуют текстовые колонки: {missing}")

        if "image" in self.modalities:
            if not self.image_columns:
                raise ValueError("Вы выбрали модальность 'image', но image_columns пуст.")
            missing = [c for c in self.image_columns if c not in df.columns]
            if missing:
                raise ValueError(f"В DataFrame отсутствуют колонки изображений: {missing}")

        if "audio" in self.modalities:
            if not self.audio_columns:
                raise ValueError("Вы выбрали модальность 'audio', но audio_columns пуст.")
            missing = [c for c in self.audio_columns if c not in df.columns]
            if missing:
                raise ValueError(f"В DataFrame отсутствуют колонки аудио: {missing}")

    def _split(self, df: pd.DataFrame, test_size: float = 0.2, seed: int = 42):
        """
        Перемешивает и делит DataFrame на обучающую и валидационную части.

        :param df: Исходный датафрейм.
        :param test_size: Доля валидации (0..1).
        :param seed: Зерно для перемешивания.
        :return: (df_train, df_eval).
        """
        df = df.sample(frac=1.0, random_state=seed).reset_index(drop=True)
        n_eval = int(math.ceil(len(df) * test_size))
        return df.iloc[n_eval:].reset_index(drop=True), df.iloc[:n_eval].reset_index(drop=True)

    def _setup_compute_metrics(self, metric_name: str):
        """
        Создаёт функцию подсчёта метрик для Trainer.

        Поддерживаемые итоговые метрики:
          - 'rmse' (минимизируется),
          - 'mae'  (минимизируется),
          - 'mse'  (минимизируется),
          - 'r2'   (максимизируется).

        :param metric_name: Название основной метрики.
        """
        name = metric_name.lower()

        def compute(p):
            preds = np.asarray(p.predictions)
            refs  = np.asarray(p.label_ids)

            # гарантируем форму [N, T]
            if preds.ndim == 1: preds = preds[:, None]
            if refs.ndim  == 1: refs  = refs[:,  None]

            T = min(preds.shape[1], refs.shape[1])
            mse_list, mae_list, r2_list = [], [], []

            for t in range(T):
                y_true = refs[:, t].astype(np.float64)
                y_pred = preds[:, t].astype(np.float64)

                err = y_pred - y_true
                mse = float(np.mean(err**2))
                mae = float(np.mean(np.abs(err)))

                # R^2: 1 - SS_res/SS_tot; если дисперсия нулевая, даём 0.0 (безопасный фолбэк)
                var = float(np.var(y_true))
                if var == 0.0:
                    r2 = 0.0
                else:
                    ss_res = float(np.sum(err**2))
                    ss_tot = float(np.sum((y_true - np.mean(y_true))**2))
                    r2 = 1.0 - (ss_res / ss_tot if ss_tot > 0 else 0.0)

                mse_list.append(mse)
                mae_list.append(mae)
                r2_list.append(r2)

            mse_avg = float(np.mean(mse_list))
            rmse_avg = float(np.sqrt(mse_avg))
            mae_avg = float(np.mean(mae_list))
            r2_avg = float(np.mean(r2_list))
            return {"rmse": rmse_avg, "mse": mse_avg, "mae": mae_avg, "r2": r2_avg}

        self.compute_metrics = compute
        self._primary_metric = name
        self._greater_is_better = True if name == "r2" else False

    def fit(
        self,
        train_data: pd.DataFrame,
        epochs: int = 3,
        test_size: float = 0.2,
        per_device_train_batch_size: int = 16,
        gradient_accumulation_steps: int = 1,
        learning_rate: float = 2e-4,
        metric_name: str = "rmse",
        fp16: bool = True,
        logging_steps: int = 50,
        eval_steps: int = 200,
        output_dir: str = "./reg_result",
        seed: int = 42,
        hidden: int = 512,
        dropout: float = 0.1,
        fit_chunk_size: Optional[int] = None
    ):
        """
        Обучает регрессионную голову поверх выбранного бэкенда.
        Поддерживает обучение на больших данных за счёт чанков: train_dataset подставляется кусками.

        :param train_data: Данные обучения с необходимыми колонками модальностей и целями.
        :param epochs: Количество эпох обучения.
        :param test_size: Доля валидации.
        :param per_device_train_batch_size: Размер батча на устройство.
        :param gradient_accumulation_steps: Шаги аккумуляции градиентов.
        :param learning_rate: Learning rate для AdamW.
        :param metric_name: Основная метрика ('rmse' | 'mae' | 'mse' | 'r2') для выбора лучшей модели.
        :param fp16: Использовать ли fp16 при наличии CUDA (если доступен bf16 — он будет использован вместо fp16).
        :param logging_steps: Периодичность логирования.
        :param eval_steps: Периодичность валидации/сохранения чекпоинтов.
        :param output_dir: Папка для логов/чекпоинтов.
        :param seed: Зерно.
        :param hidden: Размер скрытого слоя головы.
        :param dropout: Дропаут в голове.
        :param fit_chunk_size: Размер чанка обучающей выборки. Если None — весь train как один чанк.
        :return: self.
        :raises ValueError: При проблемах с данными или параметрами.
        """
        self._validate_data(train_data)
        set_seed(seed)

        df_train, df_eval = self._split(train_data, test_size=test_size, seed=seed)

        # Датасеты: валидацию держим целиком, train будем подставлять чанками
        ds_eval = MultiComboRegDataset(df_eval, self.target_columns_names, self.text_columns, self.image_columns, self.audio_columns)

        self.model = SingleBackboneRegressor(
            backend=self.backend,
            modalities=self.modalities,
            num_targets=self.num_targets,
            fusion=self.fusion,
            hidden=hidden,
            dropout=dropout
        )

        self._setup_compute_metrics(metric_name)

        # Настройки точности (bf16, если доступен, иначе fp16 при флаге fp16=True)
        bf16_ok = bool(torch.cuda.is_available() and torch.cuda.get_device_capability()[0] >= 8)

        args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=epochs,
            per_device_train_batch_size=per_device_train_batch_size,
            per_device_eval_batch_size=max(4, per_device_train_batch_size // 2),
            gradient_accumulation_steps=gradient_accumulation_steps,
            eval_accumulation_steps=max(1, gradient_accumulation_steps * 2),
            learning_rate=learning_rate,
            warmup_ratio=0.1,
            lr_scheduler_type="cosine",
            weight_decay=0.01,
            eval_strategy="steps",            # <- по вашей просьбе
            eval_steps=eval_steps,
            save_strategy="steps",
            save_steps=eval_steps,
            load_best_model_at_end=True,
            metric_for_best_model=f"eval_{self._primary_metric}",
            greater_is_better=self._greater_is_better,
            save_total_limit=1,
            logging_strategy="steps",
            logging_steps=logging_steps,
            report_to="none",
            fp16=bool(fp16 and torch.cuda.is_available() and not bf16_ok),
            bf16=bool(bf16_ok and not fp16),
            dataloader_num_workers=0,
            seed=seed,
            remove_unused_columns=False,
            disable_tqdm=True  # используем внешний tqdm
        )

        def data_collator(batch_list):
            """
            Collate-хук для Trainer: делегирует сборку батча в backend.collate().

            :param batch_list: Список элементов датасета.
            :return: Батч словаря {'labels', 'backend_inputs'}.
            """
            return self.backend.collate(batch_list)

        # Вспомогательные функции для чанков
        def steps_for_size(sz: int, bsz: int, accum: int) -> int:
            """
            Оценивает число оптимизационных шагов на чанке размера sz.

            :param sz: Количество примеров в чанке.
            :param bsz: Размер батча.
            :param accum: Шаги аккумуляции.
            :return: Число оптимизационных шагов.
            """
            return max(0, math.ceil(math.ceil(sz / max(1, bsz)) / max(1, accum)))

        def chunk_slices(index_array: np.ndarray, chunk_size: int):
            """
            Генератор срезов индексов по chunk_size.

            :param index_array: Индексы обучающей выборки.
            :param chunk_size: Размер чанка.
            :yield: Срез индексов.
            """
            for i in range(0, len(index_array), chunk_size):
                yield index_array[i:i + chunk_size]

        # Индексы train
        n_train = len(df_train)
        rng = np.random.default_rng(seed)
        train_idx = np.arange(n_train)

        # Чанк по умолчанию — весь train
        chunk_size = fit_chunk_size if (fit_chunk_size and fit_chunk_size > 0) else len(train_idx)

        # Предварительный рассчёт общего числа шагов (для прогресс‑бара и планировщика)
        total_steps = 0
        for _ in range(epochs):
            rng.shuffle(train_idx)
            for slc in chunk_slices(train_idx, chunk_size):
                total_steps += steps_for_size(len(slc), per_device_train_batch_size, gradient_accumulation_steps)

        # Инициализация Trainer с «пустым» train датасетом (минимальный чанк), чтобы не держать весь train
        dummy_idx = np.arange(min(len(df_train), 1))
        ds_train_init = MultiComboRegDataset(
            df_train.iloc[dummy_idx], self.target_columns_names, self.text_columns, self.image_columns, self.audio_columns
        ) if len(dummy_idx) > 0 else ds_eval

        self.trainer = MSETrainer(
            model=self.model,
            args=args,
            train_dataset=ds_train_init,
            eval_dataset=ds_eval,
            data_collator=data_collator,
            compute_metrics=self.compute_metrics
        )
        # Убираем стандартный принтер, чтобы не мешал внешнему tqdm
        try:
            self.trainer.remove_callback(PrinterCallback)
        except Exception:
            pass

        # Планировщик на рассчитанное количество шагов
        if total_steps > 0:
            self.trainer.create_optimizer_and_scheduler(num_training_steps=total_steps)

        # Внешний прогресс‑бар + консольный лог
        pbar = tqdm(total=total_steps, desc="Training Progress", unit="step")
        cb = PbarConsoleLogger(pbar)
        self.trainer.add_callback(cb)
        self.progress_callback = cb

        # Основной цикл обучения по эпохам и чанкам
        steps_done = 0
        for ep in range(epochs):
            rng = np.random.default_rng(seed + ep)
            shuffled = np.arange(n_train)
            rng.shuffle(shuffled)

            for slc in chunk_slices(shuffled, chunk_size):
                # Подставляем чанк
                chunk_df = df_train.iloc[slc]
                ds_chunk = MultiComboRegDataset(
                    chunk_df, self.target_columns_names, self.text_columns, self.image_columns, self.audio_columns
                )
                self.trainer.train_dataset = ds_chunk

                # Считаем шаги на чанке и настраиваем max_steps Trainer
                chunk_steps = steps_for_size(len(slc), per_device_train_batch_size, gradient_accumulation_steps)
                if chunk_steps == 0:
                    del ds_chunk, chunk_df
                    continue

                self.trainer.args.max_steps = steps_done + chunk_steps
                self.trainer.train()
                steps_done += chunk_steps

                # Очистка памяти между чанками
                del ds_chunk, chunk_df
                gc.collect()
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()

        pbar.close()
        return self

    def predict(self, df: pd.DataFrame) -> np.ndarray:
        """
        Делает предсказания регрессионных таргетов на новых данных.

        :param df: Датафрейм с нужными колонками модальностей (целевые колонки могут отсутствовать).
        :return: Массив предсказаний формы [N, T].
        :raises RuntimeError: Если модель ещё не обучена.
        """
        if self.trainer is None:
            raise RuntimeError("Модель не обучена. Вызовите .fit().")
        df_c = df.copy()
        for c in self.target_columns_names:
            if c not in df_c.columns:
                df_c[c] = 0.0
        ds = MultiComboRegDataset(df_c, self.target_columns_names, self.text_columns, self.image_columns, self.audio_columns)
        preds = self.trainer.predict(test_dataset=ds)
        return preds.predictions

    def get_embeddings(self, df: pd.DataFrame, batch_size: int = 32, return_per_modality: bool = False):
        """
        Извлекает эмбеддинги для новых данных (fused и, опционально, по модальностям).

        :param df: Датафрейм с нужными колонками модальностей.
        :param batch_size: Размер батча для извлечения эмбеддингов.
        :param return_per_modality: Вернуть также по-модальные эмбеддинги.
        :return: fused [N, D_fused] или (fused, {'text':[N,*], 'image':[N,*], 'audio':[N,*]}).
        :raises RuntimeError: Если модель ещё не обучена.
        """
        if self.trainer is None or self.model is None:
            raise RuntimeError("Модель не обучена. Вызовите .fit().")

        # Определяем девайс модели
        try:
            device = next(self.trainer.model.parameters()).device
        except StopIteration:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.model.to(device).eval()

        df_c = df.copy()
        for c in self.target_columns_names:
            if c not in df_c.columns:
                df_c[c] = 0.0

        ds = MultiComboRegDataset(
            df_c,
            self.target_columns_names,
            self.text_columns,
            self.image_columns,
            self.audio_columns
        )

        def collate(batch_list):
            """
            Collate для DataLoader при извлечении эмбеддингов (использует backend.collate()).

            :param batch_list: Элементы датасета.
            :return: Батч {'labels', 'backend_inputs'}.
            """
            return self.backend.collate(batch_list)

        def move_to_device(obj, device):
            """
            Рекурсивно переносит тензоры в структуре на заданный девайс.

            :param obj: Тензор/словарь/список/кортеж/прочее.
            :param device: torch.device ('cpu' или 'cuda').
            :return: Объект с перенесёнными тензорами.
            """
            if torch.is_tensor(obj):
                return obj.to(device)
            if isinstance(obj, dict):
                return {k: move_to_device(v, device) for k, v in obj.items()}
            if isinstance(obj, (list, tuple)):
                t = [move_to_device(v, device) for v in obj]
                return type(obj)(t) if not isinstance(obj, list) else t
            return obj

        loader = DataLoader(ds, batch_size=batch_size, shuffle=False, collate_fn=collate)
        fused_list = []
        per_mod_lists = {m: [] for m in self.modalities} if return_per_modality else None

        with torch.no_grad():
            for batch in loader:
                bi = move_to_device(batch["backend_inputs"], device)
                fused, per = self.model.get_embeddings(backend_inputs=bi, return_per_modality=True)
                fused_list.append(fused.cpu().numpy())
                if return_per_modality:
                    for m in per_mod_lists.keys():
                        if m in per:
                            per_mod_lists[m].append(per[m].cpu().numpy())

        fused_arr = np.vstack(fused_list)
        if not return_per_modality:
            return fused_arr
        per_mod = {m: np.vstack(chunks) for m, chunks in per_mod_lists.items()}
        return fused_arr, per_mod

Создание фиктивных данных.

In [None]:
import os, random
import numpy as np
import pandas as pd
from PIL import Image

import torch
import torchaudio

HAVE_TORCHAUDIO = True

random.seed(42); np.random.seed(42); torch.manual_seed(42)

BASE_DIR = "./dummy_data"
IMG_DIR  = os.path.join(BASE_DIR, "images")
AUD_DIR  = os.path.join(BASE_DIR, "audio")
os.makedirs(IMG_DIR, exist_ok=True)
os.makedirs(AUD_DIR, exist_ok=True)

# Сколько элементов на модальность в каждой строке
K_PER_MODALITY = 3

def make_dummy_images(n=12, size=(256, 256)):
    paths = []
    for i in range(n):
        color = tuple(np.random.randint(0, 255, size=3).tolist())
        img = Image.new("RGB", size, color=color)
        path = os.path.join(IMG_DIR, f"img_{i:02d}.png")
        img.save(path)
        paths.append(path)
    return paths

def make_dummy_audios(n=12, sr=48000, duration_sec=0.6):
    if not HAVE_TORCHAUDIO:
        raise RuntimeError("Нельзя сгенерировать аудио без torchaudio. Установите 'pip install torchaudio'.")
    paths = []
    t = torch.linspace(0, duration_sec, int(sr * duration_sec))
    for i in range(n):
        freq = random.choice([220, 330, 440, 550, 660, 880])
        wave = 0.2 * torch.sin(2 * np.pi * freq * t)  # амплитуда 0.2
        wave = wave.unsqueeze(0)  # [1, T] mono
        path = os.path.join(AUD_DIR, f"tone_{i:02d}.wav")
        torchaudio.save(path, wave, sample_rate=sr)
        paths.append(path)
    return paths

img_paths = make_dummy_images(n=12)
audio_paths = make_dummy_audios(n=12) if HAVE_TORCHAUDIO else []

# Вспомогательные тексты
TITLES = ["Red fox", "Blue sky", "Green field", "Yellow sun", "Purple rain", "Silver line"]
BODIES = ["quick brown", "lazy dog", "jumps high", "runs fast", "stays calm", "shines bright"]
QUERIES = ["find tone", "classify sound", "describe image", "retrieve pair", "detect event"]

def rand_title(): return random.choice(TITLES)
def rand_body(): return random.choice(BODIES)
def rand_query(): return random.choice(QUERIES)

# Универсальные хелперы
def sample_k(seq, k):
    if len(seq) >= k:
        return random.sample(seq, k)  # без повторов
    else:
        return [random.choice(seq) for _ in range(k)]  # с повторами, если мало исходников

def as_cols(prefix, values):
    # {"prefix_1": values[0], ..., "prefix_k": values[k-1]}
    return {f"{prefix}_{i+1}": v for i, v in enumerate(values)}

def pick_text_desc(k=K_PER_MODALITY):
    # Текстовое описание: "Title | body"
    vals = [f"{rand_title()} | {rand_body()}" for _ in range(k)]
    return as_cols("text", vals)

def pick_text_query(k=K_PER_MODALITY):
    vals = [rand_query() for _ in range(k)]
    return as_cols("text", vals)

def pick_images(k=K_PER_MODALITY):
    vals = sample_k(img_paths, k)
    return as_cols("image_path", vals)

def pick_audios(k=K_PER_MODALITY):
    vals = sample_k(audio_paths, k)
    return as_cols("audio_path", vals)

# 1) Текст + Картинка -> по 3 текстовых и 3 картинок
def build_df_text_image(n=24, k=K_PER_MODALITY):
    rows = []
    for _ in range(n):
        row = {}
        row.update(pick_text_desc(k))
        row.update(pick_images(k))
        row["y1"] = random.uniform(-10, 10)
        row["y2"] = random.uniform(-10, 10)
        row["y3"] = random.uniform(-10, 10)
        rows.append(row)
    return pd.DataFrame(rows)

# 2) Текст + Звук -> по 3 текста (query) и 3 аудио
def build_df_text_audio(n=24, k=K_PER_MODALITY):
    if not HAVE_TORCHAUDIO:
        raise RuntimeError("Для text+audio нужен torchaudio.")
    rows = []
    for _ in range(n):
        row = {}
        row.update(pick_text_query(k))
        row.update(pick_audios(k))
        row["y1"] = random.uniform(-10, 10)
        row["y2"] = random.uniform(-10, 10)
        row["y3"] = random.uniform(-10, 10)
        rows.append(row)
    return pd.DataFrame(rows)

# 3) Картинка + Звук -> по 3 картинки и 3 аудио
def build_df_image_audio(n=24, k=K_PER_MODALITY):
    if not HAVE_TORCHAUDIO:
        raise RuntimeError("Для image+audio нужен torchaudio.")
    rows = []
    for _ in range(n):
        row = {}
        row.update(pick_images(k))
        row.update(pick_audios(k))
        row["y1"] = random.uniform(-10, 10)
        row["y2"] = random.uniform(-10, 10)
        row["y3"] = random.uniform(-10, 10)
        rows.append(row)
    return pd.DataFrame(rows)

# 4) Текст + Картинка + Звук -> по 3 на каждую модальность
def build_df_text_image_audio(n=24, k=K_PER_MODALITY):
    if not HAVE_TORCHAUDIO:
        raise RuntimeError("Для text+image+audio нужен torchaudio.")
    rows = []
    for _ in range(n):
        row = {}
        row.update(pick_text_desc(k))
        row.update(pick_images(k))
        row.update(pick_audios(k))
        row["y1"] = random.uniform(-10, 10)
        row["y2"] = random.uniform(-10, 10)
        row["y3"] = random.uniform(-10, 10)
        rows.append(row)
    return pd.DataFrame(rows)

# Собираем 4 датасета
df_text_image = build_df_text_image(243, K_PER_MODALITY)
df_text_audio = build_df_text_audio(100, K_PER_MODALITY) if HAVE_TORCHAUDIO else None
df_image_audio = build_df_image_audio(300, K_PER_MODALITY) if HAVE_TORCHAUDIO else None
df_text_image_audio  = build_df_text_image_audio(190, K_PER_MODALITY) if HAVE_TORCHAUDIO else None

print("df_text_image columns:", list(df_text_image.columns))
if HAVE_TORCHAUDIO:
    print("df_text_audio columns:", list(df_text_audio.columns))
    print("df_image_audio columns:", list(df_image_audio.columns))
    print("df_text_image_audio columns:", list(df_text_image_audio.columns))

Пример использования.

In [None]:
pipeline = SingleModelMultiComboRegression(
    modalities=["text", "image", "audio"],
    target_columns_names=["y1", "y2"],
    text_columns=["text_1", "text_2"],
    image_columns=["image_path_1", "image_path_2"],
    audio_columns=["audio_path_1", "audio_path_2"],
    backend="auto",
    clip_checkpoint="openai/clip-vit-base-patch32",
    fusion="concat",
    freeze_backbone=True,
)
pipeline.fit(
    df_text_image_audio,
    epochs=3,
    per_device_train_batch_size=8,
    logging_steps=10,
    eval_steps=10,
    fit_chunk_size=30
)
preds = pipeline.predict(df_text_image_audio[:5])
embeddings = pipeline.get_embeddings(df_text_image_audio[:5])

print(preds)
print(embeddings)