In [1]:
!pip install -q kaggle

In [2]:
from google.colab import files
files.upload()


Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"aroojzahra","key":"a872123416faa088384afea314a1ee79"}'}

In [4]:
# create a kaggle directory

!mkdir ~/.kaggle


mkdir: cannot create directory ‘/root/.kaggle’: File exists


In [5]:
# copy the kaggle.json to created the folder
!cp kaggle.json ~/.kaggle/

In [6]:
# permission for the json to act
!chmod 600 ~/.kaggle/kaggle.json

In [7]:
# to list all the datasets in kaggle
!kaggle datasets list

ref                                                       title                                           size  lastUpdated          downloadCount  voteCount  usabilityRating  
--------------------------------------------------------  ---------------------------------------------  -----  -------------------  -------------  ---------  ---------------  
haseebindata/student-performance-predictions              Student Performance Predictions                  9KB  2024-08-17 06:57:57           9623        209  0.9411765        
lainguyn123/student-performance-factors                   Student Performance Factors                     94KB  2024-09-02 10:53:57           2946         69  1.0              
hanaksoy/customer-purchasing-behaviors                    Customer Purchasing Behaviors                    1KB  2024-09-01 22:18:07           1876         25  1.0              
muhammadehsan02/olympic-summer-games-paris-2024           Olympic Summer Games - Paris 2024                3MB  202

In [8]:
!kaggle datasets download -d mohit3430/haze1k


Dataset URL: https://www.kaggle.com/datasets/mohit3430/haze1k
License(s): unknown
Downloading haze1k.zip to /content
100% 951M/953M [00:55<00:00, 21.1MB/s]
100% 953M/953M [00:55<00:00, 17.9MB/s]


In [9]:
!unzip haze1k.zip -d ./haze1k


Archive:  haze1k.zip
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/001.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/002.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/003.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/004.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/005.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/006.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/007.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/008.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/009.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/010.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/011.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/012.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/input/013.png  
  inflating: ./haze1k/Distributed_haze1k/test_moderate/i

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torchvision.utils import save_image
import os


In [11]:
class UNetGenerator(nn.Module):
    def __init__(self):
        super(UNetGenerator, self).__init__()

        def down_block(in_channels, out_channels, kernel_size=4, stride=2, padding=1, batch_norm=True):
            layers = [nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=False)]
            if batch_norm:
                layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.ReLU(inplace=True))
            return nn.Sequential(*layers)

        def up_block(in_channels, out_channels, kernel_size=4, stride=2, padding=1, dropout=False):
            layers = [nn.ConvTranspose2d(in_channels, out_channels, kernel_size, stride, padding, bias=False)]
            layers.append(nn.BatchNorm2d(out_channels))
            if dropout:
                layers.append(nn.Dropout(0.5))
            layers.append(nn.ReLU(inplace=True))
            return nn.Sequential(*layers)

        self.down1 = down_block(3, 64, batch_norm=False)
        self.down2 = down_block(64, 128)
        self.down3 = down_block(128, 256)
        self.down4 = down_block(256, 512)
        self.down5 = down_block(512, 512)
        self.down6 = down_block(512, 512)
        self.down7 = down_block(512, 512)
        self.down8 = down_block(512, 512, batch_norm=False)

        self.up1 = up_block(512, 512, dropout=True)
        self.up2 = up_block(1024, 512, dropout=True)
        self.up3 = up_block(1024, 512, dropout=True)
        self.up4 = up_block(1024, 512)
        self.up5 = up_block(1024, 256)
        self.up6 = up_block(512, 128)
        self.up7 = up_block(256, 64)
        self.up8 = nn.ConvTranspose2d(128, 3, kernel_size=4, stride=2, padding=1)

    def forward(self, x):
        # Encoder (downsampling)
        d1 = self.down1(x)
        d2 = self.down2(d1)
        d3 = self.down3(d2)
        d4 = self.down4(d3)
        d5 = self.down5(d4)
        d6 = self.down6(d5)
        d7 = self.down7(d6)
        d8 = self.down8(d7)

        # Decoder (upsampling)
        u1 = self.up1(d8)
        u2 = self.up2(torch.cat([u1, d7], 1))
        u3 = self.up3(torch.cat([u2, d6], 1))
        u4 = self.up4(torch.cat([u3, d5], 1))
        u5 = self.up5(torch.cat([u4, d4], 1))
        u6 = self.up6(torch.cat([u5, d3], 1))
        u7 = self.up7(torch.cat([u6, d2], 1))
        u8 = self.up8(torch.cat([u7, d1], 1))

        return torch.tanh(u8)


In [12]:
class PatchDiscriminator(nn.Module):
    def __init__(self):
        super(PatchDiscriminator, self).__init__()

        def conv_block(in_channels, out_channels, kernel_size=4, stride=2, padding=1, batch_norm=True):
            layers = [nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=False)]
            if batch_norm:
                layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return nn.Sequential(*layers)

        self.model = nn.Sequential(
            conv_block(6, 64, batch_norm=False),
            conv_block(64, 128),
            conv_block(128, 256),
            conv_block(256, 512),
            nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1)
        )

    def forward(self, x):
        return self.model(x)


