! pip install onnx 
! pip install onnxruntime

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random
import time
import os
import sys
from torch.utils.data import DataLoader, Dataset
import math
from torchvision.transforms import v2
from PIL import Image
import matplotlib.pyplot as plt
import cv2
from torch.autograd import Variable
from torchvision import models,transforms
from transformers import pipeline
import torchvision.transforms.functional as TF
import onnx
import onnxruntime
from onnxruntime.quantization import quantize_dynamic, quantize_static, QuantType, QuantFormat, create_calibrator, CalibrationMethod, CalibrationDataReader
from transformers import AutoImageProcessor, AutoModelForDepthEstimation, AutoConfig

In [None]:
def convert_to_onnx(model, sample_input, model_name):
    """Export a PyTorch module to ``<model_name>.onnx``.

    The module is moved to the CPU and switched to eval mode before the
    export. The exported graph has a single input named ``'input'`` and a
    single output named ``'output'``; axis 0 of both is declared dynamic
    under the name ``'batch_size'``. Nothing is returned.
    """
    cpu_model = model.cpu().eval()
    output_file = f"{model_name}.onnx"

    # Axis 0 of both tensors stays symbolic so the batch size may vary.
    batch_axis = {0: 'batch_size'}
    export_options = dict(
        export_params=True,        # embed the trained weights in the file
        opset_version=13,          # ONNX operator-set version
        do_constant_folding=True,  # pre-compute constant sub-expressions
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={'input': dict(batch_axis), 'output': dict(batch_axis)},
    )

    with torch.no_grad():
        torch.onnx.export(cpu_model, sample_input, output_file, **export_options)

In [None]:
class QuntizationDataReader(CalibrationDataReader):
    """Calibration reader that feeds the output of ``model`` to the quantizer.

    Every batch drawn from ``loader`` is run through ``model`` (in eval mode,
    without gradients) and the result is handed out under the key
    ``'input'``.
    """

    def __init__(self, loader,model,device):
        self.torch_dl = loader
        self.device = device
        self.model = model
        self.model.eval()
        self.datasize = len(self.torch_dl)
        self.enum_data = iter(self.torch_dl)

    def to_numpy(self, pt_tensor):
        """Return ``pt_tensor`` as a NumPy array held in host memory."""
        if pt_tensor.requires_grad:
            # Tensors attached to the autograd graph must be detached first.
            pt_tensor = pt_tensor.detach()
        return pt_tensor.cpu().numpy()

    def get_next(self):
        """Return the next feed dict, or ``None`` once the loader is exhausted."""
        batch = next(self.enum_data, None)
        if batch is None:
            return None

        # Each batch is an (inputs, labels) pair; the labels are not used.
        images, _labels = batch
        with torch.no_grad():
            prediction = self.model(images.to(self.device))
        return {'input': self.to_numpy(prediction)}

    def rewind(self):
        """Restart iteration over the underlying loader."""
        self.enum_data = iter(self.torch_dl)

In [None]:
def quantize_onnx_model(model_path, quantized_model_name, calibration_data_loader, depth_map_model, device, quantization_type='static'):
    """Quantize an ONNX model using either dynamic or static quantization.

    Args:
        model_path: Path of the ONNX model to quantize.
        quantized_model_name: Output name without extension; the result is
            written to ``"<quantized_model_name>.onnx"``.
        calibration_data_loader: Loader handed to ``QuntizationDataReader``.
            Only used for static quantization.
        depth_map_model: Model handed to ``QuntizationDataReader``, which runs
            it on every calibration batch. Only used for static quantization.
        device: Device handed to ``QuntizationDataReader``. Only used for
            static quantization.
        quantization_type: ``'dynamic'`` or ``'static'``.

    Raises:
        ValueError: If ``quantization_type`` is neither ``'dynamic'`` nor
            ``'static'``.
    """
    # Reject unknown modes up front: previously any value other than
    # 'dynamic' (including typos) silently ran static quantization.
    if quantization_type not in ('dynamic', 'static'):
        raise ValueError(
            "quantization_type must be 'dynamic' or 'static', "
            f"got {quantization_type!r}"
        )

    output_path = f"{quantized_model_name}.onnx"

    if quantization_type == 'dynamic':
        # Dynamic quantization: no calibration data is passed.
        quantize_dynamic(
            model_input=model_path,
            model_output=output_path,
            weight_type=QuantType.QUInt8,
            per_channel=False,
            reduce_range=False,
            nodes_to_exclude=[]
        )
        return

    # Static quantization: the reader supplies the calibration inputs.
    data_reader = QuntizationDataReader(calibration_data_loader, depth_map_model, device)

    quantize_static(
        model_input=model_path,
        model_output=output_path,
        calibration_data_reader=data_reader,
        quant_format=QuantFormat.QDQ,
        op_types_to_quantize=['Conv', 'MatMul', 'Gemm'],  # Specify operations to quantize
        per_channel=False,
        weight_type=QuantType.QInt8,  # Use Int8 quantization for weights
        activation_type=QuantType.QInt8,  # Use Int8 quantization for activations
        calibrate_method=CalibrationMethod.MinMax
    )

