In [1]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split, ConcatDataset
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from torch import nn
from torch.optim import Adam
from torchvision.transforms import  Compose, ToTensor, Resize, RandomHorizontalFlip, RandomRotation, ColorJitter, RandomResizedCrop, Lambda
from PIL import Image
import matplotlib.pyplot as plt
from torchsummary import summary
import torchmetrics
from lightning.pytorch.loggers import WandbLogger
import wandb
import random
from datetime import datetime
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping

### macOS

Set the multiprocessing start method to `fork` and select the Metal (MPS) device when it is available.

In [2]:
import torch.multiprocessing as mp

# Request the "fork" start method for DataLoader workers (macOS defaults to
# "spawn"). Not every platform offers "fork", so only ask for it where it exists.
if "fork" in mp.get_all_start_methods():
    mp.set_start_method("fork", force=True)  # Ensure fork is used on macOS

# CPU fallback. `num_workers` must be defined on every code path because the
# DataLoader cell reads it unconditionally; 0 loads batches in the main process.
device = torch.device("cpu")
num_workers = 0

# Prefer Apple's Metal backend when this PyTorch build and machine support it.
if torch.backends.mps.is_available():
    device = torch.device("mps")
    num_workers = 11

In [3]:
# --- Data locations ---------------------------------------------------------
# Pre-split image folders, one list of directories per subset.
DATASET_PATHS = ["../Dataset/local_dataset_all/train"]
DATASET_PATHS_TEST = ["../Dataset/local_dataset_all/test"]
# If this list is empty, the loading cell carves validation data out of the
# training set instead.
DATASET_PATHS_VALID = ["../Dataset/local_dataset_all/val"]
# Single folder that the loading cell splits into train/val/test itself;
# set to None to skip it.
DATASET_PATH_SPLIT = "../Dataset/MIT_local_data"
# File from which the number of classes is derived.
PRICES_FILE_PATH = "../Dataset/prices.txt"

# --- Run settings -----------------------------------------------------------
SEED = 42
BATCH_SIZE = 64
IMAGE_SIZE = (200, 200)

In [4]:
from lightning.pytorch import seed_everything

