<!-- Copyright (c) 2021-2024, InterDigital Communications, Inc
All rights reserved.

Redistribution and use in source and binary forms, with or without 
modification, are permitted (subject to the limitations in the disclaimer 
below) provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, 
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, 
this list of conditions and the following disclaimer in the documentation 
and/or other materials provided with the distribution.
* Neither the name of InterDigital Communications, Inc nor the names of its 
contributors may be used to endorse or promote products derived from this 
software without specific prior written permission.

NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY 
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND 
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT 
NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER 
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 
OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 
OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -->

# Knowledge Distillation Applied to Learned Image Compression (LIC)

In [14]:
import io
import math
import numpy as np

from PIL import Image

import torch
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import DataLoader
from pytorch_msssim import ms_ssim

from compressai.zoo.image import (
    model_urls,
    load_state_dict_from_url,
    load_pretrained,
    _load_model,
    bmshj2018_hyperprior,
    mbt2018_mean,
    mbt2018
)

from fvcore.nn import FlopCountAnalysis
from zeus.monitor import ZeusMonitor
from zeus.device.cpu import get_current_cpu_index

import matplotlib.pyplot as plt
from ipywidgets import interact, widgets

from models import ScaleHyperprior, MeanScaleHyperprior, JointAutoregressiveHierarchicalPriors

In [3]:
def compute_psnr(a, b):
    """Return the PSNR in dB between two tensors.

    The mean squared error is taken over every element of ``a - b``. The
    formula has no explicit peak term, i.e. it assumes a peak signal value
    of 1.

    Args:
        a: First tensor.
        b: Second tensor, broadcastable against ``a``.

    Returns:
        ``-10 * log10(mse)`` as a Python float, or ``math.inf`` when the
        error is exactly zero (identical inputs).
    """
    mse = torch.mean((a - b) ** 2).item()
    if mse == 0:
        # math.log10(0) raises ValueError; a zero error means unbounded PSNR.
        return math.inf
    return -10 * math.log10(mse)


def compute_msssim(a, b):
    """Return the MS-SSIM score of ``a`` against ``b`` as a Python float.

    Delegates to ``pytorch_msssim.ms_ssim`` with ``data_range=1.``.
    """
    score = ms_ssim(a, b, data_range=1.)
    return score.item()


def compute_bpp(out_net):
    """Return the estimated rate in bits per pixel for a network output.

    Args:
        out_net: Mapping with an ``"x_hat"`` tensor, whose dimensions 0, 2
            and 3 are multiplied to get the pixel count, and a
            ``"likelihoods"`` mapping of tensors.

    Returns:
        The sum over all likelihood tensors of ``-log2(likelihood)``,
        divided by the pixel count, as a Python float.
    """
    shape = out_net["x_hat"].size()
    num_pixels = shape[0] * shape[2] * shape[3]
    # Dividing a natural-log sum by -ln(2) * num_pixels converts nats to
    # bits and normalises per pixel in one step.
    denominator = -math.log(2) * num_pixels

    total = 0
    for likelihoods in out_net["likelihoods"].values():
        total = total + torch.log(likelihoods).sum() / denominator
    return total.item()

In [None]:
# Use the GPU when PyTorch reports one as available, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

## Load Models

In [5]:
# NOTE(review): presumably the teacher's channel width; this name is not
# referenced again in the visible part of the notebook — confirm before removing.
N_teacher = 128
# Second constructor argument shared by every student ScaleHyperprior built below.
M = 192

In [6]:
# Download the pretrained "bmshj2018-hyperprior" weights (entry 5 of the
# "mse" URL table) and rebuild the teacher network from them.
url = model_urls["bmshj2018-hyperprior"]["mse"][5]
state_dict = load_pretrained(load_state_dict_from_url(url, progress=False))
teacher = ScaleHyperprior.from_state_dict(state_dict)
# Evaluation mode, placed on the selected device.
teacher = teacher.eval().to(device)

In [7]:
# Student model built with ScaleHyperprior(16, M); the teacher is the
# pretrained network loaded earlier, not this one.
# `run_id` (rather than `id`) keeps the builtin id() usable in later cells.
run_id = 258263  # 16
student_16 = ScaleHyperprior(16, M)
# Deserialize onto the CPU so the checkpoint opens without a GPU;
# weights_only=True restricts unpickling to tensors and plain containers.
checkpoint = torch.load(
    f"train_res/{run_id}/checkpoint_best.pth.tar",
    weights_only=True,
    map_location=torch.device("cpu"),
)
student_16.load_state_dict(checkpoint["state_dict"])
# Evaluation mode, placed on the selected device.
student_16 = student_16.eval().to(device)

In [8]:
# Student model built with ScaleHyperprior(32, M).
# `run_id` (rather than `id`) keeps the builtin id() usable in later cells.
run_id = 258258  # 32
student_32 = ScaleHyperprior(32, M)
# Deserialize onto the CPU so the checkpoint opens without a GPU;
# weights_only=True restricts unpickling to tensors and plain containers.
checkpoint = torch.load(
    f"train_res/{run_id}/checkpoint_best.pth.tar",
    weights_only=True,
    map_location=torch.device("cpu"),
)
student_32.load_state_dict(checkpoint["state_dict"])
# Evaluation mode, placed on the selected device.
student_32 = student_32.eval().to(device)

