# Import

In [10]:
!pip install opencv-python



In [6]:
import random
import numpy as np
import os

import torch
import torch_directml
from PIL import Image
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import cv2

import zipfile

In [7]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# device = torch_directml.device()
print("Using device:", device)

Using device: cpu


# Hyperparameter Setting

In [8]:
CFG = {
    'EPOCHS':10,
    'LEARNING_RATE':3e-4,
    'BATCH_SIZE':16,
    'SEED':42
}

# Fixed RandomSeed

In [9]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

# CustomDataset

In [10]:
# #저장된 이미지 쌍을 동시에 로드 

# class CustomDataset(Dataset):
#     def __init__(self, damage_dir, origin_dir, transform=None):
#         self.damage_dir = damage_dir
#         self.origin_dir = origin_dir
#         self.transform = transform
#         self.damage_files = sorted(os.listdir(damage_dir))
#         self.origin_files = sorted(os.listdir(origin_dir))

#         assert len(self.damage_files) == len(self.origin_files), "The number of images in gray and color folders must match"

#     def __len__(self):
#         return len(self.damage_files)

#     def __getitem__(self, idx):
#         damage_img_name = self.damage_files[idx]
#         origin_img_name = self.origin_files[idx]

#         damage_img_path = os.path.join(self.damage_dir, damage_img_name)
#         origin_img_path = os.path.join(self.origin_dir, origin_img_name)

#         damage_img = Image.open(damage_img_path).convert("RGB")
#         origin_img = Image.open(origin_img_path).convert("RGB")

#         if self.transform:
#             damage_img = self.transform(damage_img)
#             origin_img = self.transform(origin_img)

#         return {'A': damage_img, 'B': origin_img}



# Data Load

In [11]:
# # 경로 설정
# origin_dir = './open/train_gt'  # 원본 이미지 폴더 경로
# damage_dir = './open/train_input'  # 손상된 이미지 폴더 경로
# test_dir = './open/test_input'     # test 이미지 폴더 경로

# # 데이터 전처리 설정
# transform = transforms.Compose([
#     transforms.Resize((256, 256)),
#     transforms.ToTensor(),
#     transforms.Normalize([0.5], [0.5])
# ])

# # 데이터셋 및 DataLoader 생성
# dataset = CustomDataset(damage_dir=damage_dir, origin_dir=origin_dir, transform=transform)
# dataloader = DataLoader(dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=1)

# Model Define

In [12]:
# U-Net 기반의 Generator
class UNetGenerator(nn.Module):
    def __init__(self, in_channels=3, out_channels=3):
        super(UNetGenerator, self).__init__()

        def down_block(in_feat, out_feat, normalize=True):
            layers = [nn.Conv2d(in_feat, out_feat, kernel_size=4, stride=2, padding=1)]
            if normalize:
                layers.append(nn.BatchNorm2d(out_feat))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return nn.Sequential(*layers)

        def up_block(in_feat, out_feat, dropout=0.0):
            layers = [nn.ConvTranspose2d(in_feat, out_feat, kernel_size=4, stride=2, padding=1),
                      nn.BatchNorm2d(out_feat),
                      nn.ReLU(inplace=True)] 
            if dropout:
                layers.append(nn.Dropout(dropout))
            return nn.Sequential(*layers)

        self.down1 = down_block(in_channels, 64, normalize=False)
        self.down2 = down_block(64, 128)
        self.down3 = down_block(128, 256)
        self.down4 = down_block(256, 512)
        self.down5 = down_block(512, 512)
        self.down6 = down_block(512, 512)
        self.down7 = down_block(512, 512)
        self.down8 = down_block(512, 512, normalize=False)

        self.up1 = up_block(512, 512, dropout=0.5)
        self.up2 = up_block(1024, 512, dropout=0.5)
        self.up3 = up_block(1024, 512, dropout=0.5)
        self.up4 = up_block(1024, 512)
        self.up5 = up_block(1024, 256)
        self.up6 = up_block(512, 128)
        self.up7 = up_block(256, 64)
        self.up8 = nn.Sequential(
            nn.ConvTranspose2d(128, out_channels, kernel_size=4, stride=2, padding=1),
            nn.Tanh()
        )

    def forward(self, x):
        d1 = self.down1(x)
        d2 = self.down2(d1)
        d3 = self.down3(d2)
        d4 = self.down4(d3)
        d5 = self.down5(d4)
        d6 = self.down6(d5)
        d7 = self.down7(d6)
        d8 = self.down8(d7)

        u1 = self.up1(d8)
        u2 = self.up2(torch.cat([u1, d7], 1))
        u3 = self.up3(torch.cat([u2, d6], 1))
        u4 = self.up4(torch.cat([u3, d5], 1))
        u5 = self.up5(torch.cat([u4, d4], 1))
        u6 = self.up6(torch.cat([u5, d3], 1))
        u7 = self.up7(torch.cat([u6, d2], 1))
        u8 = self.up8(torch.cat([u7, d1], 1))

        return u8