# Seed NumPy and PyTorch (CPU and CUDA) explicitly, in that order, then let
# Lightning's helper run last with the same seed.
for _seed_rng in (np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    _seed_rng(SEED)
seed_everything(SEED)

Seed set to 42


42

In [5]:
# Project-local helper module; this notebook uses it for label loading, the
# custom augmentation and the trained model class.
import helper_functions as hp
# Class count obtained by the helper from the prices file. It is forwarded to
# hp.load_image_labels_classify every time a sample is loaded.
num_classes = hp.get_num_classes(PRICES_FILE_PATH)
print(num_classes)

class ImagePriceDataset(Dataset):
    """Dataset of ``.jpg`` images found directly inside one or more directories.

    Decoding and label construction are delegated to
    ``hp.load_image_labels_classify``; this class only indexes the files and
    applies an optional transform to each image.

    Args:
        dataset_paths: A directory path, or an iterable of directory paths,
            scanned non-recursively for file names ending in ``.jpg``.
        transform: Optional callable applied to the PIL image of every sample.

    Raises:
        ValueError: If any of the given paths does not exist.
    """

    def __init__(self, dataset_paths, transform=None):
        # A bare path would otherwise be iterated character by character.
        if isinstance(dataset_paths, (str, os.PathLike)):
            dataset_paths = [dataset_paths]

        # List of (file_name, directory) pairs, one per sample.
        self.files = []
        for dataset_path in dataset_paths:
            if not os.path.exists(dataset_path):
                raise ValueError(f"Path {dataset_path} doesn't exist.")
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # sample indices (and therefore any seeded random_split) stable
            # across machines and runs.
            self.files.extend(
                (f, dataset_path)
                for f in sorted(os.listdir(dataset_path))
                if f.endswith(".jpg")
            )

        self.transform = transform

    def __len__(self):
        """Return the number of indexed image files."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        The image is whatever ``transform`` produces (the PIL image itself when
        no transform is set); the label is converted to a float32 tensor.
        """
        file_name, dataset_path = self.files[idx]
        image, label = hp.load_image_labels_classify(file_name, dataset_path, IMAGE_SIZE, num_classes)

        # Floating-point images are rescaled to 8-bit before building the PIL
        # image. Assumes the helper returns floats in [0, 1] -- TODO confirm.
        # Any floating dtype is handled (not only float32), and values are
        # clipped first so out-of-range pixels saturate instead of wrapping
        # around in the uint8 cast.
        if np.issubdtype(image.dtype, np.floating):
            image = (np.clip(image, 0.0, 1.0) * 255).astype(np.uint8)

        # Convert the numpy array into a PIL image so the transforms can run.
        image = Image.fromarray(image)

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(label, dtype=torch.float32)
    
# Transforms
# Training pipeline: operates on PIL images and ends by converting to a tensor.
# (Resize(IMAGE_SIZE) and RandomResizedCrop(IMAGE_SIZE, scale=(0.9, 1.0)) are
# currently not part of the pipeline.)
_train_steps = [
    Lambda(lambda x: hp.custom_augmentation(x)),  # project-specific augmentation from the helper module
    RandomHorizontalFlip(p=0.5),                  # mirror horizontally with 50% probability
    RandomRotation(degrees=180),                  # random rotation of up to +/-180 degrees
    ColorJitter(
        brightness=0.04,  # random brightness change
        contrast=0.1,     # random contrast change
        saturation=0.2,   # random saturation change
        hue=0.07,         # random hue shift
    ),
    ToTensor(),                                   # PIL image -> tensor
]
transform = Compose(_train_steps)

# Evaluation pipeline: no augmentation, only the tensor conversion.
transform_test = Compose([ToTensor()])


# Dataset
import copy

train_dataset = ImagePriceDataset(DATASET_PATHS, transform=transform)
test_dataset = ImagePriceDataset(DATASET_PATHS_TEST, transform_test)

# Train-Valid Split
# Validation samples go through `transform_test` (no random augmentation) so
# validation metrics are deterministic and comparable between epochs and runs.
if len(DATASET_PATHS_VALID) > 0:
    val_dataset = ImagePriceDataset(DATASET_PATHS_VALID, transform=transform_test)
else:
    train_size = int(0.8 * len(train_dataset))
    val_size = len(train_dataset) - train_size
    train_part, val_part = random_split(train_dataset, [train_size, val_size])
    # The validation indices must read from a dataset without augmentation.
    # A shallow copy shares the exact same file list (so the indices stay
    # valid) while carrying its own transform.
    val_source = copy.copy(train_dataset)
    val_source.transform = transform_test
    train_dataset = train_part
    val_dataset = torch.utils.data.Subset(val_source, val_part.indices)

# Optional extra dataset that is split into train/val/test here and appended
# to the corresponding subsets.
if DATASET_PATH_SPLIT is not None:
    dataset = ImagePriceDataset([DATASET_PATH_SPLIT], transform=transform_test) # We use no augmentation for the split dataset
    train_size = int(0.7 * len(dataset))
    val_size = int(0.1 * len(dataset))
    test_size = len(dataset) - train_size - val_size  # remainder, so the three sizes always add up
    train_split_dataset, val_split_dataset, test_split_dataset = random_split(dataset, [train_size, val_size, test_size])
    train_dataset = ConcatDataset([train_dataset, train_split_dataset])
    val_dataset = ConcatDataset([val_dataset, val_split_dataset])
    test_dataset = ConcatDataset([test_dataset, test_split_dataset])


# DataLoader
# Only the training loader shuffles; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=num_workers)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=num_workers)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=num_workers)

print(f"Training set size: {len(train_loader.dataset)} samples")
print(f"Validation set size: {len(val_loader.dataset)} samples")
print(f"Test set size: {len(test_loader.dataset)} samples")


61
Training set size: 18364 samples
Validation set size: 2338 samples
Test set size: 4276 samples


## TensorFlow Lite

In [6]:
# Restore the trained classifier and switch it to inference mode.
model = hp.TrainModel.load_from_checkpoint("checkpoints/classifier/train_Efficientnet-classifier_20Jan-23:18:57.ckpt")
model.eval()

# Dummy input used to trace the model: one 3-channel image at the configured
# resolution. Derived from IMAGE_SIZE so the export follows the config cell
# instead of a second hard-coded 200x200.
# NOTE(review): assumes IMAGE_SIZE is (height, width); irrelevant while it is square.
dummy_input = torch.randn(1, 3, *IMAGE_SIZE)

# The export does not create missing directories, so make sure the target exists.
os.makedirs("quantization", exist_ok=True)

# Export the model to ONNX (traced on CPU).
torch.onnx.export(model.to("cpu"), dummy_input, "quantization/model.onnx", verbose=False)


/Users/mats/miniconda3/envs/ML/lib/python3.11/site-packages/pytorch_lightning/utilities/parsing.py:209: Attribute 'model' is an instance of `nn.Module` and is already saved during checkpointing. It is recommended to ignore them using `self.save_hyperparameters(ignore=['model'])`.


In [7]:
import onnx
from onnx_tf.backend import prepare

# Input produced by the ONNX export cell, and the location of the TensorFlow export.
# NOTE(review): the export path is later opened with
# TFLiteConverter.from_saved_model, so despite the ".pb" suffix it is
# presumably a SavedModel directory -- confirm.
onnx_path = "quantization/model.onnx"
tf_export_path = "quantization/model.pb"

# ONNX graph -> TensorFlow representation -> exported graph on disk.
onnx_model = onnx.load(onnx_path)
tf_rep = prepare(onnx_model)
tf_rep.export_graph(tf_export_path)

NotFoundError: dlopen(/Users/mats/miniconda3/envs/ML/lib/python3.11/site-packages/tensorflow-plugins/libmetal_plugin.dylib, 0x0006): Symbol not found: __ZN3tsl8internal10LogMessageC1EPKcii
  Referenced from: <D2EF42E3-3A7F-39DD-9982-FB6BCDC2853C> /Users/mats/miniconda3/envs/ML/lib/python3.11/site-packages/tensorflow-plugins/libmetal_plugin.dylib
  Expected in:     <2814A58E-D752-317B-8040-131217E2F9AA> /Users/mats/miniconda3/envs/ML/lib/python3.11/site-packages/tensorflow/python/_pywrap_tensorflow_internal.so

In [None]:
import tensorflow as tf

# Number of training samples fed to the converter for calibration.
NUM_CALIBRATION_SAMPLES = 50


def representative_dataset():
    """Yield calibration inputs for full-integer quantization.

    Draws a seeded random subset of `train_dataset` without mutating it.
    Every dataset item is a (C, H, W) tensor produced by ToTensor; a leading
    batch axis is added so each yielded array matches the (1, 3, H, W) input
    the model was exported with.

    Yields:
        list[np.ndarray]: A single-element list holding one float32 batch.
    """
    rng = np.random.default_rng(SEED)
    sample_count = min(NUM_CALIBRATION_SAMPLES, len(train_dataset))
    for index in rng.choice(len(train_dataset), size=sample_count, replace=False):
        image, _ = train_dataset[int(index)]
        yield [image.numpy()[np.newaxis, ...].astype(np.float32)]


converter = tf.lite.TFLiteConverter.from_saved_model("quantization/model.pb")

converter.optimizations = [tf.lite.Optimize.DEFAULT]  # Default optimizations, which include quantization
converter.representative_dataset = representative_dataset
# Restrict the converter to int8 builtin ops and use int8 for the model's
# input and output as well.
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8


tflite_model_quantized = converter.convert()

# Save the optimized model to a file
with open("quantization/model_optimized_quantized.tflite", "wb") as f:
    f.write(tflite_model_quantized)


# Quantization with PyTorch (Lightning)

In [None]:
import itertools

import torch
import torch.quantization
from torchvision import models

# Number of training batches used for calibration; None runs the whole loader.
NUM_CALIBRATION_BATCHES = 20

# Load your pre-trained model
model = hp.TrainModel.load_from_checkpoint("checkpoints/classifier/train_Efficientnet-classifier_20Jan-23:18:57.ckpt")
# Quantized kernels run on CPU and the calibration batches below are CPU
# tensors, so the weights must live on CPU too. The RuntimeError previously
# recorded for this cell is consistent with weights restored onto another
# device (e.g. MPS) -- NOTE(review): confirm on the target machine.
model = model.to("cpu")
model.eval()

# Specify the quantization configuration.
# "fbgemm" is not available in every PyTorch build (typically absent on ARM),
# so pick an engine this build actually supports and keep the active engine
# and the qconfig in agreement.
supported_engines = torch.backends.quantized.supported_engines
if "fbgemm" in supported_engines:
    quant_backend = "fbgemm"
elif "qnnpack" in supported_engines:
    quant_backend = "qnnpack"
else:
    raise RuntimeError(f"No supported quantization engine found: {supported_engines}")
torch.backends.quantized.engine = quant_backend
model.qconfig = torch.quantization.get_default_qconfig(quant_backend)

# NOTE(review): eager-mode static quantization expects QuantStub/DeQuantStub
# around the quantized region and usually fused conv/bn/activation modules.
# Confirm hp.TrainModel provides them (or wrap it, e.g. with QuantWrapper)
# before relying on the converted model.

# Prepare the model for quantization (inserts observers into a copy).
model_prepared = torch.quantization.prepare(model)

# Calibrate the model with a representative dataset: a few training batches
# are enough for the observers, and no gradients are needed.
with torch.no_grad():
    for inputs, _ in itertools.islice(train_loader, NUM_CALIBRATION_BATCHES):
        model_prepared(inputs.to("cpu"))

# Convert the model to a quantized version
model_quantized = torch.quantization.convert(model_prepared)


/Users/mats/miniconda3/envs/ML/lib/python3.11/site-packages/pytorch_lightning/utilities/parsing.py:209: Attribute 'model' is an instance of `nn.Module` and is already saved during checkpointing. It is recommended to ignore them using `self.save_hyperparameters(ignore=['model'])`.


RuntimeError: Mismatched Tensor types in NNPack convolutionOutput