In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.utils import make_grid
import torchvision.utils as vutils
import os
import matplotlib.animation as animation
from IPython.display import HTML
import tifffile as tiff

import cv2
from skimage.metrics import structural_similarity as ssim

import matplotlib.pyplot as plt
from PIL import Image, ImageFilter
import numpy as np
import pandas as pd
import copy
import time
import cv2 as cv
from tqdm import tqdm_notebook as tqdm
import matplotlib.image as mpimg

import torchvision.transforms.functional as TF
from math import log10, sqrt

device = 'cuda'

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# custom weights initialization called on netG and netD
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

In [None]:
# Encoder Model
class Encoder(nn.Module):
    def __init__(self,num_channels_in_encoder):
        super(Encoder, self).__init__()

        # ENCODER

        # 64x64x64
        self.e_conv_1 = nn.Sequential(
            nn.ZeroPad2d((1, 2, 1, 2)),
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=(5, 5), stride=(2, 2)),nn.LeakyReLU()
        )

        # 128x32x32
        self.e_conv_2 = nn.Sequential(
            nn.ZeroPad2d((1, 2, 1, 2)),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(5, 5), stride=(2, 2)),
            nn.LeakyReLU()
        )

        # 128x32x32
        self.e_block_1 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 128x32x32
        self.e_block_2 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 128x32x32
        self.e_block_3 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 32x32x32
        self.e_conv_3 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=num_channels_in_encoder, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2)),
            nn.Tanh()
        )
    def forward(self, x):
        ec1 = self.e_conv_1(x)
        ec2 = self.e_conv_2(ec1)
        eblock1 = self.e_block_1(ec2) + ec2
        eblock2 = self.e_block_2(eblock1) + eblock1
        eblock3 = self.e_block_3(eblock2) + eblock2
        ec3 = self.e_conv_3(eblock3)  # in [-1, 1] from tanh activation
        return ec3

In [None]:
# Generator / Decoder Model

class Generator(nn.Module):
    def __init__(self,num_channels_in_encoder):
        super(Generator, self).__init__()

        # 128x64x64
        self.d_up_conv_1 = nn.Sequential(
        nn.Conv2d(in_channels=num_channels_in_encoder, out_channels=64, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.ConvTranspose2d(in_channels=64, out_channels=128, kernel_size=(2, 2), stride=(2, 2))
        )

        # 128x64x64
        self.d_block_1 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 128x64x64
        self.d_block_2 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 128x64x64
        self.d_block_3 = nn.Sequential(
            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), stride=(1, 1)),
        )

        # 256x128x128
        self.d_up_conv_2 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=32, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ZeroPad2d((1, 1, 1, 1)),
            nn.ConvTranspose2d(in_channels=32, out_channels=128, kernel_size=(2, 2), stride=(2, 2))
        )

        # 3x128x128
        self.d_up_conv_3 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=16, kernel_size=(3, 3), stride=(1, 1)),
            nn.LeakyReLU(),

            nn.ReflectionPad2d((3, 3, 3, 3)),
            nn.Conv2d(in_channels=16, out_channels=3, kernel_size=(3, 3), stride=(1, 1)),
            nn.Tanh()
        )



    def forward(self, x):
        uc1 = self.d_up_conv_1(x)
        dblock1 = self.d_block_1(uc1) + uc1
        dblock2 = self.d_block_2(dblock1) + dblock1
        dblock3 = self.d_block_3(dblock2) + dblock2
        uc2 = self.d_up_conv_2(dblock3)
        dec = self.d_up_conv_3(uc2)
        return dec

In [None]:
def image_read(path):
  image = Image.open(path)
  if image.mode in ("RGBA", "LA") or (image.mode == "P" and "transparency" in image.info) or image.mode == "L":
      # Convert the image to RGB format
      image = image.convert("RGB")
  return image

In [None]:
def transform_image(image):
  width, height = (image.width, image.height) if (image.width <= 178 and image.height <= 218) else (178, 218)
  trans = transforms.Compose([
    transforms.Resize((width, height)), # set the desired size of the image
    transforms.ToTensor()
  ])

  img_tensor=trans(image)
  img_tensor = (img_tensor-0.5) /0.5
  img_batch=(img_tensor).unsqueeze(0)
  img_batch=img_batch.to(device)
  return img_batch

In [None]:
def convert_to_single_channel(encoded_batch):
    # Assuming encoded_batch has shape (1, num_channels, H, W)
    single_channel_image = torch.mean(encoded_batch[0], dim=0, keepdim=True)  # Average over channels
    single_channel_image = single_channel_image.clamp(-1, 1)  # Ensure values are in range [-1, 1]

    # Convert to NumPy array
    numpy_array = single_channel_image[0].cpu().detach().numpy()

    # Normalize values to be in the range [0, 255]
    numpy_array = ((numpy_array + 1) / 2 * 255).astype(np.uint8)

    # Create a grayscale image
    grayscale_image = Image.fromarray(numpy_array, mode='L')

    return grayscale_image

