In [None]:
import os
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import timm
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import numpy as np

# Accelerator selection: prefer CUDA, then Apple-silicon MPS, and fall back to CPU.
# Probing MPS matters because a CUDA-only check silently lands on the CPU on a Mac.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
    # `torch.backends.mps` is absent on older PyTorch builds, hence the getattr guard.
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Mathematical engine bound to: {device}")

# Root folder of the image dataset, relative to the notebook's working directory.
DATASET_DIR = "../dataset/PlantVillage"

Mathematical engine bound to: mps


In [10]:
# Index the dataset directory; the class list comes from ImageFolder.
dataset_full = datasets.ImageFolder(root=DATASET_DIR)
classes = dataset_full.classes

# Nested lookup table: species -> disease -> integer class index.
manifest = {}
for class_index, folder_name in enumerate(classes):
    # Folder names are expected to look like "Species___Disease".
    species_raw, *remainder = folder_name.split("___")
    species_key = species_raw.lower()
    # A folder without the "___" separator has no disease part.
    disease_key = remainder[0].lower() if remainder else "unknown"
    manifest.setdefault(species_key, {})[disease_key] = class_index

# Persist the mapping as JSON (the original note says a Node.js backend consumes it).
with open("plant_manifest.json", "w") as manifest_file:
    json.dump(manifest, manifest_file, indent=4)

print(f"Taxonomy contract sealed. Total classes modeled: {len(classes)}")
print("Sample of the taxonomy dictionary:")
# Show only the first two species to keep the output short.
sample_entries = list(manifest.items())[:2]
print(json.dumps(sample_entries, indent=2))

Taxonomy contract sealed. Total classes modeled: 38
Sample of the taxonomy dictionary:
[
  [
    "apple",
    {
      "apple_scab": 0,
      "black_rot": 1,
      "cedar_apple_rust": 2,
      "healthy": 3
    }
  ],
  [
    "blueberry",
    {
      "healthy": 4
    }
  ]
]


In [None]:
# Batch size shared by the training and validation loaders.
OPTIMAL_BATCH_SIZE = 32