In [None]:
def create_inference_session(quantized_model_path, num_threads=4):
    """Create a CPU-only ONNX Runtime inference session.

    Args:
        quantized_model_path: Path of the ONNX model file to load.
        num_threads: Value assigned to ``intra_op_num_threads``. Defaults to
            4; adjust it to the CPU of the target board (e.g. a Raspberry Pi).

    Returns:
        An ``onnxruntime.InferenceSession`` that uses only
        ``CPUExecutionProvider`` with graph optimization level
        ``ORT_ENABLE_ALL``.
    """
    options = onnxruntime.SessionOptions()
    options.intra_op_num_threads = num_threads
    options.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL

    return onnxruntime.InferenceSession(
        quantized_model_path,
        options,
        providers=['CPUExecutionProvider']
    )

In [None]:
class CustomDataset(Dataset):
    """Image dataset labelled by the name of the containing sub-folder.

    Every immediate sub-directory of ``path`` is scanned for files ending in
    ``.jpg`` or ``.png``. Images inside a folder whose name contains
    ``'client'`` get label 1; images in any other folder get label 0.

    Args:
        path: Root directory holding one sub-directory per group of images.
        device: Stored on the instance; not used by the dataset itself.
        transform: Optional callable applied to each RGB PIL image.
        img_size: Stored on the instance; not used by the dataset itself.
    """

    def __init__(self, path, device, transform=None, img_size=(128, 128)):
        super().__init__()
        self.device = device
        self.images = []
        self.labels = []
        self.img_size = img_size
        self.transform = transform
        self.path = path
        self.num_channels = 1

        # sorted() makes the sample order reproducible; os.listdir returns
        # entries in arbitrary order.
        for folder in sorted(os.listdir(self.path)):
            folder_path = os.path.join(self.path, folder)
            # Stray files in the root would make os.listdir raise below.
            if not os.path.isdir(folder_path):
                continue
            label = 1 if 'client' in folder else 0
            for image in sorted(os.listdir(folder_path)):
                if image.endswith(('.jpg', '.png')):
                    self.images.append(os.path.join(folder_path, image))
                    self.labels.append(label)

    def __len__(self):
        """Return the number of image files found."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is loaded as RGB and passed through ``transform`` when one
        was given.
        """
        # The context manager closes the file handle; convert() returns an
        # independent image, so it remains usable afterwards.
        with Image.open(self.images[idx]) as raw:
            img = raw.convert("RGB")

        if self.transform:
            img = self.transform(img)

        return img, self.labels[idx]

In [None]:
# Input resolution and batch size shared by the cells below.
img_size = (252, 252)
batch_size = 10

# Random augmentations, then resize to img_size and convert to a tensor.
transf = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=20),
        transforms.ColorJitter(
            brightness=0.5,
            contrast=0.5,
            saturation=0.5,
            hue=0.1,
        ),
        transforms.Resize(size=img_size),
        transforms.ToTensor(),
    ]
)

# Everything runs on the CPU. Alternative that prefers CUDA when available:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")

val_dataset = CustomDataset(
    path="/kaggle/input/increased-liveliness-detection/val",
    device=device,
    transform=transf,
    img_size=img_size,
)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=True)

In [None]:
def run_inference(session, input_data):
    """Feed ``input_data`` to the session's first input and return its first output.

    The names of the first declared input and the first declared output are
    looked up on ``session``; only that output is requested from ``run``.
    """
    first_input = session.get_inputs()[0]
    first_output = session.get_outputs()[0]

    feeds = {first_input.name: input_data}
    fetched = session.run([first_output.name], feeds)
    return fetched[0]