In [None]:
def compress(img_batch, num_channels):
  num_channels_in_encoder = num_channels
  netE = Encoder(num_channels_in_encoder).to(device)
  netE.apply(weights_init)
  netG = Generator(num_channels_in_encoder).to(device)
  netG.apply(weights_init)

  netE.load_state_dict(torch.load("/content/drive/MyDrive/Trained Models/netE"+str(num_channels)+".model"))
  netG.load_state_dict(torch.load("/content/drive/MyDrive/Trained Models/netG"+str(num_channels)+".model"))

  netG.eval()
  netE.eval()

  encoded_batch = netE(img_batch)

  single_channel_image = convert_to_single_channel(encoded_batch)
  single_channel_image.save("encoded_"+str(num_channels)+'_image.jpg')

  compressed_batch = netG(encoded_batch)

  del netE
  del netG
  torch.cuda.empty_cache()

  return compressed_batch

In [None]:
def tensor_to_image(tensor):
    tensor = (tensor + 1) / 2
    tensor = tensor.cpu().detach()
    tensor = tensor*255
    tensor = np.array(tensor, dtype=np.uint8)
    if np.ndim(tensor)>3:
        assert tensor.shape[0] == 1
        tensor = tensor[0]
    return Image.fromarray(tensor.transpose(1,2,0))

In [None]:
def PSNR(original, compressed):
    mse = np.mean((original - compressed) ** 2)
    if(mse == 0):  # MSE is zero means no noise is present in the signal .
                  # Therefore PSNR have no importance.
        return 100
    max_pixel = 255.0
    psnr = 20 * log10(max_pixel / sqrt(mse))
    return psnr

In [None]:
def calculate_msssim(original_image, compressed_image, num_scales=5):
    msssim = 1.0
    weights = np.linspace(0.0448, 0.2856, num=num_scales)  # Default weights for 5 scales

    for i in range(num_scales):
        # Calculate SSIM for each scale
        ssim_val = ssim(original_image, compressed_image, channel_axis=True)

        # Calculate the power of SSIM value with the corresponding weight
        msssim *= (ssim_val ** weights[i])

        # Resize images for the next scale
        original_image = original_image[::2, ::2]  # Downsample by a factor of 2
        compressed_image = compressed_image[::2, ::2]  # Downsample by a factor of 2

    msssim = msssim ** (1 / num_scales)

    return msssim

In [None]:
def adjust_brightness(image_path, brightness_factor=1.2):
    # Load the image
    image = cv2.imread(image_path)

    # Adjust brightness
    adjusted_image = cv2.convertScaleAbs(image, alpha=brightness_factor, beta=0)

    # Save the adjusted image
    cv2.imwrite(image_path, adjusted_image)

In [None]:
def main():
  img_path = input("Enter the image path to compress: ")
  image = image_read(img_path)
  target= (image.width, image.height) if (image.width < 178 and image.height < 218) else (178, 218)

  dictionary = ['compressed_img_8', 'compressed_img_16', 'compressed_img_28', 'compressed_img_40']
  dummy_value = None
  my_dict = {key: dummy_value for key in dictionary}
  channels = [8, 16, 28, 40]

  im = image.resize(target)
  im.save('resized_original.tif')

  im.save('jpg.jpg', quality = 10)
  im.save('jpg_.jpg', quality = 80)

  img_batch = transform_image(im)
  print("JPEG: ")
  psnr = PSNR(cv2.imread('resized_original.tif'), cv2.imread('/content/jpg.jpg'))
  ssim = calculate_msssim(cv2.cvtColor(cv2.imread('resized_original.tif'), cv2.COLOR_BGR2GRAY), cv2.cvtColor(cv2.imread('/content/jpg.jpg'), cv2.COLOR_BGR2GRAY))
  print(f"PSNR value is {psnr} dB")
  print(f"MS-SSIM value is {ssim*100}%")
  print('----------------------------------------------------------')

  for key, num_channels in zip(dictionary, channels):
    key = compress(img_batch, num_channels)
    key = tensor_to_image(key)
    key = key.resize(target)
    # adjust_brightness('Compressed_img_'+str(num_channels)+'.jpg', brightness_factor=0.8)
    key.save('Compressed_img_'+str(num_channels)+'.jpg')

    print("GAN for "+str(num_channels)+" channels")
    psnr = PSNR(cv2.imread('resized_original.tif'), cv2.imread('Compressed_img_'+str(num_channels)+'.jpg'))
    ssim = calculate_msssim(cv2.cvtColor(cv2.imread('resized_original.tif'), cv2.COLOR_BGR2GRAY), cv2.cvtColor(cv2.imread('Compressed_img_'+str(num_channels)+'.jpg'), cv2.COLOR_BGR2GRAY))
    print(f"PSNR value is {psnr} dB")
    print(f"MS-SSIM value is {ssim*100}%")
    print('----------------------------------------------------------')

if __name__ == "__main__":
    main()

Enter the image path to compress: /content/drive/MyDrive/Images/animation/animation10.tif
JPEG: 
PSNR value is 29.99303324412451 dB
MS-SSIM value is 96.82574352224371%
----------------------------------------------------------
GAN for 8 channels
PSNR value is 28.727431475527844 dB
MS-SSIM value is 97.43022882830893%
----------------------------------------------------------
GAN for 16 channels
PSNR value is 29.916349576590143 dB
MS-SSIM value is 97.85064920228471%
----------------------------------------------------------
GAN for 28 channels
PSNR value is 30.707623261810852 dB
MS-SSIM value is 97.10240896334828%
----------------------------------------------------------
GAN for 40 channels
PSNR value is 30.3471620373532 dB
MS-SSIM value is 98.24094343096965%
----------------------------------------------------------
