In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import torchvision
import torch.nn.utils.prune as prune
import torch.quantization
from torchvision import transforms
from PIL import Image
import random
import gradio as gr

In [37]:
seed = 42  # Choose your desired seed value
random.seed(seed)
if torch.cuda.is_available():
  torch.cuda.manual_seed_all(seed)
else:
  torch.manual_seed(seed)

In [38]:
def batch_mean_and_sd(loader):

    cnt = 0
    fst_moment = torch.empty(3)
    snd_moment = torch.empty(3)

    for images, _ in loader:
        b, c, h, w = images.shape
        nb_pixels = b * h * w
        sum_ = torch.sum(images, dim=[0, 2, 3])
        sum_of_square = torch.sum(images ** 2,
                                  dim=[0, 2, 3])
        fst_moment = (cnt * fst_moment + sum_) / (cnt + nb_pixels)
        snd_moment = (cnt * snd_moment + sum_of_square) / (cnt + nb_pixels)
        cnt += nb_pixels

    mean, std = fst_moment, torch.sqrt(snd_moment - fst_moment ** 2)
    return mean,std

In [None]:
mean, std = batch_mean_and_sd(image_dataloader)
print("mean and std: \n", mean, std)

In [40]:
class MinMaxScaler(nn.Module):
  def __init__(self, min_values, max_values):
    self.min_values = min_values
    self.max_values = max_values

  def __call__(self, data):
    return (data - self.min_values) / (self.max_values - self.min_values)

In [41]:
transform = transforms.Compose([
    transforms.Resize(600),
    transforms.ToTensor(),
    transforms.Normalize((0.2498, 0.3010, 0.1964), (0.1668, 0.1603, 0.1697)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    MinMaxScaler(1.8, 7.1)
])

In [5]:
# Define the custom dataset class
class ImageLabelDataset(Dataset):
    def __init__(self, image_dir, label_file, transform=None):
        self.image_dir = image_dir
        self.label_file = label_file
        self.transform = transform

        # Read labels from CSV file
        label_df = pd.read_csv(label_file)
        self.image_paths = label_df['image'].tolist()
        self.labels = label_df['FVC'].tolist()

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, index):
        image_path = os.path.join(self.image_dir, self.image_paths[index])
        image = Image.open(image_path)
        label = self.labels[index]

        if self.transform is not None:
            image = self.transform(image)

        return image, label

In [6]:
train_dataset = ImageLabelDataset('/home/emmanuel/Project/ImageDataset/Training', '/home/emmanuel/Project/ImageDataset/Training/training.csv', transform=transform)
val_dataset = ImageLabelDataset('/home/emmanuel/Project/ImageDataset/Validation', '/home/emmanuel/Project/ImageDataset/Validation/validation.csv', transform=transform)

In [7]:
train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=2)
val_dataloader = DataLoader(val_dataset, batch_size=4, shuffle=True, num_workers=2)

In [6]:
class EfficientVisionTransformer(nn.Module):

  def __init__(self, d_model=600, nhead=8, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048, dropout=0.2, batch_first=True):


    super().__init__()
    # Patch embedding
    self.patch_embedding = nn.Conv2d(3, d_model, kernel_size=16, stride=16)

    # Efficient Transformer encoder
    encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout, batch_first=True)
    self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_encoder_layers)

    # Decoder
    self.decoder = nn.Linear(d_model, 1)

  def forward(self, x):
    x = self.patch_embedding(x)
    x = x.flatten(2).transpose(1, 2)
    x = model.transformer_encoder(x)
    x = self.decoder(x.mean(dim=1))
    return x

In [14]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = torch.compile(EfficientVisionTransformer()).to(device)

In [53]:
# Define the optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.MSELoss(reduction='mean')

# Model Optimization

In [None]:
#Trains and Optimize a PyTorch model for a specified number of epochs, tracking loss and accuracy on both training and validation sets.

