In [None]:
import os
from tqdm import tqdm
import json
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import torchvision.transforms.functional as F
import random
try:
    import timm  # Добавим библиотеку timm для эффективных моделей
except ImportError:
    import subprocess
    subprocess.check_call(["pip", "install", "timm"])
    import timm
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
from tqdm import tqdm
import copy
import time
import timm  # Добавим библиотеку timm для эффективных моделей

In [None]:
# Определение путей к данным
# CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = '/kaggle/input/colors/dataset_colors'
TRAIN_DATA_DIR = '/kaggle/input/colors/dataset_colors/train_data'
TEST_DATA_DIR = '/kaggle/input/colors/dataset_colors/test_data'
TRAIN_CSV = '/kaggle/input/colors/dataset_colors/train_data.csv'
TEST_CSV = '/kaggle/input/colors/dataset_colors/test_data.csv'

In [None]:
TRANSLIT_TO_RU = {
    'bezhevyi': 'бежевый',
    'belyi': 'белый',
    'biryuzovyi': 'бирюзовый',
    'bordovyi': 'бордовый',
    'goluboi': 'голубой',
    'zheltyi': 'желтый',
    'zelenyi': 'зеленый',
    'zolotoi': 'золотой',
    'korichnevyi': 'коричневый',
    'krasnyi': 'красный',
    'oranzhevyi': 'оранжевый',
    'raznocvetnyi': 'разноцветный',
    'rozovyi': 'розовый',
    'serebristyi': 'серебряный',
    'seryi': 'серый',
    'sinii': 'синий',
    'fioletovyi': 'фиолетовый',
    'chernyi': 'черный'
}

# Создаем обратный маппинг с русского на транслит
RU_TO_TRANSLIT = {v: k for k, v in TRANSLIT_TO_RU.items()}

In [None]:
# Маппинг цветов
COLORS = {
    'бежевый': 'beige',
    'белый': 'white',
    'бирюзовый': 'turquoise',
    'бордовый': 'burgundy',
    'голубой': 'blue',
    'желтый': 'yellow',
    'зеленый': 'green',
    'золотой': 'gold',
    'коричневый': 'brown',
    'красный': 'red',
    'оранжевый': 'orange',
    'разноцветный': 'variegated',
    'розовый': 'pink',
    'серебряный': 'silver',
    'серый': 'gray',
    'синий': 'blue',
    'фиолетовый': 'purple',
    'черный': 'black'
}

# Создаем обратный словарь для получения индекса по цвету
COLOR_INDICES = {c: i for i, c in enumerate(COLORS.keys())}

CATEGORIES = ['одежда для девочек', 'столы', 'стулья', 'сумки']

