Program to use BMSHJ for compressing the Images

In [1]:
#importing
import os
import math
import torch
from torchvision import transforms
from torchvision.utils import save_image
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from pytorch_msssim import ms_ssim #metric for the image
import pandas as pd

import pillow_avif #AVIF encoder plugin for the Pillow installed using, pip install pillow-avif-plugin

import lpips #already installed with pip install lpips 
#used for the lpips metric
loss_fn_alex=lpips.LPIPS(net="alex")
loss_fn_vgg=lpips.LPIPS(net="vgg") 

#another image reconstruction quality metric
from skimage.metrics import structural_similarity as ssim 
#PSNR metric for image reconstruction
from skimage.metrics import peak_signal_noise_ratio as psnr

from compressai.zoo import bmshj2018_factorized #VAE model for image compression
from ipywidgets import interact, widgets 


Setting up [LPIPS] perceptual loss: trunk [alex], v[0.1], spatial [off]




Loading model from: C:\Swapnil\Narrowband_DRONE\Image_compression_code\Method_7_compress_ai\compressAIenv\Lib\site-packages\lpips\weights\v0.1\alex.pth
Setting up [LPIPS] perceptual loss: trunk [vgg], v[0.1], spatial [off]




Loading model from: C:\Swapnil\Narrowband_DRONE\Image_compression_code\Method_7_compress_ai\compressAIenv\Lib\site-packages\lpips\weights\v0.1\vgg.pth


In [2]:
#setting the CUDA device
device="cuda" if torch.cuda.is_available() else "cpu"   #cpu and cuda in small letters important
print("Using:",device)

Using: cuda


In [23]:
#Loading the pretrained model 
quality=6
metric="ms-ssim" #MSE
model_name="cheng2020-anchor"#"bmshj2018_factorized"
net=bmshj2018_factorized(quality=quality,metric=metric,pretrained=True).eval().to(device)
print(f"Parameters: {sum(p.numel() for p in net.parameters())}")

Parameters: 7030531


In [4]:
#Metrics for the image reconstruction 
    
    
def crop_image(img_org_path,img_recon_path):
    """
        top left X and Y
        bottom right X and Y is the crop box
    """

    img_recon=Image.open(img_recon_path)
    img_org=Image.open(img_org_path)

    output_width, output_height =img_recon.size
    original_width,original_height=img_org.size

    #checking the dimensions of the images
    if original_width==output_width and \
    original_height==output_height:
        return None
    else:
        crop_box = (0, 0, output_width, original_height)
        cropped_image = img_recon.crop(crop_box)
        #print(cropped_image.size)
        return cropped_image

def calculate_ssim(img_org,img_recon):
    """
    img_org and img_recon are the path of the 
    original image and the reconstructed image
    """
    if crop_image(img_org,img_recon) is None:
        img_recon=np.asarray(Image.open(img_recon))
        img_org=np.asarray(Image.open(img_org))
    else:
        img_recon=np.asarray(crop_image(img_org,img_recon))
        img_org=np.asarray(Image.open(img_org))
        
        

    ssim_value=ssim(img_org,img_recon,win_size=3) #for 3 channel
    #print("SSIM:",ssim_value)
    return ssim_value

def calculate_psnr(img_org,img_recon):

    """
    img_org and img_recon are the path of the 
    original image and the reconstructed image
    """
    
    if crop_image(img_org,img_recon) is None:
        img_recon=np.asarray(Image.open(img_recon))
        img_org=np.asarray(Image.open(img_org))
    else:
        img_recon=np.asarray(crop_image(img_org,img_recon))
        img_org=np.asarray(Image.open(img_org))

    psnr_value=psnr(img_org,img_recon)
    #print("PSNR:",psnr_value)
    return psnr_value


def calculate_lpips(img_org,img_recon):
    """
    img_org and img_recon are the path of the 
    original image and the reconstructed image
    """
    if crop_image(img_org,img_recon) is None:
        img_recon=np.asarray(Image.open(img_recon))
        img_org=np.asarray(Image.open(img_org))
    else:
        img_recon=np.asarray(crop_image(img_org,img_recon))
        img_org=np.asarray(Image.open(img_org))
    
    transform=transforms.ToTensor() #convert the image to Tensor 
    #print(img_org.shape,img_recon.shape)
    img_org=transform(img_org)
    img_recon=transform(img_recon)
    
    lpips=loss_fn_alex.forward(img_org,img_recon)
    lpips=lpips.detach().numpy() #to cpu then 
    #print("LPIPS:",lpips) 
    return lpips[0][0][0][0]
    

def compression_ratio(img_1_path,img_2_path):
    """
        Assuming the first image is uncompressed version
    """
    print(img_1_path)
    try:
        size_1=os.path.getsize(img_1_path) #getting the size of the image on the disc
    except:
        print("img_1 path not valid")

    try:
        size_2=os.path.getsize(img_2_path) #getting the size of the image on the disc
    except:
        print("img_2 path not valid")

    ratio=size_1/size_2
    #print("Compression ratio is:",ratio)
    return ratio

