In [10]:
import os
import cv2
import numpy as np
import pandas as pd
from pathlib import Path
from PIL import Image
import matplotlib.pyplot as plt

In [11]:
def apply_fast_filter(img, pattern):
    """Применяет 2x2 фильтр к изображению."""
    # Создаём новое изображение с теми же размерами
    filtered_img = np.zeros_like(img)

    # Каждому пикселю в блоке 2x2 оставляем только нужный канал
    # Верхний левый
    filtered_img[0::2, 0::2, pattern[0]] = img[0::2, 0::2, pattern[0]]
    # Верхний правый
    filtered_img[0::2, 1::2, pattern[1]] = img[0::2, 1::2, pattern[1]]
    # Нижний левый
    filtered_img[1::2, 0::2, pattern[2]] = img[1::2, 0::2, pattern[2]]
    # Нижний правый
    filtered_img[1::2, 1::2, pattern[3]] = img[1::2, 1::2, pattern[3]]

    return filtered_img


In [12]:
patterns = [
    [0,1,2,0],
    [0,2,1,0],
    [1,0,0,2],
    [2,0,0,1],
    [1,0,2,1],
    [1,2,0,1],
    [0,1,1,2],
    [2,1,1,0],
    [2,0,1,2],
    [2,1,0,2],
    [0,2,2,1],
    [1,2,2,0],
]


In [13]:
import numpy as np
from PIL import Image

def apply_fast_filter(img_np, pattern):
    """Применяет 2x2 фильтр к изображению numpy."""
    filtered_img = np.zeros_like(img_np)
    filtered_img[0::2, 0::2, pattern[0]] = img_np[0::2, 0::2, pattern[0]]
    filtered_img[0::2, 1::2, pattern[1]] = img_np[0::2, 1::2, pattern[1]]
    filtered_img[1::2, 0::2, pattern[2]] = img_np[1::2, 0::2, pattern[2]]
    filtered_img[1::2, 1::2, pattern[3]] = img_np[1::2, 1::2, pattern[3]]
    return filtered_img

def apply_filter_func(pil_img, filter_id):
    pattern = patterns[filter_id]
    img_np = np.array(pil_img)  # PIL -> numpy (H, W, 3)
    filtered_np = apply_fast_filter(img_np, pattern)
    # Можно вернуть PIL для совместимости с transform
    return Image.fromarray(filtered_np)


In [14]:
import os
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms

class FilteredRestoreDataset(Dataset):
    def __init__(self, original_dir, filter_num=12, transform=None, apply_filter_func=None):
        self.original_dir = original_dir
        self.transform = transform
        self.filter_num = filter_num
        self.apply_filter_func = apply_filter_func
        
        self.img_names = sorted(os.listdir(original_dir))
        
    def __len__(self):
        return len(self.img_names) * self.filter_num
    
    def __getitem__(self, idx):
        # idx - индекс по всем фильтрованным версиям всех картинок
        img_idx = idx // self.filter_num  # индекс исходной картинки
        filter_id = idx % self.filter_num  # какой фильтр применяем
        
        img_name = self.img_names[img_idx]
        img_path = os.path.join(self.original_dir, img_name)
        img = Image.open(img_path).convert('RGB')
        
        # Применяем фильтр
        filtered_img = self.apply_filter_func(img, filter_id)
        
        if self.transform:
            filtered_img = self.transform(filtered_img)
            original_img = self.transform(img)
        else:
            # По умолчанию - преобразуем в тензор
            to_tensor = transforms.ToTensor()
            filtered_img = to_tensor(filtered_img)
            original_img = to_tensor(img)
        
        return filtered_img, original_img, filter_id


In [15]:
from torchvision import transforms
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split

transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

dataset = FilteredRestoreDataset(
    '/kaggle/input/tst-day-1/train/train/real_images', 
    filter_num=12, 
    transform=transform, 
    apply_filter_func=apply_filter_func
)

# Получаем индексы оригинальных изображений
all_img_idxs = list(range(len(dataset.img_names)))

# Делим на train/val (например, 80/20)
train_img_idxs, val_img_idxs = train_test_split(all_img_idxs, test_size=0.2, random_state=42, shuffle=True)