# PatchGAN 기반의 Discriminator
class PatchGANDiscriminator(nn.Module):
    def __init__(self, in_channels=3):
        super(PatchGANDiscriminator, self).__init__()

        def discriminator_block(in_filters, out_filters, normalization=True):
            layers = [nn.Conv2d(in_filters, out_filters, kernel_size=4, stride=2, padding=1)]
            if normalization:
                layers.append(nn.BatchNorm2d(out_filters))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return nn.Sequential(*layers)

        self.model = nn.Sequential(
            discriminator_block(in_channels * 2, 64, normalization=False),
            discriminator_block(64, 128),
            discriminator_block(128, 256),
            discriminator_block(256, 512),
            nn.Conv2d(512, 1, kernel_size=4, padding=1)
        )

    def forward(self, img_A, img_B):
        img_input = torch.cat((img_A, img_B), 1)
        return self.model(img_input)

# 가중치 초기화 함수
def weights_init_normal(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm2d') != -1:
        torch.nn.init.normal_(m.weight.data, 1.0, 0.02)
        torch.nn.init.constant_(m.bias.data, 0.0)

# Train

In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import os

# Custom dataset for loading grayscale masked images and original color images
class MaskedImageDataset(Dataset):
    def __init__(self, masked_dir, original_dir, transform=None):
        self.masked_dir = masked_dir
        self.original_dir = original_dir
        self.masked_images = os.listdir(masked_dir)
        self.original_images = os.listdir(original_dir)
        self.transform = transform

    def __len__(self):
        return len(self.masked_images)

    def __getitem__(self, idx):
        masked_image_path = os.path.join(self.masked_dir, self.masked_images[idx])
        original_image_path = os.path.join(self.original_dir, self.original_images[idx])

        masked_image = Image.open(masked_image_path).convert('L')  # Load as grayscale
        original_image = Image.open(original_image_path).convert('RGB')  # Load as color

        if self.transform:
            masked_image = self.transform(masked_image)
            original_image = self.transform(original_image)

        return masked_image, original_image

# Define the LatentPaint module (as described in the LatentPaint paper)
class LatentPaint(nn.Module):
    def __init__(self, latent_dim):
        super(LatentPaint, self).__init__()
        self.latent_dim = latent_dim
        # Explicit Propagation parameters
        self.gamma = nn.MaxPool2d(kernel_size=2)
        self.phi = nn.Sequential(
            nn.Linear(latent_dim, latent_dim),
            nn.ReLU(),
            nn.Linear(latent_dim, latent_dim)
        )
        
    def forward(self, h_infr, h_cond, mask):
        # Step 1: Downsample mask to match latent space dimensions
        D_m = F.interpolate(mask, size=h_infr.shape[2:], mode='bilinear', align_corners=False)
        
        # Step 2: Latent Space Conditioning
        h_star = h_infr * (1 - D_m) + h_cond * D_m
        
        # Step 3: Explicit Propagation
        gamma_mask = self.gamma(D_m)
        phi_output = self.phi(torch.cat([gamma_mask.flatten(1), h_cond.flatten(1)], dim=-1))
        h_hat = phi_output.view_as(h_star)
        
        return h_hat

# Define the U-Net with LatentPaint for inpainting tasks
class UNetWithLatentPaint(nn.Module):
    def __init__(self, unet_model, latent_dim):
        super(UNetWithLatentPaint, self).__init__()
        self.unet = unet_model
        self.latent_paint1 = LatentPaint(latent_dim)
        self.latent_paint2 = LatentPaint(latent_dim)

    def forward(self, x, mask):
        # Forward pass through the U-Net model
        h_infr_1 = self.unet.encoder(x)
        
        # Apply LatentPaint after first encoder block
        h_cond_1 = self.unet.encoder(mask)  # Conditioned on the masked image
        h_painted_1 = self.latent_paint1(h_infr_1, h_cond_1, mask)
        
        # Continue through the U-Net model
        h_infr_2 = self.unet.middle_block(h_painted_1)
        
        # Apply LatentPaint after middle block
        h_cond_2 = self.unet.middle_block(mask)
        h_painted_2 = self.latent_paint2(h_infr_2, h_cond_2, mask)
        
        # Final output through the decoder
        output = self.unet.decoder(h_painted_2)
        
        return output

# Example training loop for the inpainting task using LatentPaint

def train_model(model, dataloader, criterion, optimizer, num_epochs=25):
    for epoch in range(num_epochs):
        running_loss = 0.0
        
        for inputs, targets in dataloader:
            inputs = inputs.to(device)  # Masked grayscale images (input)
            targets = targets.to(device)  # Original color images (target)

            optimizer.zero_grad()

            outputs = model(inputs)  # Forward pass through the model
            
            loss = criterion(outputs, targets)  # Compute loss between output and target
            
            loss.backward()  # Backpropagation
            optimizer.step()  # Update weights
            
            running_loss += loss.item()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(dataloader)}')

    print("Training complete.")