In [5]:
#calculate the metrics for the batch
def batch_metrics(org_path,recon_path):
    """
    Input: Path of the original dataset
    Output: Dataframe of reconstruction quality
            metrics and compression ratio for each
            image in dataset
    """
    
    images,path=file_handling(org_path)
    images_r,path_r=file_handling(recon_path)
        

    #initialize the metrics
    ssim=[]
    psnr=[]
    lpips=[]
    cr=[]
    org_size=[]
    recon_size=[]

    for img_o,img_r in zip(images,images_r):

        print("Doing for:",img_o)

        img_o=os.path.join(path,img_o)
        img_r=os.path.join(path_r,img_r)
        
        ratio=compression_ratio(img_o,img_r)
        cr.append(round(ratio,2))
        
        lpips_=calculate_lpips(img_o,img_r)
        lpips.append(lpips_)
    
    
        ssim_=calculate_ssim(img_o,img_r)
        ssim.append(ssim_)

        
        psnr_=calculate_psnr(img_o,img_r)
        psnr.append(psnr_)

        #sizes are in KB
        org_size.append(os.path.getsize(os.path.join(path,img_o))/1000)
        recon_size.append(os.path.getsize(os.path.join(path_r,img_r))/1000)
        

    metrics={"SSIM":ssim,
             "PSNR":psnr,
             "LPIPS":lpips,
             "CR":cr,
            "ORG_SIZE":org_size,
            "RECON_SIZE":recon_size}

    #make a dataframe from the metrics for exporting
    df=pd.DataFrame(metrics)
    return df

def average_batch_metrics(df):
    """
    Gives the average of each of the metric 
    for the whole batch
    """
    cols=df.columns
    avg_metric={}

    #number of images
    avg_metric["Dataset_size"]=len(df[cols[0]])
    for metric in cols:
        metric_array=np.array(df[metric]) #convert the each column of df to np array
        avg_metric[metric]=round(np.average(metric_array),2)

    print("average_metrics:",avg_metric)
    return avg_metric
    
    
        
    

    
        

In [6]:
#path to the images dataset
#path=r"C:\Swapnil\Narrowband_DRONE\Drone_artificial_dataset_experiment\images"

def file_handling(path):
    """
    path to the images dataset
    """
    #org_dir=os.getcwd()

    #changing to the dataset dir
    try:
        os.chdir(path)
    except:
        print("Path not valid")
        return

    files=os.listdir(path) #files in the directory
    images=[] #getting only the jpg and png images
    for file in files:
        if file.lower().endswith(".jpeg") or file.lower().endswith(".jpg") \
        or file.lower().endswith(".png") or file.lower().endswith(".webp") \
        or file.lower().endswith(".avif"):
            images.append(file)

    print("Images in dataset:",len(images))
    if len(images)!=0:
        print("First filename:",images[0])
    else:
        print("No images in the path or different format")
    
    #go to the original directory back
    #os.chdir(org_dir)
    
    return images, path

    



In [83]:
#testing only
dataset_path=path
export_path=os.path.join(dataset_path,"Data_webp")
df=batch_metrics(dataset_path,export_path) #calculate the metrics of the batch
avg_metrics=average_batch_metrics(df)

Images in dataset: 100
First filename: 0300.jpg
Images in dataset: 6
First filename: 0300.webp
Doing for: 0300.jpg
C:\Swapnil\Narrowband_DRONE\Drone_artificial_dataset_experiment\images\0300.jpg
Doing for: 0301.jpg
C:\Swapnil\Narrowband_DRONE\Drone_artificial_dataset_experiment\images\0301.jpg
Doing for: 0302.jpg
C:\Swapnil\Narrowband_DRONE\Drone_artificial_dataset_experiment\images\0302.jpg
Doing for: 0303.jpg
C:\Swapnil\Narrowband_DRONE\Drone_artificial_dataset_experiment\images\0303.jpg
Doing for: 0304.jpg
C:\Swapnil\Narrowband_DRONE\Drone_artificial_dataset_experiment\images\0304.jpg
Doing for: 0305.jpg
C:\Swapnil\Narrowband_DRONE\Drone_artificial_dataset_experiment\images\0305.jpg
average_metrics: {'Dataset_size': 6, 'SSIM': 0.93, 'PSNR': 28.48, 'LPIPS': 0.12, 'CR': 3.32, 'ORG_SIZE': 887.08, 'RECON_SIZE': 268.21}