In [9]:
# Student model built with ScaleHyperprior(64, M).
# NOTE(review): this cell was labelled "Student Model with KD (no latent
# loss)" — confirm that still describes training run 258259.
# `run_id` (rather than `id`) keeps the builtin id() usable in later cells.
run_id = 258259  # 64
student_64 = ScaleHyperprior(64, M)
# Deserialize onto the CPU so the checkpoint opens without a GPU;
# weights_only=True restricts unpickling to tensors and plain containers.
checkpoint = torch.load(
    f"train_res/{run_id}/checkpoint_best.pth.tar",
    weights_only=True,
    map_location=torch.device("cpu"),
)
student_64.load_state_dict(checkpoint["state_dict"])
# Evaluation mode, placed on the selected device.
student_64 = student_64.eval().to(device)

In [10]:
# Student model built with ScaleHyperprior(96, M).
# NOTE(review): this cell was labelled "Student Model with KD (latent
# loss)" — confirm that still describes training run 258262.
# `run_id` (rather than `id`) keeps the builtin id() usable in later cells.
run_id = 258262  # 96
student_96 = ScaleHyperprior(96, M)
# Deserialize onto the CPU so the checkpoint opens without a GPU;
# weights_only=True restricts unpickling to tensors and plain containers.
checkpoint = torch.load(
    f"train_res/{run_id}/checkpoint_best.pth.tar",
    weights_only=True,
    map_location=torch.device("cpu"),
)
student_96.load_state_dict(checkpoint["state_dict"])
# Evaluation mode, placed on the selected device.
student_96 = student_96.eval().to(device)

In [11]:
# Registry of the networks under comparison, in display order. Each entry
# holds the module itself and a "metrics" dict seeded with its parameter
# count; later cells add further metrics and the network output.
models = {
    name: {
        "model": net,
        "metrics": {"params": sum(p.numel() for p in net.parameters())},
    }
    for name, net in (
        ("teacher", teacher),
        ("student_16", student_16),
        ("student_32", student_32),
        ("student_64", student_64),
        ("student_96", student_96),
    )
}

## Models Comparison

### Saint Malo

In [None]:
# Random 256x256 crop followed by conversion to a tensor. The crop position
# is random, so the evaluated patch differs between runs.
test_transforms = transforms.Compose([
    transforms.RandomCrop((256, 256)),
    transforms.ToTensor(),
])

# Load the test image as RGB, apply the transforms, then add a leading
# batch axis and move the tensor to the selected device.
img = Image.open("../data/assets/stmalo_fracape.png")
img = img.convert("RGB")
x = test_transforms(img)
x = x.unsqueeze(0).to(device)
print(x.shape)

In [None]:
# Flops computation: analyse each model on the input `x` with fvcore and
# store the total under that model's "flops" metric.
with torch.no_grad():
    for model, entry in models.items():
        flops = FlopCountAnalysis(entry["model"], x)
        entry["metrics"]["flops"] = flops.total()

In [None]:
%%script false --no-raise-error
# Energy consumption computation
# NOTE: the cell magic on the first line hands this cell to the `false`
# command instead of the Python kernel, so the code below is currently disabled.
# Only one CPU index (presumably the one this process runs on) is monitored;
# the GPU index list is empty.
current_cpu_index = get_current_cpu_index()
monitor = ZeusMonitor(cpu_indices=[current_cpu_index], gpu_indices=[])
with torch.no_grad():
    for model in models.keys():
        # One measurement window per model, wrapped around a single forward pass.
        monitor.begin_window("step")
        output = models[model]["model"].forward(x)
        # NOTE(review): the value returned by end_window is discarded, so no
        # measurement is stored in models[model]["metrics"].
        monitor.end_window("step")

In [13]:
# Inference
# Run every model once on `x` and record distortion (MSE, PSNR, MS-SSIM) and
# rate (bpp) in its "metrics" dict. The raw network output is kept under
# "output" for the plotting cell.
criterion = nn.MSELoss()
with torch.no_grad():
    for model, entry in models.items():
        # Call the module instead of .forward() so registered hooks are run.
        output = entry["model"](x)
        x_hat = output["x_hat"]

        metrics = entry["metrics"]
        metrics["mse"] = criterion(x_hat, x).item()
        metrics["psnr"] = compute_psnr(x_hat, x)
        metrics["ms_ssim"] = compute_msssim(x_hat, x)
        metrics["bpp"] = compute_bpp(output)

        entry["output"] = output

In [None]:
# One row per model: original crop, reconstruction, absolute-error map and a
# table of that model's metrics. The row count follows the registry size, and
# squeeze=False keeps `axs` two-dimensional even with a single model.
fig, axs = plt.subplots(len(models), 4, figsize=(16, 12), squeeze=False)

to_pil = transforms.ToPILImage()
orig = to_pil(x.squeeze().cpu())

for ax in axs.ravel():
    ax.axis("off")

for i, model in enumerate(models):
    x_hat = models[model]["output"]["x_hat"]

    axs[i, 0].imshow(orig)
    axs[i, 0].title.set_text("Original")

    # Clamp to [0, 1] before conversion: ToPILImage scales floats by 255 and
    # casts to uint8, so out-of-range network outputs would wrap around.
    recons = to_pil(x_hat.squeeze().cpu().clamp(0, 1))
    axs[i, 1].imshow(recons)
    axs[i, 1].title.set_text("Reconstructed")

    # Mean absolute error over dimension 1 gives one value per pixel.
    diffs = torch.mean((x_hat - x).abs(), dim=1).squeeze().cpu()
    axs[i, 2].imshow(diffs, cmap="viridis")
    axs[i, 2].title.set_text("Difference")

    axs[i, 3].set_axis_off()
    # Integers get thousands separators; everything else six decimals.
    axs[i, 3].table(rowLabels=list(models[model]["metrics"].keys()),
                    cellText=[[f"{m:,}"] if isinstance(m, int) else [f"{m:.6f}"] for m in models[model]["metrics"].values()],
                    loc="center")

plt.show()