# Функция для получения индексов по фильтрам
def create_filtered_indices(img_idxs, filter_num):
    indices = []
    for img_idx in img_idxs:
        for filter_id in range(filter_num):
            idx = img_idx * filter_num + filter_id
            indices.append(idx)
    return indices

train_indices = create_filtered_indices(train_img_idxs, dataset.filter_num)
val_indices = create_filtered_indices(val_img_idxs, dataset.filter_num)

# Создаем Subset датасеты
train_dataset = Subset(dataset, train_indices)
val_dataset = Subset(dataset, val_indices)

# Создаем загрузчики
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)


In [16]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# SE-блок (Squeeze-and-Excitation)
class SEBlock(nn.Module):
    def __init__(self, channels, reduction=16):
        super().__init__()
        self.fc = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(channels, channels // reduction, 1),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels // reduction, channels, 1),
            nn.Sigmoid()
        )
    def forward(self, x):
        scale = self.fc(x)
        return x * scale

# Простой Residual блок с SE
class ResidualBlock(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_ch)
        self.se = SEBlock(out_ch)
        self.skip = nn.Conv2d(in_ch, out_ch, 1) if in_ch != out_ch else nn.Identity()

    def forward(self, x):
        identity = self.skip(x)
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = self.se(out)
        out += identity
        out = self.relu(out)
        return out

# Мощный U-Net без ResNet
class CustomUNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=1, filter_size=12):
        super().__init__()

        # Этапы Энкодера
        self.encoder1 = ResidualBlock(in_channels, 64)  # (B, 64, H/2, W/2)
        self.encoder2 = ResidualBlock(64, 128)  # (B, 128, H/4, W/4)
        self.encoder3 = ResidualBlock(128, 256)  # (B, 256, H/8, W/8)
        self.encoder4 = ResidualBlock(256, 512)  # (B, 512, H/16, W/16)

        # Линейный слой для преобразования one-hot представления фильтра
        self.filter_fc = nn.Linear(filter_size, 256)  # Фильтр преобразуется в 256-мерный вектор

        # Этапы Декодера
        self.up4 = nn.ConvTranspose2d(512, 256, 2, stride=2)
        self.dec4 = ResidualBlock(256 + 256, 256)

        self.up3 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        self.dec3 = ResidualBlock(128 + 128, 128)

        self.up2 = nn.ConvTranspose2d(128, 64, 2, stride=2)
        self.dec2 = ResidualBlock(64 + 64, 64)

        self.up1 = nn.ConvTranspose2d(64, 64, 2, stride=2)
        self.dec1 = ResidualBlock(64 + 64, 64)

        # Финальная свертка для сегментации
        self.final_conv = nn.Conv2d(64, out_channels, kernel_size=1)

    def forward(self, x, filter_one_hot):
        # Преобразуем one-hot фильтр в вектор признаков
        filter_features = self.filter_fc(filter_one_hot)  # (B, 256)
        
        # Этапы энкодера
        x1 = self.encoder1(x)  # (B, 64, H/2, W/2)
        x2 = self.encoder2(x1)  # (B, 128, H/4, W/4)
        x3 = self.encoder3(x2)  # (B, 256, H/8, W/8)
        x4 = self.encoder4(x3)  # (B, 512, H/16, W/16)
        
        # Этапы декодера
        d4 = self.up4(x4)  # (B, 256, H/8, W/8)
        filter_d4 = filter_features.unsqueeze(-1).unsqueeze(-1).expand(-1, -1, d4.size(2), d4.size(3))  # Расширяем filter_features
        d4 = torch.cat([d4, x3, filter_d4], dim=1)  # (B, 256+256, H/8, W/8)
        d4 = self.dec4(d4)
    
        d3 = self.up3(d4)  # (B, 128, H/4, W/4)
        filter_d3 = filter_features.unsqueeze(-1).unsqueeze(-1).expand(-1, -1, d3.size(2), d3.size(3))  # Расширяем filter_features
        d3 = torch.cat([d3, x2, filter_d3], dim=1)  # (B, 128+128, H/4, W/4)
        d3 = self.dec3(d3)
    
        d2 = self.up2(d3)  # (B, 64, H/2, W/2)
        filter_d2 = filter_features.unsqueeze(-1).unsqueeze(-1).expand(-1, -1, d2.size(2), d2.size(3))  # Расширяем filter_features
        d2 = torch.cat([d2, x1, filter_d2], dim=1)
        d2 = self.dec2(d2)
    
        d1 = self.up1(d2)  # (B, 64, H, W)
        filter_d1 = filter_features.unsqueeze(-1).unsqueeze(-1).expand(-1, -1, d1.size(2), d1.size(3))  # Расширяем filter_features
        d1 = torch.cat([d1, x, filter_d1], dim=1)
        d1 = self.dec1(d1)
    
        out = self.final_conv(d1)  # (B, out_channels, H, W)
        return torch.sigmoid(out)  # Используем сигмоиду для получения вероятности для сегментации