In [None]:
class ProductDataset(Dataset):
    def __init__(self, df, img_dir, transform=None, is_test=False):
        self.df = df.reset_index(drop=True)  # Сбрасываем индексы после фильтрации
        self.img_dir = img_dir
        self.transform = transform
        self.is_test = is_test
        self.category_to_idx = {cat: idx for idx, cat in enumerate(CATEGORIES)}
        self.color_to_idx = {color: idx for idx, color in enumerate(COLORS.keys())}
        
        # Проверяем все пути к изображениям заранее
        self.valid_indices = []
        
        # Если в датафрейме есть путь к изображению, используем его
        if 'image_path' in self.df.columns:
            for idx in range(len(df)):
                if pd.notna(self.df.iloc[idx]['image_path']):
                    # Проверяем, что файл действительно существует
                    img_path = self.df.iloc[idx]['image_path']
                    if os.path.exists(img_path):
                        self.valid_indices.append(idx)
                    else:
                        print(f"Предупреждение: файл {img_path} не существует")
                else:
                    # Пробуем найти файл с разными расширениями
                    img_id = df.iloc[idx]['id']
                    extensions = ['.jpg', '.jpeg', '.png', '.JPG', '.JPEG', '.PNG']
                    found = False
                    
                    for ext in extensions:
                        img_path = os.path.join(self.img_dir, f"{img_id}{ext}")
                        if os.path.exists(img_path):
                            self.df.at[idx, 'image_path'] = img_path
                            self.valid_indices.append(idx)
                            found = True
                            break
                    
                    if not found:
                        print(f"Не найдено изображение для id: {img_id}")
        else:
            # Проверяем разные расширения файлов
            for idx in range(len(df)):
                found = False
                img_id = df.iloc[idx]['id']
                extensions = ['.jpg', '.jpeg', '.png', '.JPG', '.JPEG', '.PNG']
                
                for ext in extensions:
                    img_path = os.path.join(self.img_dir, f"{img_id}{ext}")
                    if os.path.exists(img_path):
                        found = True
                        break
                
                if found:
                    self.valid_indices.append(idx)
                else:
                    print(f"Не найдено изображение для id: {img_id}")
        
        # Выводим первые 5 и последние 5 индексов для проверки
        if len(self.valid_indices) > 0:
            print(f"Первые 5 валидных индексов: {self.valid_indices[:5]}")
            if len(self.valid_indices) > 5:
                print(f"Последние 5 валидных индексов: {self.valid_indices[-5:]}")
            
            # Вывод путей для проверки
            for i in range(min(5, len(self.valid_indices))):
                idx = self.valid_indices[i]
                img_id = self.df.iloc[idx]['id']
                if 'image_path' in self.df.columns and pd.notna(self.df.iloc[idx]['image_path']):
                    print(f"ID: {img_id}, путь: {self.df.iloc[idx]['image_path']}")
                else:
                    print(f"ID: {img_id}, путь не сохранен")
        
        if len(self.valid_indices) == 0:
            # Дополнительная отладочная информация
            print(f"Проверяем доступность директории: {img_dir}, существует: {os.path.exists(img_dir)}")
            if os.path.exists(img_dir):
                # Вывести список файлов в директории
                files = os.listdir(img_dir)
                print(f"Первые 10 файлов в директории: {files[:10] if len(files) > 0 else 'директория пуста'}")
                
                # Проверка первых 5 записей датафрейма
                for idx in range(min(5, len(df))):
                    img_id = df.iloc[idx]['id']
                    print(f"Проверка ID: {img_id}")
                    for ext in ['.jpg', '.jpeg', '.png', '.JPG', '.JPEG', '.PNG']:
                        img_path = os.path.join(img_dir, f"{img_id}{ext}")
                        print(f"  Путь {img_path} существует: {os.path.exists(img_path)}")
            
            raise ValueError(f"Не найдено ни одного изображения в директории {img_dir}")
        
        print(f"Найдено {len(self.valid_indices)} валидных изображений из {len(df)}")
        
    def __len__(self):
        return len(self.valid_indices)
    
    def __getitem__(self, idx):
        real_idx = self.valid_indices[idx]
        img_id = self.df.iloc[real_idx]['id']
        
        # Используем сохраненный путь если он есть, иначе ищем файл
        if 'image_path' in self.df.columns and pd.notna(self.df.iloc[real_idx]['image_path']):
            img_path = self.df.iloc[real_idx]['image_path']
        else:
            # Проверяем разные расширения файлов
            extensions = ['.jpg', '.jpeg', '.png', '.JPG', '.JPEG', '.PNG']
            img_path = None
            
            for ext in extensions:
                path = os.path.join(self.img_dir, f"{img_id}{ext}")
                if os.path.exists(path):
                    img_path = path
                    break
                    
            if img_path is None:
                # Если не найдено, используем стандартный путь (вызовет ошибку при отсутствии файла)
                img_path = os.path.join(self.img_dir, f"{img_id}.jpg")
        
        try:
            image = Image.open(img_path).convert('RGB')
        except Exception as e:
            print(f"Ошибка при загрузке изображения {img_path}: {str(e)}")
            raise
        
        if self.transform:
            try:
                image = self.transform(image)
            except Exception as e:
                print(f"Ошибка при применении трансформации к {img_path}: {str(e)}")
                raise
            
        category = self.category_to_idx[self.df.iloc[real_idx]['category']]
        
        if not self.is_test:
            color_translit = self.df.iloc[real_idx]['target']
            color_ru = TRANSLIT_TO_RU[color_translit]
            color = self.color_to_idx[color_ru]
            return image, category, color
        
        return image, category, img_id

