In [1]:
import io
import time
import torch
import numpy as np
import PIL
from torchvision.transforms import ToPILImage, PILToTensor
from datasets import load_dataset, Image
from walloc import walloc
from compressai.zoo import cheng2020_anchor
from piq import LPIPS, DISTS, psnr, multi_scale_ssim

class Config:
    """Empty namespace class; attributes are attached to instances ad hoc."""

In [2]:
# Build the pretrained Cheng2020 anchor codec (quality level 6, MSE-optimised)
# and put it straight into evaluation mode; eval() returns the module itself.
codec = cheng2020_anchor(
    quality=6,
    metric='mse',
    pretrained=True,
).eval()

In [3]:
# Instantiate both piq perceptual metrics (LPIPS first, then DISTS) and move
# each one to the GPU; DGML_compress feeds them CUDA tensors.
lpips_loss, dists_loss = (metric_cls().to("cuda") for metric_cls in (LPIPS, DISTS))



In [4]:
# Validation split of the LSDIR dataset hosted on the Hugging Face Hub.
dataset = load_dataset(
    path="danjacobellis/LSDIR",
    split='validation',
)

Resolving data files:   0%|          | 0/195 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/195 [00:00<?, ?it/s]

In [7]:
def DGML_compress(sample):
    """Compress and reconstruct one dataset sample with ``codec`` and score the result.

    Relies on notebook globals: ``codec``, ``device``, ``lpips_loss`` and
    ``dists_loss`` (the two metrics are always evaluated on "cuda").

    Args:
        sample: dataset row; ``sample['image']`` is read as a PIL image.

    Returns:
        dict with:
            'recovered'   -- PNG-encoded ``bytes`` of the reconstruction,
            'encode_time' -- seconds spent in ``codec.compress``,
            'decode_time' -- seconds spent in ``codec.decompress`` plus the clamp,
            'bpp'         -- 8 * total length of all emitted strings / (H*W),
                             using the un-padded image size,
            'PSNR', 'MSSIM'        -- piq metrics as Python floats,
            'LPIPS_dB', 'DISTS_dB' -- ``-10*log10`` of the LPIPS / DISTS value.
    """
    def _wait_for_gpu(tensor):
        # CUDA kernels run asynchronously with respect to the host, so the
        # clock must not be read until queued work has actually finished.
        if tensor.is_cuda:
            torch.cuda.synchronize()

    with torch.no_grad():
        img = sample['image']
        x = PILToTensor()(img).to(torch.float)
        x = (x/255).unsqueeze(0).to(device)
        H, W = x.size(2), x.size(3)
        # presumably pads the spatial dims up to a multiple of 64 -- confirm against walloc
        x_padded = walloc.pad(x, p=64)

        # perf_counter is monotonic and high resolution, unlike time.time().
        _wait_for_gpu(x_padded)
        t0 = time.perf_counter()
        Y = codec.compress(x_padded)
        _wait_for_gpu(x_padded)
        encode_time = time.perf_counter() - t0

        t0 = time.perf_counter()
        x_hat = codec.decompress(Y['strings'], Y['shape'])['x_hat']
        x_hat = x_hat.clamp(0, 1)
        _wait_for_gpu(x_hat)
        decode_time = time.perf_counter() - t0

        # Back to the original H x W before saving and scoring.
        x_hat = walloc.crop(x_hat, (H, W))
        rec = ToPILImage()(x_hat[0])
        buff2 = io.BytesIO()
        # PNG is always lossless; 'lossless' is not a PNG save option, so it is dropped.
        rec.save(buff2, format='png')
        # getvalue() gives an independent bytes object; getbuffer() would return
        # a memoryview that stays tied to the BytesIO.
        rec_png_bytes = buff2.getvalue()

        total_bytes = sum(len(ss) for s in Y['strings'] for ss in s)
        bpp = 8 * total_bytes / (H*W)
        # .item() turns the scalar tensors into plain floats, matching the
        # LPIPS/DISTS entries below.
        PSNR = psnr(x, x_hat).item()
        MSSIM = multi_scale_ssim(x, x_hat).item()
        LPIPS_dB = -10*np.log10(lpips_loss(x.to("cuda"), x_hat.to("cuda")).item())
        DISTS_dB = -10*np.log10(dists_loss(x.to("cuda"), x_hat.to("cuda")).item())

        return {
            'recovered': rec_png_bytes,
            'encode_time': encode_time,
            'decode_time': decode_time,
            'bpp': bpp,
            'PSNR': PSNR,
            'MSSIM': MSSIM,
            'LPIPS_dB': LPIPS_dB,
            'DISTS_dB': DISTS_dB,
        }