In [None]:
class FineTuneDepthAnything(nn.Module):
    """Wrapper around the Depth-Anything-V2-Small depth-estimation model.

    With ``load_trained=True`` the network is built from its config and the
    weights are read from ``model_path``; the substring ``"depth_anything."``
    is removed from every checkpoint key before loading.

    Otherwise the pretrained weights are loaded and only parameters whose
    name contains ``'head'``,
    ``'neck.fusion_stage.layers.2.residual_layer'`` or
    ``'neck.fusion_stage.layers.3'`` keep ``requires_grad=True``; every other
    parameter is frozen.
    """

    def __init__(self, device,load_trained=False,model_path=None):
        super().__init__()
        hub_id = "depth-anything/Depth-Anything-V2-Small-hf"

        if not load_trained:
            backbone = AutoModelForDepthEstimation.from_pretrained(hub_id)
            trainable_markers = (
                'head',
                'neck.fusion_stage.layers.2.residual_layer',
                'neck.fusion_stage.layers.3',
            )
            for name, param in backbone.named_parameters():
                param.requires_grad = any(marker in name for marker in trainable_markers)
        else:
            backbone = AutoModelForDepthEstimation.from_config(AutoConfig.from_pretrained(hub_id))
            checkpoint = torch.load(model_path, map_location=device, weights_only=True)
            # Checkpoint keys carry this wrapper's attribute name; drop it so
            # they match the keys of the wrapped model.
            renamed = {
                key.replace("depth_anything.", ""): tensor
                for key, tensor in checkpoint.items()
            }
            backbone.load_state_dict(renamed)

        self.depth_anything = backbone.to(device)

    def forward(self, inp):
        """Return the predicted depth with a singleton axis inserted at dim 1.

        NOTE(review): presumably ``predicted_depth`` is (batch, H, W), making
        the result (batch, 1, H, W) - confirm against the model output.
        """
        prediction = self.depth_anything(inp)
        return prediction.predicted_depth.unsqueeze(1)

In [None]:
class CDC(nn.Module):
    '''
    Central difference convolution (CDC) layer.

    A regular convolution is evaluated first. A second, 1x1 convolution whose
    kernel is the spatial sum of the regular kernel is then evaluated on the
    same input, and the result is ``regular - theta * difference``.

    The two branches must yield the same spatial size for the subtraction;
    this holds for the default kernel_size=3, padding=1, dilation=1. For any
    kernel_size other than 3 the regular convolution uses padding 0.
    '''
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1,
                 padding=1, dilation=1, groups=1, bias=False, theta=0.7):
        super().__init__()
        self.stride = stride
        self.groups = groups
        self.dilation = dilation
        self.theta = theta
        self.padding = padding
        # Bias of the difference branch; the regular convolution below also
        # owns a bias of its own when ``bias`` is set.
        self.bias = nn.Parameter(torch.zeros(out_channels)) if bias else None

        conv_padding = padding if kernel_size == 3 else 0
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=conv_padding,
            dilation=dilation,
            groups=groups,
            bias=bias,
        )

    def forward(self, x):
        vanilla = self.conv(x)

        # The weight is (out_channels, in_channels, k, k); summing axis 2
        # twice collapses both spatial axes to (out_channels, in_channels).
        collapsed = self.conv.weight.sum(2).sum(2)
        # Re-append two singleton axes so it can be used as a 1x1 kernel.
        pointwise_kernel = collapsed.unsqueeze(-1).unsqueeze(-1)

        central = F.conv2d(
            input=x,
            weight=pointwise_kernel,
            bias=self.bias,
            stride=self.stride,
            padding=0,
            groups=self.groups,
        )
        return vanilla - self.theta * central
        

In [None]:
class conv_block_nested(nn.Module):
    """Two consecutive CDC -> BatchNorm2d -> ReLU stages.

    The first stage maps ``in_ch`` to ``out_ch`` channels; the second keeps
    ``out_ch`` channels. Both stages share one in-place ReLU module.
    """

    def __init__(self, in_ch,  out_ch):
        super().__init__()
        self.activation = nn.ReLU(inplace=True)
        self.conv1 = CDC(in_ch, out_ch, kernel_size=3, padding=1, bias=True)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.conv2 = CDC(out_ch, out_ch, kernel_size=3, padding=1, bias=True)
        self.bn2 = nn.BatchNorm2d(out_ch)

    def forward(self, x):
        stages = ((self.conv1, self.bn1), (self.conv2, self.bn2))
        for conv, norm in stages:
            x = self.activation(norm(conv(x)))
        return x