In [None]:
class ColorClassifier(nn.Module):
    def __init__(self, num_colors, num_categories):
        super().__init__()
        # Используем более легкий и быстрый вариант ViT
        self.backbone = timm.create_model(
            'beitv2_large_patch16_224', 
            pretrained=True, 
            num_classes=0,  # Без верхнего слоя классификации
        )
        
        # Фиксируем большую часть весов для ускорения обучения
        for param in list(self.backbone.parameters())[:-30]:
            param.requires_grad = False
            
        # Расширение для быстрой инференции с помощью кэширования
        self.backbone.reset_classifier(0)
        
        # Размерность характеристик модели
        self.feature_dim = self.backbone.embed_dim  # Для vit_tiny это 192
        
        # Эмбеддинг категории
        self.category_embedding = nn.Embedding(num_categories, 32)
        
        # Классификационная голова
        self.classifier = nn.Sequential(
            nn.Linear(self.feature_dim + 32, 256),  # Меньше параметров для ускорения
            nn.ReLU(),
            nn.Dropout(0.2),  # Меньше дропаут для более быстрой сходимости
            nn.Linear(256, num_colors)
        )
        
        # Для оптимизации torch.jit
        self.example_input = torch.zeros(1, 3, 224, 224)
        self.example_category = torch.LongTensor([0])
        
    def forward(self, x, category):
        features = self.backbone(x)
        
        category_emb = self.category_embedding(category)
        combined = torch.cat([features, category_emb], dim=1)
        
        return self.classifier(combined)

    def optimize_for_inference(self, device):
        """Оптимизирует модель для быстрой инференции"""
        self.eval()
        # Использование TorchScript для оптимизации
        return torch.jit.trace((self.example_input.to(device), self.example_category.to(device)), self)