# Preprocessing: resize to 256x256, convert to a tensor, then normalise each channel.
# NOTE(review): the mean/std values look like the usual ImageNet statistics - confirm they
# match what the pretrained backbone expects.
standard_transform = transforms.Compose(
    [
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

dataset_full = datasets.ImageFolder(root=DATASET_DIR, transform=standard_transform)

# Stratified 80/20 split over sample indices so both subsets keep the class proportions.
targets = dataset_full.targets
train_idx, val_idx = train_test_split(
    np.arange(len(targets)),
    test_size=0.2,
    stratify=targets,
    random_state=42,
)

# Both subsets are views over the same underlying dataset (and therefore the same transform).
train_data = torch.utils.data.Subset(dataset_full, train_idx)
val_data = torch.utils.data.Subset(dataset_full, val_idx)

# Only the training loader shuffles; validation order stays fixed.
loader_options = {"batch_size": OPTIMAL_BATCH_SIZE, "num_workers": 2}
train_loader = DataLoader(train_data, shuffle=True, **loader_options)
val_loader = DataLoader(val_data, shuffle=False, **loader_options)

Memory intake choked. Physical batch size locked to 2.


In [12]:
# One output logit per class found in the dataset.
num_classes = len(dataset_full.classes)

# Pretrained backbone from timm with its classifier sized to `num_classes`,
# moved to the selected device in the same expression.
model = timm.create_model(
    "swinv2_tiny_window16_256",
    pretrained=True,
    num_classes=num_classes,
).to(device)

print(f"Swin V2 Architecture initialized. Final layer adapted to output {num_classes} logits.")

model.safetensors:   0%|          | 0.00/119M [00:00<?, ?B/s]

Swin V2 Architecture initialized. Final layer adapted to output 38 logits.


In [19]:
class FocalLoss(nn.Module):
    """Multi-class focal loss computed from raw logits.

    The per-sample cross-entropy ``ce`` is re-weighted by
    ``alpha * (1 - p_t) ** gamma`` where ``p_t = exp(-ce)``, so samples the
    model already classifies confidently contribute less to the total.

    Args:
        alpha: Scalar multiplier applied to every sample's loss.
        gamma: Focusing exponent; ``gamma=0`` reduces to ``alpha`` times
            plain cross-entropy.
        reduction: ``'mean'`` (default), ``'sum'`` or ``'none'``. With
            ``'none'`` the un-reduced per-sample losses are returned.

    Raises:
        ValueError: If ``reduction`` is not one of the supported values.
    """

    _REDUCTIONS = ('mean', 'sum', 'none')

    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super().__init__()
        # Fail fast on typos instead of silently falling back to a sum.
        if reduction not in self._REDUCTIONS:
            raise ValueError(
                f"Unsupported reduction {reduction!r}; expected one of {self._REDUCTIONS}"
            )
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        """Return the focal loss for ``inputs`` (logits) against ``targets`` (class indices)."""
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        # exp(-ce) recovers the softmax probability assigned to the true class.
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        if self.reduction == 'sum':
            return focal_loss.sum()
        if self.reduction == 'none':
            return focal_loss
        # Reached only if the attribute was changed after construction.
        raise ValueError(
            f"Unsupported reduction {self.reduction!r}; expected one of {self._REDUCTIONS}"
        )

# Loss and optimiser used by the training loop.
criterion = FocalLoss(gamma=2.0)
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=1e-4,
    weight_decay=1e-2,
)

In [None]:
epochs = 3
# Number of mini-batches whose gradients are accumulated before each optimizer step.
# The effective batch size is (loader batch size * accumulation_steps); when the
# accelerator runs out of memory, lower the loader batch size and raise this value.
accumulation_steps = 1

# float16 autocast is only enabled on CUDA, and is paired with a GradScaler so small
# gradients do not underflow. On any other device both are disabled and the loop runs
# in full precision. (`torch.cuda.amp.GradScaler` is the spelling available across
# PyTorch releases; newer ones also offer `torch.amp.GradScaler`.)
use_amp = device.type == "cuda"
scaler = torch.cuda.amp.GradScaler(enabled=use_amp)
num_batches = len(train_loader)

for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    optimizer.zero_grad()

    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        with torch.autocast(device_type="cuda", dtype=torch.float16, enabled=use_amp):
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # Divide so the accumulated gradient is the average over the window;
        # with a disabled scaler, `scale` is the identity.
        scaler.scale(loss / accumulation_steps).backward()

        # Step at the end of every accumulation window, and on the final batch so a
        # trailing partial window is not dropped.
        is_window_end = (i + 1) % accumulation_steps == 0
        is_last_batch = (i + 1) == num_batches
        if is_window_end or is_last_batch:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()

        # Track the unscaled per-batch loss for reporting.
        running_loss += loss.item()

    # Guard against an empty loader to avoid a ZeroDivisionError.
    avg_loss = running_loss / max(num_batches, 1)
    print(f"Epoch {epoch+1}/{epochs} - Trajectory Loss: {avg_loss:.4f}")

RuntimeError: MPS backend out of memory (MPS allocated: 9.06 GiB, other allocations: 4.59 MiB, max allowed: 9.07 GiB). Tried to allocate 12.00 MiB on private pool. Use PYTORCH_MPS_HIGH_WATERMARK_RATIO=0.0 to disable upper limit for memory allocations (may cause system failure).

In [None]:
# Score the PyTorch model on the validation split.
model.eval()
all_preds, all_labels = [], []

# No gradients are needed while scoring.
with torch.no_grad():
    for batch_images, batch_labels in val_loader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)
        outputs = model(batch_images)

        # Index of the largest logit along the class axis is the predicted class.
        preds = torch.max(outputs, dim=1).indices

        # Move results to host memory before accumulating them for sklearn.
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(batch_labels.cpu().numpy())

acc = accuracy_score(all_labels, all_preds)
# Macro averaging weights every class equally regardless of its sample count.
f1 = f1_score(all_labels, all_preds, average='macro')

print(f"Validation Accuracy: {acc * 100:.2f}%")
print(f"Macro F1-Score: {f1:.4f}")

In [None]:
# Switch to evaluation mode before exporting.
model.eval()

onnx_path = "swin_v2_mvp.onnx"

# Example input laid out as [batch, channels, height, width], created on the model's device.
dummy_input = torch.randn(1, 3, 256, 256, device=device)

# Axis 0 of both the input and the output is declared dynamic and labelled "batch_size".
batch_axis = {0: 'batch_size'}
export_options = dict(
    export_params=True,
    opset_version=14,
    do_constant_folding=True,
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': dict(batch_axis), 'output': dict(batch_axis)},
)
torch.onnx.export(model, dummy_input, onnx_path, **export_options)