# Hyperparameters and setup
latent_dim = 256  # Example latent dimension size
unet_model = UNetGenerator()  # Load or define your U-Net model here

# Prepare dataset and dataloader
transform = transforms.Compose([
    transforms.Resize((512, 512)),
    transforms.ToTensor()
])

masked_dir = './open/train_input'
original_dir = './open/train_gt'

dataset = MaskedImageDataset(masked_dir=masked_dir, original_dir=original_dir, transform=transform)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

# Initialize model and optimizer
model_with_latentpaint = UNetWithLatentPaint(unet_model=unet_model, latent_dim=latent_dim).to(device)
criterion = nn.MSELoss()  # Mean Squared Error loss for pixel-wise comparison between predicted and target images.
optimizer = optim.Adam(model_with_latentpaint.parameters(), lr=0.001)

In [14]:
# Train the model
train_model(model_with_latentpaint, dataloader=dataloader, criterion=criterion, optimizer=optimizer)

TypeError: UNetWithLatentPaint.forward() missing 1 required positional argument: 'mask'

In [None]:
# 모델 가중치 저장
torch.save(model_with_latentpaint.state_dict(), "best_model.pth")

NameError: name 'model_with_latentpaint' is not defined

In [None]:
# 모델 가중치 불러오기
model_with_latentpaint.load_state_dict(torch.load("best_model.pth"))

# Inference

In [None]:
# 모델의 가중치를 불러오는 함수
def load_model(model, checkpoint_path):
    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])
    return model

# 이미지를 로드 및 전처리하는 함수
def load_image(image_path, transform):
    image = Image.open(image_path).convert("RGB")
    image = transform(image)
    image = image.unsqueeze(0)  # 배치 차원을 추가합니다.
    return image
model_save_dir = './'

# 모델 경로 설정
generator_path = os.path.join(model_save_dir, "best_model.pth")

# 모델 로드 및 설정 (여기서는 UNetWithLatentPaint 모델을 사용한다고 가정)
# model = UNetWithLatentPaint(unet_model, latent_dim).to(device)  # unet_model, latent_dim은 미리 정의 필요
# model = load_model(model, generator_path)
model = model_with_latentpaint.to(device)
model.eval()