In [13]:
class HazeDataset(Dataset):
    def __init__(self, hazy_dir, clear_dir, transform=None):
        self.hazy_images = sorted(os.listdir(hazy_dir))
        self.clear_images = sorted(os.listdir(clear_dir))
        self.hazy_dir = hazy_dir
        self.clear_dir = clear_dir
        self.transform = transform

    def __len__(self):
        return len(self.hazy_images)

    def __getitem__(self, idx):
        hazy_img_path = os.path.join(self.hazy_dir, self.hazy_images[idx])
        clear_img_path = os.path.join(self.clear_dir, self.clear_images[idx])

        hazy_image = transforms.ToTensor()(Image.open(hazy_img_path))
        clear_image = transforms.ToTensor()(Image.open(clear_img_path))

        if self.transform:
            hazy_image = self.transform(hazy_image)
            clear_image = self.transform(clear_image)

        return hazy_image, clear_image


In [16]:
# # Hyperparameters
# lr = 0.0002
# batch_size = 4
# epochs = 30

lr = 0.0005
batch_size = 8
epochs = 200


# Directories
hazy_dir = "/content/haze1k/Distributed_haze1k/train/input"
clear_dir = "/content/haze1k/Distributed_haze1k/train/target"

# Data loader
dataset = HazeDataset(hazy_dir, clear_dir, transform=transforms.Resize((256, 256)))
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


In [17]:
# Initialize models
# generator = UNetGenerator().cuda()
# discriminator = PatchDiscriminator().cuda()

generator = UNetGenerator()
discriminator = PatchDiscriminator()

# Optimizers
optimizer_G = optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))

# Losses
adversarial_loss = nn.BCEWithLogitsLoss()
pixelwise_loss = nn.L1Loss()

# Labels
real_label = 1.
fake_label = 0.


In [20]:
from PIL import Image

In [None]:
for epoch in range(epochs):
    for i, (hazy_imgs, clear_imgs) in enumerate(dataloader):
        hazy_imgs = hazy_imgs
        clear_imgs = clear_imgs

        # Train Discriminator
        optimizer_D.zero_grad()

        real_output = discriminator(torch.cat([hazy_imgs, clear_imgs], 1))
        real_loss = adversarial_loss(real_output, torch.full_like(real_output, real_label))

        fake_imgs = generator(hazy_imgs)
        fake_output = discriminator(torch.cat([hazy_imgs, fake_imgs.detach()], 1))
        fake_loss = adversarial_loss(fake_output, torch.full_like(fake_output, fake_label))

        d_loss = (real_loss + fake_loss) / 2
        d_loss.backward()
        optimizer_D.step()

        # Train Generator
        optimizer_G.zero_grad()

        fake_output = discriminator(torch.cat([hazy_imgs, fake_imgs], 1))
        g_loss = adversarial_loss(fake_output, torch.full_like(fake_output, real_label))
        pixel_loss = pixelwise_loss(fake_imgs, clear_imgs)

        g_total_loss = g_loss + pixel_loss * 100  # Weighted pixel-wise loss
        g_total_loss.backward()
        optimizer_G.step()

        print(f"[Epoch {epoch}/{epochs}] [Batch {i}/{len(dataloader)}] [D loss: {d_loss.item()}] [G loss: {g_total_loss.item()}]")

    # Save generated images every epoch
    if epoch % 10 == 0:
        save_image(fake_imgs, f"generated_epoch_{epoch}.png", normalize=True)


In [None]:
# Save the trained models
torch.save(generator.state_dict(), 'unet_generator.pth')
torch.save(discriminator.state_dict(), 'patch_discriminator.pth')


In [None]:
# Initialize models
generator = UNetGenerator()
discriminator = PatchDiscriminator()

# Load saved model weights
generator.load_state_dict(torch.load('unet_generator.pth'))
discriminator.load_state_dict(torch.load('patch_discriminator.pth'))

# Set models to evaluation mode
generator.eval()
discriminator.eval()


In [None]:
from PIL import Image
import torchvision.transforms as transforms

def load_image(image_path):
    transform = transforms.Compose([
        transforms.Resize((256, 256)),  # Resize to match input size
        transforms.ToTensor(),  # Convert to tensor
    ])
    image = Image.open(image_path).convert('RGB')
    return transform(image).unsqueeze(0)  # Add batch dimension and move to GPU

image = load_image('/content/haze1.jpg')


In [None]:
with torch.no_grad():
    output = generator(image)

# Post-process and save the output image
from torchvision.utils import save_image
save_image(output, 'output_image.png', normalize=True)


In [None]:
from PIL import Image
import matplotlib.pyplot as plt

def view_image(image_path):
    image = Image.open(image_path)
    plt.imshow(image)
    plt.axis('off')
    plt.show()

view_image('output_image.png')