In [None]:
def load_model(model_path):
    """
    Загружает ранее обученную модель из указанного пути.
    Если модель была сохранена как TorchScript, загрузит её как таковую.
    Функция вызывается только один раз.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Проверяем формат сохраненной модели
    try:
        # Пробуем загрузить как TorchScript модель
        model = torch.jit.load(model_path, map_location=device)
        print("Загружена оптимизированная TorchScript модель")
        return model
    except:
        # Загружаем как обычную модель
        print("Загружаем модель из стандартных весов...")
        model = ColorClassifier(len(COLORS), len(CATEGORIES))
        
        checkpoint = torch.load(model_path, map_location=device)
        if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
            model.load_state_dict(checkpoint['model_state_dict'])
        else:
            model.load_state_dict(checkpoint)
        
        # Установка модели в режим оценки (инференса)
        model.eval()
        
        model = model.to(device)
        
        return model

In [None]:
def predict(model, test_loader, seed=42, use_transliteration=True):
    """
    Запускает модель на тестовом датасете и возвращает предсказания.
    
    Args:
        model: Обученная модель
        test_loader: DataLoader с тестовыми данными
        seed: Seed для воспроизводимости
        use_transliteration: Если True, возвращает цвета в транслитерации
        
    Returns:
        DataFrame с предсказаниями
    """
    # Устанавливаем сид для воспроизводимости
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Используется устройство: {device}")
    
    model = model.to(device)
    model.eval()
    
    predictions = []
    
    # Получаем список цветов
    color_list = list(COLORS.keys())
    
    # Замер времени на один батч
    batch_times = []
    
    with torch.no_grad():
        for images, categories, img_ids in tqdm(test_loader, desc="Инференс"):
            images = images.to(device)
            categories = categories.to(device)
            
            # Замер времени
            start_time = time.time()
            outputs = model(images, categories)
            end_time = time.time()
            
            inference_time = (end_time - start_time) * 1000  # мс
            batch_times.append(inference_time / len(images))  # время на один образец
            
            probs = torch.softmax(outputs, dim=1).cpu().numpy()
            
            for img_id, category_idx, prob in zip(img_ids, categories.cpu().numpy(), probs):
                # Преобразуем id из тензора в обычное значение
                if isinstance(img_id, torch.Tensor):
                    img_id = img_id.item()  # для числовых id
                # Если id является строкой в тензоре:
                elif hasattr(img_id, 'item') and not isinstance(img_id, (int, float, str)):
                    img_id = str(img_id)
                
                # Получаем категорию товара
                category = CATEGORIES[category_idx]
                
                # Формируем словарь вероятностей для каждого цвета
                if use_transliteration:
                    # Используем транслитерацию для ключей словаря
                    pred_dict = {RU_TO_TRANSLIT[color]: float(p) for color, p in zip(color_list, prob)}
                    # Определяем цвет с максимальной вероятностью
                    pred_color = RU_TO_TRANSLIT[color_list[np.argmax(prob)]]
                else:
                    # Используем русские названия цветов
                    pred_dict = {color: float(p) for color, p in zip(color_list, prob)}
                    pred_color = color_list[np.argmax(prob)]
                
                predictions.append({
                    'id': img_id,
                    'category': category,
                    'predict_proba': json.dumps(pred_dict),
                    'predict_color': pred_color
                })
                
    # Вывод среднего времени инференции
    if batch_times:
        avg_time = sum(batch_times) / len(batch_times)
        print(f"Среднее время инференции на одно изображение: {avg_time:.2f} мс")
    
    # Создаем DataFrame с предсказаниями
    predictions_df = pd.DataFrame(predictions)
    return predictions_df

In [None]:
def predict_test_dataset(test_df, model_path, test_data_dir=TEST_DATA_DIR, seed=42):
    """
    Функция для предсказания цветов на тестовом датасете.
    
    Args:
        test_df: DataFrame с тестовыми данными
        model_path: Путь к сохраненной модели
        test_data_dir: Директория с тестовыми изображениями
        seed: Seed для воспроизводимости
    
    Returns:
        DataFrame с предсказаниями в требуемом формате
    """
    # Устанавливаем сид для воспроизводимости
    set_seed(seed)
    
    # Загружаем модель
    model = load_model(model_path)
    
    # Проверяем наличие каталога с тестовыми данными
    if not os.path.exists(test_data_dir):
        print(f"Предупреждение: директория {test_data_dir} не существует!")
        # Попробуем найти правильный путь
        parent_dir = os.path.dirname(test_data_dir)
        if os.path.exists(parent_dir):
            subdirs = [d for d in os.listdir(parent_dir) if os.path.isdir(os.path.join(parent_dir, d))]
            print(f"Доступные подкаталоги в {parent_dir}: {subdirs}")
    else:
        # Вывести первые несколько файлов для проверки
        files = os.listdir(test_data_dir)
        print(f"Найдено {len(files)} файлов в {test_data_dir}")
        print(f"Примеры файлов: {files[:5] if files else 'директория пуста'}")
    
    # Проверяем наличие файлов изображений с разными расширениями
    def check_image_exists(row, data_dir):
        # Проверяем разные расширения файлов
        extensions = ['.jpg', '.jpeg', '.png', '.JPG', '.JPEG', '.PNG']
        img_id = row['id']
        
        for ext in extensions:
            img_path = os.path.join(data_dir, f"{img_id}{ext}")
            if os.path.exists(img_path):
                # Сохраняем найденный путь в строке датафрейма для дальнейшего использования
                row['image_path'] = img_path
                return True
        
        # Дополнительная проверка на случай, если файл существует без расширения
        img_path = os.path.join(data_dir, f"{img_id}")
        if os.path.exists(img_path):
            row['image_path'] = img_path
            return True
            
        # Пробуем найти файл с этим ID без полного совпадения
        for file in os.listdir(data_dir):
            if file.startswith(str(img_id)):
                img_path = os.path.join(data_dir, file)
                row['image_path'] = img_path
                print(f"Найден файл по частичному совпадению: {img_path}")
                return True
        
        # Выводим предупреждение только для первых 10 ненайденных файлов
        if row.name < 10:
            print(f"Не найдено изображение для id: {img_id}")
        elif row.name == 10:
            print("... и для других ID (не выводится для экономии места)")
        return False
    
    # Добавляем столбец для хранения пути к изображению
    test_df['image_path'] = None
    
    # Применяем фильтрацию и сохраняем найденные пути к изображениям
    valid_rows = test_df.apply(lambda row: check_image_exists(row, test_data_dir), axis=1)
    
    # Выводим количество найденных путей
    valid_count = valid_rows.sum()
    print(f"Найдено {valid_count} файлов изображений из {len(test_df)}")
    
    if valid_count == 0:
        print("Не удалось найти ни одного файла изображения! Проверьте пути к данным.")
        # Вместо фильтрации, сохраним все строки и заполним пути заглушками для отладки
        for idx in range(len(test_df)):
            test_df.at[idx, 'image_path'] = os.path.join(test_data_dir, f"dummy_{idx}.jpg")
        print("Добавлены заглушки вместо реальных файлов для отладки.")
    else:
        test_df = test_df[valid_rows].copy()  # Используем .copy() для избежания предупреждений
    
    print(f"Размер тестового датасета после фильтрации: {len(test_df)}")
    
    # Трансформации для тестового датасета
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    
    test_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean, std)
    ])
    
    # Создаем датасет
    test_dataset = ProductDataset(test_df, test_data_dir, transform=test_transform, is_test=True)
    
    # Создаем DataLoader
    g = torch.Generator()
    g.manual_seed(seed)
    
    num_workers = min(4, os.cpu_count() or 4)
    
    test_loader = DataLoader(
        test_dataset, 
        batch_size=64,
        shuffle=False, 
        num_workers=num_workers,
        pin_memory=True,
        worker_init_fn=lambda worker_id: random.seed(seed + worker_id),
        generator=g
    )
    
    # Делаем предсказания
    predictions_df = predict(model, test_loader, seed=seed, use_transliteration=True)
    
    return predictions_df

In [None]:
def set_seed(seed=42):
    """
    Set all random seeds for reproducibility.
    
    Args:
        seed (int): Seed value to use for all random number generators
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # For multi-GPU setups
    
    # Make operations deterministic
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    
    # Set environment variable for Python hash seed
    os.environ['PYTHONHASHSEED'] = str(seed)
    
    print(f"All random seeds set to {seed} for reproducibility")

In [None]:
def main():
    # Set random seeds for reproducibility
    set_seed(42)
    
    # Устанавливаем все необходимые настройки для максимальной производительности
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    
    print("Запуск предсказания на тестовом датасете...")
    
    # Загружаем тестовый датасет
    test_df = pd.read_csv(TEST_CSV)
    print(f"Исходный размер тестового датасета: {len(test_df)}")
    
    # Путь к модели
    model_path = "/kaggle/input/macro_/pytorch/default/1/macro_weights.pth"
    
    # Делаем предсказания
    predictions_df = predict_test_dataset(test_df, model_path)
    
    # Сохраняем результаты
    predictions_df.to_csv('submission.csv', index=False)
    print("Готово! Результаты сохранены в submission.csv")

In [None]:
if __name__ == '__main__':
    main()