# Define pruning and quantization schedule
prune_quantize_schedule = [
    (0, 0),  # No pruning at the beginning
    (3, 0.4),  # Prune 40% after 2 epochs

]

train_losses, val_losses = [], []  # Track losses

num_epochs = 5

for epoch in range(num_epochs):

  # Training phase
  model.train()  # Set model to training mode
  running_loss = 0.0 # Initialize counters

  for images, labels in train_dataloader:
    images = images.float().to(device)  # Move images to GPU if available
    labels = labels.float().to(device) # Move labels to GPU if available


    outputs = model(images)  # Forward pass
    labels = labels.view(-1, 1)

    loss = criterion(outputs, labels)


    optimizer.zero_grad()  # Clear gradients
    loss.backward()  # Backward pass
    optimizer.step()  # Update model parameters


    # Update counters
    running_loss += loss.item() * images.size(0)

  epoch_train_loss = running_loss / len(train_dataloader.dataset)
  train_losses.append(epoch_train_loss)


  # Validation phase
  model.eval()  # Set model to evaluation mode
  running_loss, running_corrects = 0.0, 0

  with torch.no_grad():  # Disable gradient calculation during validation
    for images, labels in val_dataloader:
      images = images.float().to(device)
      labels = labels.float().to(device)

      outputs = model(images)
      labels = labels.view(-1, 1)
      loss = criterion(outputs, labels)


      running_loss += loss.item() * images.size(0)


  epoch_val_loss = running_loss / len(val_dataloader.dataset)
  val_losses.append(epoch_val_loss)

  # Print epoch results
  print("=============================")
  print(f"Epoch {epoch+1}/{num_epochs}")
  print(f"Train Loss: {epoch_train_loss:.3f}")
  print(f"Val Loss: {epoch_val_loss:.3f}")

  for prune_epoch, prune_ratio in prune_quantize_schedule:
    if epoch == prune_epoch:
      for name, module in model.named_modules():
        if isinstance(module, nn.Linear):
          prune.l1_unstructured(module, name='weight', amount=prune_ratio)  # Apply pruning

      for name, module in model.named_modules():
        if isinstance(module, nn.Conv2d):
          torch.quantization.quantize_dynamic(module, {nn.Conv2d}, dtype=torch.qint8)# Apply quantization

      print("=================================================")
      print(f"Pruning model by {prune_ratio:.2f} at epoch {epoch + 1}")
      print(f"Quantizing model at epoch {epoch + 1}")

# Model Analysis

In [40]:
model = EfficientVisionTransformer()  # Instantiate the model architecture

In [8]:
print(model)

OptimizedModule(
  (_orig_mod): EfficientVisionTransformer(
    (patch_embedding): Conv2d(3, 600, kernel_size=(16, 16), stride=(16, 16))
    (transformer_encoder): TransformerEncoder(
      (layers): ModuleList(
        (0-5): 6 x TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=600, out_features=600, bias=True)
          )
          (linear1): Linear(in_features=600, out_features=2048, bias=True)
          (dropout): Dropout(p=0.2, inplace=False)
          (linear2): Linear(in_features=2048, out_features=600, bias=True)
          (norm1): LayerNorm((600,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((600,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0.2, inplace=False)
          (dropout2): Dropout(p=0.2, inplace=False)
        )
      )
    )
    (decoder): Linear(in_features=600, out_features=1, bias=True)
  )
)


# Compute Model FLOPS

In [16]:
from torchinfo import summary

# Assuming your model is named 'model'
model = EfficientVisionTransformer()
model = torch.load('/home/emmanuel/Project/model3.pt', map_location=torch.device('cpu'))
# Load your model here
# Get model summary, including FLOPs
summary(model, input_tensor=(1,3,600,600))