In [None]:
# GPU pass: move the codec to CUDA, then compress and score every sample.
device = "cuda"
codec = codec.to(device)
DGML_ds = dataset.map(DGML_compress)
# 'recovered' holds PNG bytes; casting to Image() makes it decode as an image.
DGML_ds = DGML_ds.cast_column('recovered',Image())
# DGML_compress does not return a 'compressed' column, so an unconditional cast
# fails unless the source dataset already carries one. Cast only if present.
if 'compressed' in DGML_ds.column_names:
    DGML_ds = DGML_ds.cast_column('compressed',Image())

Map:   0%|          | 0/250 [00:00<?, ? examples/s]

  Y = codec.compress(x_padded)
  x_hat = codec.decompress(Y['strings'],Y['shape'])['x_hat']
  return F.conv2d(input, weight, bias, self.stride,


In [None]:
def DGML_compress_cpu(sample):
    """Time one compress/decompress round trip of ``codec`` for a dataset sample.

    Uses the notebook globals ``codec`` and ``device``. The tensor is moved to
    whatever ``device`` currently names, so set ``device = "cpu"`` (and move the
    codec there) before mapping this function.

    Args:
        sample: dataset row; ``sample['image']`` is read as a PIL image.

    Returns:
        dict with 'cpu_encode_time' (seconds in ``codec.compress``) and
        'cpu_decode_time' (seconds in ``codec.decompress`` plus the clamp).
    """
    with torch.no_grad():
        img = sample['image']
        x = PILToTensor()(img).to(torch.float)
        x = (x/255).unsqueeze(0).to(device)
        # presumably pads the spatial dims up to a multiple of 64 -- confirm against walloc
        x_padded = walloc.pad(x, p=64)

        # perf_counter is monotonic and high resolution, which makes it the
        # appropriate clock for measuring short elapsed intervals.
        t0 = time.perf_counter()
        Y = codec.compress(x_padded)
        encode_time = time.perf_counter() - t0

        t0 = time.perf_counter()
        x_hat = codec.decompress(Y['strings'], Y['shape'])['x_hat']
        x_hat = x_hat.clamp(0, 1)
        decode_time = time.perf_counter() - t0

    return {
        'cpu_encode_time': encode_time,
        'cpu_decode_time': decode_time,
    }

In [None]:
# CPU pass: point the shared `device` global at the CPU, move the codec there,
# and collect per-sample encode/decode timings.
device = "cpu"
codec = codec.to(device)
DGML_cpu = dataset.map(function=DGML_compress_cpu)

In [None]:
# Attach the CPU timings to the GPU results. add_column rejects a name that
# already exists, so drop any stale copy first; this keeps the cell re-runnable.
for timing_column in ('cpu_encode_time', 'cpu_decode_time'):
    if timing_column in DGML_ds.column_names:
        DGML_ds = DGML_ds.remove_columns(timing_column)
    DGML_ds = DGML_ds.add_column(timing_column, DGML_cpu[timing_column])

In [None]:
# Publish the combined results as the 'validation' split of the Hub dataset repo.
DGML_ds.push_to_hub(
    repo_id="danjacobellis/DGML_q6",
    split='validation',
)