In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchmetrics.image.ssim import StructuralSimilarityIndexMeasure

class CombinedLossNoPretrained(nn.Module):
    def __init__(self, device, alpha=0.7, beta=0.3):
        super().__init__()
        self.alpha = alpha  # вес MSE
        self.beta = beta    # вес SSIM
        self.device = device
        self.ssim = StructuralSimilarityIndexMeasure(data_range=1.0).to(device)  # Инициализация метрики SSIM
        
    def forward(self, pred, target):
        # Вычисляем MSE
        mse_loss = F.mse_loss(pred, target)
        
        # Вычисляем SSIM
        ssim_loss = 1 - self.ssim(pred, target)  # SSIM хочет минимизировать 1 - SSIM
        
        # Комбинированный лосс
        total_loss = self.alpha * mse_loss + self.beta * ssim_loss
        return total_loss


In [18]:
import torch.optim as optim
import torch
from torch.utils.data import DataLoader
import torch.nn as nn

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Инициализация модели (например, для 3-канальных изображений на входе и 1 канала на выходе)
model = CustomUNet(in_channels=3, out_channels=1).to(device)

criterion = CombinedLossNoPretrained(device)  # Простой MSE лосс для начала
optimizer = optim.Adam(model.parameters(), lr=1e-3)

num_epochs = 20
num_filters = 12  # Кол-во фильтров

for epoch in range(num_epochs):
    # --- Обучение ---
    model.train()
    running_loss = 0.0
    
    for filtered_imgs, original_imgs, filter_ids in train_loader:
        filtered_imgs = filtered_imgs.to(device)
        original_imgs = original_imgs.to(device)
        filter_ids = filter_ids.to(device)

        filter_one_hot = torch.zeros(filtered_imgs.size(0), num_filters).to(device)
        filter_one_hot.scatter_(1, filter_ids.view(-1, 1), 1)

        optimizer.zero_grad()
        outputs = model(filtered_imgs, filter_one_hot)
        loss = criterion(outputs, original_imgs)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * filtered_imgs.size(0)

    epoch_train_loss = running_loss / len(train_loader.dataset)

    # --- Валидация ---
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for filtered_imgs, original_imgs, filter_ids in val_loader:
            filtered_imgs = filtered_imgs.to(device)
            original_imgs = original_imgs.to(device)
            filter_ids = filter_ids.to(device)

            filter_one_hot = torch.zeros(filtered_imgs.size(0), num_filters).to(device)
            filter_one_hot.scatter_(1, filter_ids.view(-1, 1), 1)

            outputs = model(filtered_imgs, filter_one_hot)
            loss = criterion(outputs, original_imgs)
            val_loss += loss.item() * filtered_imgs.size(0)

    epoch_val_loss = val_loss / len(val_loader.dataset)

    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {epoch_train_loss:.4f}, Val Loss: {epoch_val_loss:.4f}")


OutOfMemoryError: CUDA out of memory. Tried to allocate 2.00 GiB. GPU 0 has a total capacity of 14.74 GiB of which 1.39 GiB is free. Process 4062 has 13.34 GiB memory in use. Of the allocated memory 9.44 GiB is allocated by PyTorch, and 3.77 GiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
def detect_filter(filtered_img_np, patterns):
    """
    Определяет, какой фильтр был применён, сравнивая отфильтрованное изображение с результатами
    применения каждого фильтра (паттерна) к тому же изображению.
    
    :param filtered_img_np: отфильтрованное изображение (numpy array)
    :param patterns: список паттернов фильтров (список numpy array паттернов)
    
    :return: индекс фильтра, который применен к изображению (или -1, если фильтр не найден)
    """
    for i, pattern in enumerate(patterns):
        # Применяем паттерн к самому себе (фильтрация на исходном изображении)
        filtered_test = apply_fast_filter(filtered_img_np, pattern)
        
        # Сравниваем результат фильтрации с самим собой
        if np.array_equal(filtered_test, filtered_img_np):
            return i  # Индекс найденного фильтра
    
    return -1  # Если фильтр не найден