print(f"Architecture crystallized. Artifact {onnx_path} generated.")


In [None]:
import onnxruntime as ort
import numpy as np
from sklearn.metrics import accuracy_score, f1_score
from tqdm import tqdm # For a sleek progress bar

quantized_model_path = "swin_v2_mvp_quantized.onnx"

print("1. Initializing the ONNX Runtime Execution Provider...")
# The quantized graph is run with the CPU execution provider.
session = ort.InferenceSession(quantized_model_path, providers=['CPUExecutionProvider'])

# Read the input tensor's name and declared shape from the graph instead of assuming them.
model_input = session.get_inputs()[0]
input_name = model_input.name
print(f"   Engine ready. Awaiting tensors at input node: '{input_name}'")

# ONNX Runtime reports a fixed dimension as an int and a dynamic one as a string/None.
# A fixed leading dimension means the graph only accepts batches of exactly that size,
# so loader batches must be fed in chunks of that size rather than all at once.
declared_shape = model_input.shape
declared_batch = declared_shape[0] if declared_shape else None
fixed_batch = declared_batch if isinstance(declared_batch, int) and declared_batch > 0 else None

all_preds = []
all_labels = []

print("\n2. Commencing inference audit across the validation dataset...")
for inputs, labels in tqdm(val_loader, desc="Quantized Inference"):
    # ONNX Runtime consumes NumPy arrays, so convert the tensors on the host.
    inputs_np = inputs.cpu().numpy()

    # Dynamic batch axis: one call per loader batch. Fixed batch axis: one call per chunk.
    chunk_size = fixed_batch if fixed_batch is not None else max(len(inputs_np), 1)
    for start in range(0, len(inputs_np), chunk_size):
        chunk = inputs_np[start:start + chunk_size]
        valid = len(chunk)

        # A ragged final chunk is padded by repeating its last sample; the padded
        # predictions are discarded below.
        if fixed_batch is not None and valid < fixed_batch:
            padding = np.repeat(chunk[-1:], fixed_batch - valid, axis=0)
            chunk = np.concatenate([chunk, padding], axis=0)

        # First output holds the raw logits; argmax over the class axis picks the prediction.
        outputs = session.run(None, {input_name: chunk})[0]
        all_preds.extend(np.argmax(outputs, axis=1)[:valid])

    all_labels.extend(labels.cpu().numpy())

# 3. The Final Verdict
acc = accuracy_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds, average='macro')

print("\n--- The Quantization Audit ---")
print(f"Quantized Accuracy: {acc * 100:.2f}%")
print(f"Quantized Macro-F1: {f1:.4f}")

In [None]:
import onnxruntime as ort
import numpy as np
from sklearn.metrics import accuracy_score, f1_score
from tqdm import tqdm

quantized_model_path = "swin_v2_mvp_quantized.onnx"

print("1. Initializing the ONNX Runtime Execution Provider...")
session = ort.InferenceSession(quantized_model_path, providers=['CPUExecutionProvider'])
# Name of the graph's first input, used as the feed key for every run.
input_name = session.get_inputs()[0].name
print(f"   Engine ready. Awaiting tensors strictly shaped [1, 3, 256, 256] at node: '{input_name}'")

all_preds = []
all_labels = []

print("\n2. Commencing inference audit across the validation dataset...")
for inputs, labels in tqdm(val_loader, desc="Quantized Inference"):
    inputs_np = inputs.cpu().numpy()
    labels_np = labels.cpu().numpy()

    # The graph is fed one sample per call, so each loader batch is unrolled.
    for sample_idx, true_label in zip(range(inputs_np.shape[0]), labels_np):
        # A length-1 slice keeps the leading batch axis: [1, 3, 256, 256].
        single_leaf_tensor = inputs_np[sample_idx:sample_idx + 1]

        # First output holds the logits for this single sample.
        outputs = session.run(None, {input_name: single_leaf_tensor})[0]

        # Class with the largest logit is the prediction.
        pred = np.argmax(outputs, axis=1)[0]

        all_preds.append(pred)
        all_labels.append(true_label)

# 3. The Final Verdict
acc = accuracy_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds, average='macro')

print("\n--- The Quantization Audit ---")
print(f"Quantized Accuracy: {acc * 100:.2f}%")
print(f"Quantized Macro-F1: {f1:.4f}")