In [None]:
class ClassifierUCDCN(nn.Module):
    """Two-output classifier built from CDC blocks on a single-channel input.

    Pipeline: conv block (1 -> 8 channels), 2x2 average pool, conv block
    (8 -> 1 channel), 2x2 average pool, flatten, linear -> ReLU -> dropout ->
    linear (2 outputs) -> sigmoid.

    Args:
        dropout: Dropout probability applied after the first linear layer.
        img_size: ``(height, width)`` of the input. Defaults to
            ``(252, 252)``, the size that was previously hard-coded.
    """

    def __init__(self, dropout=0.5, img_size=(252, 252)):
        super().__init__()
        self.layers = 8
        self.dropout_prob = dropout
        self.img_size = img_size
        self.hidden_size = 64
        self.conv1 = conv_block_nested(1, self.layers)
        self.relu = nn.ReLU()
        # NOTE: despite its name this attribute is an average pool.
        self.maxpool = nn.AvgPool2d(kernel_size=2, stride=2)
        self.conv2 = conv_block_nested(self.layers, 1)
        # Two stride-2 pools shrink each side by a factor of 4 (floored).
        # The parentheses matter: ``a//4 * b//4`` parses as ``((a//4)*b)//4``,
        # which is only correct when ``b`` is divisible by 4.
        self.flat_features = (self.img_size[0] // 4) * (self.img_size[1] // 4)
        self.linear_1 = nn.Linear(self.flat_features, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_prob)
        self.linear_2 = nn.Linear(self.hidden_size, 2)
        self.sigmoid = nn.Sigmoid()

    def forward(self, inp):
        """Return sigmoid activations of shape ``(batch, 2)`` for ``inp``."""
        conv1 = self.conv1(inp)
        pooled1 = self.maxpool(conv1)
        conv2 = self.conv2(pooled1)
        pooled2 = self.maxpool(conv2)
        # conv2 yields one channel, so each sample flattens to flat_features.
        linear_1 = self.linear_1(pooled2.view(-1, self.flat_features))
        relu = self.relu(linear_1)
        dropout = self.dropout(relu)
        linear_2 = self.linear_2(dropout)
        return self.sigmoid(linear_2)

In [None]:
# Restore the fine-tuned depth model and export it to depth_map_model.onnx,
# traced with a random 3-channel input of size img_size.
depth_map_model = FineTuneDepthAnything(
    device,
    load_trained=True,
    model_path='/kaggle/input/finetune_depth_anything/pytorch/63_epochs_trained/1/fine_tuning_depth_anything.pth',
).to(device)
sample_input = torch.randn((1, 3, *img_size))
convert_to_onnx(
    model=depth_map_model,
    sample_input=sample_input,
    model_name='depth_map_model',
)

In [None]:
# Restore the trained classifier and export it to classifier_model.onnx,
# traced with a random single-channel input of size img_size.
classifier = ClassifierUCDCN(dropout=0.5).to(device)
classifier.load_state_dict(
    torch.load(
        '/kaggle/input/finetune_depth_anyhting_classifier/pytorch/64_size_7_epochs_trained/1/64_finetune_depth_anything_classifier.pth',
        map_location=device,
        weights_only=True,
    )
)
sample_input_classifier = torch.randn((1, 1, *img_size))
convert_to_onnx(
    model=classifier,
    sample_input=sample_input_classifier,
    model_name='classifier_model',
)

In [None]:
# Dynamic quantization of the depth model; no calibration loader is passed.
quantize_onnx_model(
    model_path='/kaggle/working/depth_map_model.onnx',
    quantized_model_name="depth_quantized_model",
    calibration_data_loader=None,
    depth_map_model=depth_map_model,
    device=device,
    quantization_type="dynamic",
)

In [None]:
# Static quantization of the classifier; calibration inputs are produced by
# running depth_map_model on the batches of val_loader.
quantize_onnx_model(
    model_path='/kaggle/working/classifier_model.onnx',
    quantized_model_name="classifier_quantized_model",
    calibration_data_loader=val_loader,
    depth_map_model=depth_map_model,
    device=device,
    quantization_type="static",
)

In [None]:
# Load the exported (unquantized) depth model and run the ONNX checker on it.
onnx_model = onnx.load('/kaggle/working/depth_map_model.onnx')
onnx.checker.check_model(onnx_model)

In [None]:
# Load the exported (unquantized) classifier and run the ONNX checker on it.
onnx_model = onnx.load('/kaggle/working/classifier_model.onnx')
onnx.checker.check_model(onnx_model)

In [None]:
# Load the quantized depth model and run the ONNX checker on it.
onnx_model = onnx.load('/kaggle/working/depth_quantized_model.onnx')
onnx.checker.check_model(onnx_model)

In [None]:
# Load the quantized classifier and run the ONNX checker on it.
onnx_model = onnx.load('/kaggle/working/classifier_quantized_model.onnx')
onnx.checker.check_model(onnx_model)

In [None]:
# Inference sessions over the unquantized ONNX exports.
depth_session = create_inference_session('/kaggle/working/depth_map_model.onnx')
classifier_session = create_inference_session('/kaggle/working/classifier_model.onnx')

In [None]:
# Inference sessions over the quantized models. These rebind the same two
# names used for the unquantized sessions, so whichever cell ran last wins.
depth_session = create_inference_session('/kaggle/working/depth_quantized_model.onnx')
classifier_session = create_inference_session('/kaggle/working/classifier_quantized_model.onnx')