# 테스트 데이터 디렉토리 설정
test_dir = "./open/test_input"  # 테스트 데이터 디렉토리 경로
submission_dir = "./open/submission"
os.makedirs(submission_dir, exist_ok=True)

# 파일 리스트 불러오기
test_images = sorted(os.listdir(test_dir))

# 모든 테스트 이미지에 대해 추론 수행
for image_name in test_images:
    test_image_path = os.path.join(test_dir, image_name)

    # 손상된 테스트 이미지 로드 및 전처리
    test_image = load_image(test_image_path, transform).to(device)

    with torch.no_grad():
        # 모델로 예측
        pred_image = model(test_image, mask=test_image)  # 여기서 mask는 이미지와 동일하게 설정 (흑백 마스킹된 이미지가 있다고 가정)
        pred_image = pred_image.cpu().squeeze(0)  # 배치 차원 제거
        pred_image = pred_image * 0.5 + 0.5  # 역정규화
        pred_image = pred_image.numpy().transpose(1, 2, 0)  # HWC로 변경
        pred_image = (pred_image * 255).astype('uint8')  # 0-255 범위로 변환
        
        # 예측된 이미지를 실제 이미지와 같은 512x512로 리사이즈
        pred_image_resized = cv2.resize(pred_image, (512, 512), interpolation=cv2.INTER_LINEAR)

    # 결과 이미지 저장
    output_path = os.path.join(submission_dir, image_name)
    cv2.imwrite(output_path, cv2.cvtColor(pred_image_resized, cv2.COLOR_RGB2BGR))    

print(f"Saved all images")

In [None]:
# 저장할 디렉토리 설정
submission_dir = "./open/submission"
os.makedirs(submission_dir, exist_ok=True)

# 이미지 로드 및 전처리
def load_image(image_path):
    image = Image.open(image_path).convert("RGB")
    image = transform(image)
    image = image.unsqueeze(0)  # 배치 차원을 추가합니다.
    return image

# 모델 경로 설정
generator_path = os.path.join(model_save_dir, "best_generator.pth")

# 모델 로드 및 설정
model = UNetGenerator().to(device)
model.load_state_dict(torch.load(generator_path))
model.eval()

# 파일 리스트 불러오기
test_images = sorted(os.listdir(test_dir))

# 모든 테스트 이미지에 대해 추론 수행
for image_name in test_images:
    test_image_path = os.path.join(test_dir, image_name)

    # 손상된 테스트 이미지 로드 및 전처리
    test_image = load_image(test_image_path).to(device)

    with torch.no_grad():
        # 모델로 예측
        pred_image = model(test_image)
        pred_image = pred_image.cpu().squeeze(0)  # 배치 차원 제거
        pred_image = pred_image * 0.5 + 0.5  # 역정규화
        pred_image = pred_image.numpy().transpose(1, 2, 0)  # HWC로 변경
        pred_image = (pred_image * 255).astype('uint8')  # 0-255 범위로 변환
        
        # 예측된 이미지를 실제 이미지와 같은 512x512로 리사이즈
        pred_image_resized = cv2.resize(pred_image, (512, 512), interpolation=cv2.INTER_LINEAR)

    # 결과 이미지 저장
    output_path = os.path.join(submission_dir, image_name)
    cv2.imwrite(output_path, cv2.cvtColor(pred_image_resized, cv2.COLOR_RGB2BGR))    
    
print(f"Saved all images")

Saved all images


# Submission

In [20]:
# 저장된 결과 이미지를 ZIP 파일로 압축
zip_filename = "submission.zip"
with zipfile.ZipFile(zip_filename, 'w') as submission_zip:
    for image_name in test_images:
        image_path = os.path.join(submission_dir, image_name)
        submission_zip.write(image_path, arcname=image_name)

print(f"All images saved in {zip_filename}")

All images saved in submission.zip