In [None]:
def inference(model, filtered_tensor, filter_one_hot, device):
    """
    Выполняет инференс модели для отфильтрованного изображения и возвращает восстановленное изображение.

    :param model: модель нейронной сети (например, ResNet34UNetWithFilter)
    :param filtered_tensor: отфильтрованное изображение в виде тензора (torch.Tensor), размер (C,H,W)
    :param filter_one_hot: one-hot вектор фильтра (torch.Tensor), размер (num_filters,)
    :param device: устройство (cuda или cpu)

    :return: восстановленное изображение (PIL Image)
    """
    model.eval()

    filtered_tensor = filtered_tensor.to(device)
    filter_one_hot = torch.tensor(filter_one_hot).to(device)

    with torch.no_grad():
        output = model(filtered_tensor.unsqueeze(0), filter_one_hot.unsqueeze(0))

    output_img_np = output.squeeze(0).cpu().numpy().transpose(1, 2, 0)  # (H, W, C)
    output_img_np = (output_img_np * 255).clip(0, 255).astype(np.uint8)

    output_img = Image.fromarray(output_img_np)

    return output_img


In [None]:
import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class TestDataset(Dataset):
    def __init__(self, test_dir, transform=None):
        self.test_dir = test_dir
        self.img_names = sorted(os.listdir(test_dir))
        self.transform = transform
    
    def __len__(self):
        return len(self.img_names)
    
    def __getitem__(self, idx):
        img_name = self.img_names[idx]
        img_path = os.path.join(self.test_dir, img_name)
        img = Image.open(img_path).convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, img_name


In [None]:
test_transform = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor(),   # обязательно преобразуем в тензор
])



test_dataset = TestDataset('/kaggle/input/tst-day-1/test/test/filtered_images', transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)


In [None]:
import pandas as pd
import numpy as np
import os

# Загружаем sample submission
sample = pd.read_csv('/kaggle/input/tst-day-1/sample_submission.csv')

# Создадим словарь для быстрых вставок
restored_np_dict = {}

model.eval()
for filtered_img, img_name in test_loader:
    filtered_img = filtered_img[0]  # batch=1, берём PIL
    img_id = os.path.splitext(img_name[0])[0]

    restored_img, _ = inference(model, filtered_img, patterns, device)

    restored_np = np.array(restored_img).reshape(-1)
    restored_np_dict[img_id] = restored_np

# Заполняем sample_submission
for idx, row in sample.iterrows():
    img_id = row['id']
    if str(img_id) in restored_np_dict:
        sample.loc[idx, sample.columns[1:]] = restored_np_dict[img_id]
    else:
        print(f"Warning: {img_id} not found in predictions!")

# Сохраняем итоговый submission
print("Submission saved to submission.csv")


In [None]:
import pandas as pd


num_pixels = 128 * 128  # Размер блока для первых и последних пикселей

# Преобразуем данные в массивы пикселей
for idx, row in sample.iterrows():
    # Берем все пиксели после первого столбца
    sample_pixels = row[1:].values  # Пиксели, начиная с 1-го столбца
    
    # Разбиваем на два блока по 128*128 пикселей
    first_part = sample_pixels[:num_pixels]
    last_part = sample_pixels[-num_pixels:]
    
    # Поменяли местами
    sample_pixels[:num_pixels] = last_part
    sample_pixels[-num_pixels:] = first_part
    
    # Заполняем в sample
    sample.loc[idx, sample.columns[1:]] = sample_pixels

# Сохраняем итоговый submission
sample.to_csv('submission.csv', index=False)
print("Submission with swapped pixels saved to submission_swapped.csv")


Whoever is watching it, know that I am screwed up(((