Layer (type:depth-idx)                                            Param #
EfficientVisionTransformer                                        --
├─Conv2d: 1-1                                                     461,400
├─TransformerEncoder: 1-2                                         --
│    └─ModuleList: 2-1                                            --
│    │    └─TransformerEncoderLayer: 3-1                          2,778,008
│    │    └─TransformerEncoderLayer: 3-2                          2,778,008
│    │    └─TransformerEncoderLayer: 3-3                          2,778,008
│    │    └─TransformerEncoderLayer: 3-4                          2,778,008
│    │    └─TransformerEncoderLayer: 3-5                          2,778,008
│    │    └─TransformerEncoderLayer: 3-6                          2,778,008
├─Linear: 1-3                                                     361
Total params: 17,129,809
Trainable params: 17,129,809
Non-trainable params: 0

# **System Testing**
### **Functionality Testing**

In [None]:
batch_size = 1
channels = 3
height = 600
width = 600

In [None]:
x = torch.randn(batch_size, channels, height, width)

In [None]:
def test_patch_embedding_output_shape(sample_data):
    model = EfficientVisionTransformer()
    output = model.patch_embedding(sample_data)
    assert output.shape == (batch_size, height, height // 16, width // 16)

In [None]:
def test_decoder_output_shape(sample_data):
    model = EfficientVisionTransformer()
    output = model(x)
    assert output.shape == (batch_size, 1)

In [None]:
def test_transformer_encoder(sample_data):
    x = model.patch_embedding(sample_data)
    x = x.flatten(2).transpose(1, 2)
    output = model.transformer_encoder(x)
    assert output.shape == (batch_size, ((height // 16) * (width // 16)), height)

## Performance Testing
 ### Execution Time on a CPU

In [55]:
import torch
import torchvision.models as models
from torchvision.models import ResNet50_Weights

from torch.profiler import profile, record_function, ProfilerActivity

model = EfficientVisionTransformer()
model = torch.load('/home/emmanuel/Project/model3.pt', map_location=torch.device('cpu'))

inputs = torch.randn(1, 3, 600, 600).to(device)
with profile(activities=[ProfilerActivity.CPU], record_shapes=True) as prof:
    with record_function("model_inference"):
        model(inputs)
print(prof.key_averages().table(sort_by="cpu_time_total", row_limit=10))

STAGE:2024-01-03 09:50:13 5071:5071 ActivityProfilerController.cpp:311] Completed Stage: Warm Up


--------------------------------------------  ------------  ------------  ------------  ------------  ------------  ------------  
                                        Name    Self CPU %      Self CPU   CPU total %     CPU total  CPU time avg    # of Calls  
--------------------------------------------  ------------  ------------  ------------  ------------  ------------  ------------  
                             model_inference         3.05%      18.514ms       100.00%     606.853ms     606.853ms             1  
                                aten::linear         0.07%     412.000us        43.03%     261.134ms      10.445ms            25  
          aten::scaled_dot_product_attention         0.00%      22.000us        42.31%     256.740ms      42.790ms             6  
    aten::_scaled_dot_product_attention_math         3.19%      19.362ms        42.30%     256.718ms      42.786ms             6  
                                 aten::addmm        37.10%     225.170ms        40.

STAGE:2024-01-03 09:50:14 5071:5071 ActivityProfilerController.cpp:317] Completed Stage: Collection
STAGE:2024-01-03 09:50:14 5071:5071 ActivityProfilerController.cpp:321] Completed Stage: Post Processing


## Memory Usage

In [56]:
import torch
import torchvision.models as models
from torch.profiler import profile, record_function, ProfilerActivity

model = EfficientVisionTransformer()
model = torch.load('/home/emmanuel/Project/model3.pt', map_location=torch.device('cpu'))

inputs = torch.randn(1, 3, 600, 600).to(device)
with profile(activities=[ProfilerActivity.CPU],profile_memory=True, record_shapes=True) as prof:
  with record_function("model_inference"):
    model(inputs)

print(prof.key_averages().table(sort_by="self_cpu_memory_usage", row_limit=10))

STAGE:2024-01-03 09:50:50 5071:5071 ActivityProfilerController.cpp:311] Completed Stage: Warm Up