In [7]:
def compress_dataset(model,images,dataset_path,img_type='RGB',export_format='webp'):
    '''
    img_type argument expects 'RGB' for color image and 'L' for grayscale
    USES save_image method which does not have controls
    '''
    img_details={} #dictionary to store the details of the images

    if export_format=='webp':
        extension='.webp'

    #make the export directory
    try:
        directory_name="Data_"+export_format
        os.mkdir(directory_name)
    except:
        print("Directory already exists")
    print(dataset_path)
    export_path=os.path.join(dataset_path,directory_name)
    
    for img in images:
        print("Doing for:",img)
        img_path=os.path.join(dataset_path,img)
        #print("img_path:",img_path)
        #open the img
        image=Image.open(img_path).convert(img_type) #opening the image as RGB
        x=transforms.ToTensor()(image).unsqueeze(0).to(device) #convert to PyTorch tensor and send to device
        
        with torch.no_grad():
            out_net=net.forward(x)
        out_net['x_hat'].clamp(0,1)
        obj=out_net['x_hat']

        #save the output of the network as image
        #saving in the tensor form takes lot of space that is why
        #directly exporting the tensor output as image (94 MB vs 1MB)
        #print(img)
        img=img.split('.')[0] + extension #getting the image name and changing the extension
        img=os.path.join(export_path,img)
        save_image(obj,img)
    return export_path

    
    #df=batch_metrics(dataset_path,export_path) #calculate the metrics of the batch
    #avg_metrics=average_batch_metrics(df)

#path=r"C:\Swapnil\Narrowband_DRONE\Drone_atsugi_dataset_exp"

In [15]:
def resize_FHD(image,path):
    image_path=os.path.join(path,image)
    print("path:",image_path)
    output_path=os.path.join(path,image.split('.')[0]+"_r.png")
    print("output:",output_path)
    try:
        with Image.open(image_path) as img:
          # Resize the image to maintain aspect ratio
            print("Image opened")
            img.thumbnail((1920, 1080), Image.LANCZOS)
            img.save(output_path)
        print(f"Image resized to Full HD and saved to: {output_path}")
    except OSError as e:
        print(f"Error opening or saving image: {e}")

In [16]:
path=r'C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp'
images,path=file_handling(path)
print("Images:",images)
for image in images:
    resize_FHD(image,path)


Images in dataset: 3
First filename: 000000_left_r.png
Images: ['000000_left_r.png', '000020_left_r.png', '000074_left_r.png']
path: C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000000_left_r.png
output: C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000000_left_r_r.png
Image opened
Image resized to Full HD and saved to: C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000000_left_r_r.png
path: C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000020_left_r.png
output: C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000020_left_r_r.png
Image opened
Image resized to Full HD and saved to: C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000020_left_r_r.png
path: C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000074_left_r.png
output: C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000074_left_r_r.png
Image opened
Image resized to Full HD and saved to: C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000074_left_r_r.png


In [24]:
path=r'C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp'
import time
images,path=file_handling(path)
start=time.time()
compress_dataset(net,images,path)
end=time.time()
print("Average time per image:",round(end-start,2)/len(images))


Images in dataset: 3
First filename: 000000_left.png
Directory already exists
C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp
Doing for: 000000_left.png
Doing for: 000020_left.png
Doing for: 000074_left.png
Average time per image: 10.31


In [26]:
#Stage 2 compression 
#OverWriting the stage 1 compression results from stage 2
def compress_stage_2(img_org_path,img_recon_path,export_format="avif",quality=5):
    """ 
    Input: Path to already compressed images from method save_image
    Output: OverWrite the existing images
    Use Pillow here to export the images

    export_format="avif","webp"
    quality=0 to 100
    
    """
    
    img_details={} #dictionary to store the details of the images

    if export_format=='webp':
        extension='.webp'
    else:
        extension='.avif'

    #make the export directory
    #try:
    #    directory_name="Data_"+export_format
    #    os.mkdir(directory_name)
    #except:
    #    print("Directory already exists")
    #print(dataset_path)
    
    #export_path=os.path.join(dataset_path,directory_name)

    images,path=file_handling(img_recon_path)
    
    for img in images:
        print("Doing for:",img)
        img_path=os.path.join(path,img)
        image_name=img.split('.')[0] + "_c"+ extension #getting the image name and changing the extension
        with Image.open(img_path) as image:
            image.save(image_name,format=export_format.upper(),quality=quality)
        #remove previous file
        os.remove(img)
img_recon_path=r"C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\Data_webp"
org_path=path
compress_stage_2(org_path,img_recon_path,export_format="webp")
        

    

    

Images in dataset: 3
First filename: 000000_left.webp
Doing for: 000000_left.webp
Doing for: 000020_left.webp
Doing for: 000074_left.webp


In [27]:
#calculate the metrics with respect to the original and the second stage image compression 
dataset_path=r'C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp' #org_path
export_path=img_recon_path
df=batch_metrics(dataset_path,export_path) #calculate the metrics of the batch
avg_metrics=average_batch_metrics(df)

Images in dataset: 3
First filename: 000000_left.png
Images in dataset: 3
First filename: 000000_left_c.webp
Doing for: 000000_left.png
C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000000_left.png
Doing for: 000020_left.png
C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000020_left.png
Doing for: 000074_left.png
C:\Swapnil\Narrowband_DRONE\Drone_dataset_small_1\exp\000074_left.png
average_metrics: {'Dataset_size': 3, 'SSIM': 0.87, 'PSNR': 29.06, 'LPIPS': 0.12, 'CR': 56.46, 'ORG_SIZE': 10233.49, 'RECON_SIZE': 189.97}