--------------------------------------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  
                                        Name    Self CPU %      Self CPU   CPU total %     CPU total  CPU time avg       CPU Mem  Self CPU Mem    # of Calls  
--------------------------------------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  
                                   aten::bmm        26.17%     163.707ms        26.17%     163.707ms      13.642ms     361.97 Mb     361.97 Mb            12  
                              aten::_softmax        10.67%      66.764ms        10.67%      66.764ms      11.127ms     343.17 Mb     343.17 Mb             6  
                                 aten::addmm        36.47%     228.187ms        39.56%     247.488ms      10.312ms     148.77 Mb     148.77 Mb            24  
                                 aten::empty  

STAGE:2024-01-03 09:50:51 5071:5071 ActivityProfilerController.cpp:317] Completed Stage: Collection
STAGE:2024-01-03 09:50:51 5071:5071 ActivityProfilerController.cpp:321] Completed Stage: Post Processing


## Size (Computational and Memory footprint)

In [87]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [91]:
def compute_model_size(model):
    param_size = sum(p.numel() * p.element_size() for p in model.parameters())
    buffer_size = sum(b.numel() * b.element_size() for b in model.buffers())
    total_size = param_size + buffer_size
    return total_size / (1024 * 1024)  # Convert to MB

In [93]:
# Assuming your model is named 'model'
model = EfficientVisionTransformer()
model = torch.load('/home/emmanuel/Project/model3.pt', map_location=torch.device('cpu'))

num_params = count_parameters(model)
print(f"Number of parameters: {num_params}")

model_size = compute_model_size(model)
print(f"Model size (MB): {model_size:.2f}")

Number of parameters: 23892289
Model size (MB): 155.63


# Model Visualization 

In [95]:
model = EfficientVisionTransformer()
model = torch.load('/home/emmanuel/Project/model3.pt', map_location=torch.device('cpu'))



def inverse_transform(prediction, min_values, max_values, mean, std):
  """
  Applies inverse MinMaxScaler and Normalization to a prediction.

  Args:
    prediction: The predicted value (float32).
    min_values: An array of minimum values used for scaling (float32).
    max_values: An array of maximum values used for scaling (float32).
    mean: An array of mean values used for normalization (float32).
    std: An array of standard deviation values used for normalization (float32).

  Returns:
    The real-world prediction after inverse transformation (float32).
  """

  output = (prediction - 0.5) * (max_values - min_values) + min_values
  for i in range(output.shape[0]):
    for j in range(output.shape[1]):
        output[i, j] = (output[i, j] * std[j]) + mean[j]
  return output.item() # extract single value in case of tensor



def predict(image):
    if isinstance(image, np.ndarray):
      image = Image.fromarray(image)  # Convert to PIL image


    # Preprocess image (assuming PyTorch and required transformations)
    preprocessed_image = transform(image).unsqueeze(0).float()


    # Replace min_values, max_values, mean, and std with your actual values
    min_values = 1.8  # Array of minimum values
    max_values = 7.1  # Array of maximum values
    mean = [0.2498, 0.3010, 0.1964] # Mean values from normalization
    std = [0.1668, 0.1603 , 0.1697] # Std values from normalization

    # Run model inference
    prediction = model(preprocessed_image)
    output = inverse_transform(prediction, min_values, max_values, mean, std)

    arr = np.array(output)
    output = np.clip(arr, 0,1)
    # Format output to 2 decimal places
    model_output = f"This image has a Vegetation Index of {output:.2f}"

    return model_output


In [96]:
# Define Gradio interface
inputs = gr.Image(label="Upload image")
outputs = gr.Textbox(label="Fractional Vegetation Index")

interface = gr.Interface(fn=predict, inputs=inputs, outputs=outputs, title="Fractional Vegetation Index Prediction Model")

# Launch the interface
interface.launch(share=True)

Running on local URL:  http://127.0.0.1:7883
Running on public URL: https://f29d168563c83205cc.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


