In [2]:
import tensorflow as tf
import datetime
import os
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split

In [4]:
# Load the TensorBoard notebook extension
%load_ext tensorboard 

In [6]:
# Path to the main directory
main_dir = r"C:\Users\User\Desktop\Shahla share\ai\Project\real_waste\realwaste-main/RealWaste"
# Define transformations for data augmentation and preprocessing
transform = transforms.Compose([
    transforms.Resize((256, 256)),            # Resize to 256x256
    transforms.CenterCrop(224),              # Crop to 224x224
    transforms.ToTensor(),                   # Convert to tensor
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize
])

# Load the dataset
dataset = datasets.ImageFolder(root=main_dir, transform=transform)

# Print class-to-index mapping
print("Class-to-Index Mapping:", dataset.class_to_idx)

# Split the dataset into training, validation, and test sets
train_size = int(0.7 * len(dataset))  # 70% for training
val_size = int(0.15 * len(dataset))   # 15% for validation
test_size = len(dataset) - train_size - val_size  # Remaining 15% for testing

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

# Create data loaders
batch_size = 32

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Check the number of samples in each split
print(f"Training set size: {len(train_dataset)}")
print(f"Validation set size: {len(val_dataset)}")
print(f"Test set size: {len(test_dataset)}")


Class-to-Index Mapping: {'Cardboard': 0, 'Food Organics': 1, 'Glass': 2, 'Metal': 3, 'Miscellaneous Trash': 4, 'Paper': 5, 'Plastic': 6, 'Textile Trash': 7, 'Vegetation': 8}
Training set size: 3326
Validation set size: 712
Test set size: 714


In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ResNet18(nn.Module):
    def __init__(self, num_classes=9):
        super(ResNet18, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # ResNet layers (basic blocks)
        self.layer1 = self._make_layer(64, 64, 2)
        self.layer2 = self._make_layer(64, 128, 2, stride=2)
        self.layer3 = self._make_layer(128, 256, 2, stride=2)
        self.layer4 = self._make_layer(256, 512, 2, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, in_channels, out_channels, blocks, stride=1):
        layers = []
        layers.append(self._block(in_channels, out_channels, stride))
        for _ in range(1, blocks):
            layers.append(self._block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def _block(self, in_channels, out_channels, stride=1):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

# Instantiate the model
resnet18_model = ResNet18(num_classes=9)
print(resnet18_model)


ResNet18(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Sequential(
      (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
    )
    (1): Sequential(
      (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
    )
  )
  (layer2): Sequential(
    (0): Sequential(
      (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine

In [10]:
class VGG16(nn.Module):
    def __init__(self, num_classes=9):
        super(VGG16, self).__init__()
        self.features = self._make_layers([
            64, 64, 'M', 
            128, 128, 'M', 
            256, 256, 256, 'M', 
            512, 512, 512, 'M', 
            512, 512, 512, 'M'
        ])
        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, num_classes)
        )

    def _make_layers(self, cfg):
        layers = []
        in_channels = 3
        for x in cfg:
            if x == 'M':
                layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
            else:
                layers += [
                    nn.Conv2d(in_channels, x, kernel_size=3, padding=1),
                    nn.ReLU(inplace=True),
                    nn.BatchNorm2d(x)
                ]
                in_channels = x
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

# Instantiate the model
vgg16_model = VGG16(num_classes=9)
print(vgg16_model)


VGG16(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU(inplace=True)
    (5): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (13): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (14): Conv2d(128, 2

In [11]:
from torch.utils.tensorboard import SummaryWriter

# Create a writer instance
writer = SummaryWriter(log_dir='runs/experiment1')


In [14]:
from sklearn.metrics import f1_score, classification_report
import torch

def evaluate_model_with_metrics(model, data_loader, device, num_classes):
    """
    Evaluate the model and calculate accuracy, F1-score, and classwise accuracy.
    """
    model.eval()
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    class_correct = [0] * num_classes
    class_total = [0] * num_classes

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            correct += (preds == labels).sum().item()
            total += labels.size(0)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            # Classwise metrics
            for label, pred in zip(labels, preds):
                if label == pred:
                    class_correct[label] += 1
                class_total[label] += 1

    # Overall accuracy
    accuracy = correct / total

    # F1-Score
    f1 = f1_score(all_labels, all_preds, average='weighted')

    # Classwise accuracy
    classwise_accuracy = [c / t if t > 0 else 0 for c, t in zip(class_correct, class_total)]

    return accuracy, f1, classwise_accuracy


In [15]:
def train_and_evaluate_with_tensorboard(
    model, train_loader, val_loader, test_loader, optimizer, criterion, num_epochs, device, num_classes, writer
):
    best_val_acc = 0.0

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            train_correct += (preds == labels).sum().item()
            total += labels.size(0)

        train_acc = train_correct / total

        # Validation phase
        val_acc, val_f1, val_classwise_acc = evaluate_model_with_metrics(model, val_loader, device, num_classes)

        # Log metrics to TensorBoard
        writer.add_scalar("Loss/train", train_loss / len(train_loader), epoch)
        writer.add_scalar("Accuracy/train", train_acc, epoch)
        writer.add_scalar("Accuracy/val", val_acc, epoch)
        writer.add_scalar("F1/val", val_f1, epoch)

        # Log classwise accuracy as individual scalars
        for i, acc in enumerate(val_classwise_acc):
            writer.add_scalar(f"Classwise Accuracy/val_class_{i}", acc, epoch)

        print(f"Epoch [{epoch+1}/{num_epochs}]")
        print(f"  Train Loss: {train_loss / len(train_loader):.4f}, Train Accuracy: {train_acc:.4f}")
        print(f"  Validation Accuracy: {val_acc:.4f}, Validation F1-Score: {val_f1:.4f}")

        # Save the best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pth')

    # Final evaluation on the test set
    print("Final evaluation on the test set:")
    test_acc, test_f1, classwise_acc = evaluate_model_with_metrics(model, test_loader, device, num_classes)

    print(f"  Test Accuracy: {test_acc:.4f}")
    print(f"  Test F1-Score: {test_f1:.4f}")
    print(f"  Classwise Accuracy: {classwise_acc}")

    # Log test metrics to TensorBoard
    writer.add_scalar("Accuracy/test", test_acc, num_epochs)
    writer.add_scalar("F1/test", test_f1, num_epochs)
    for i, acc in enumerate(classwise_acc):
        writer.add_scalar(f"Classwise Accuracy/test_class_{i}", acc, num_epochs)

    return test_acc, test_f1, classwise_acc


In [20]:
%tensorboard --logdir=runs



In [21]:
def test_model(model, test_loader, device):
    model = model.to(device)
    test_acc = evaluate_model(model, test_loader, device)
    print(f"Test Accuracy: {test_acc:.4f}")


In [26]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_classes = 9
resnet18 = ResNet18(num_classes=num_classes).to(device)
optimizer_sgd = torch.optim.SGD(resnet18.parameters(), lr=0.01, momentum=0.9)
criterion = nn.CrossEntropyLoss()

In [17]:
print("Training ResNet18 with SGD...")
resnet_test_acc, resnet_test_f1, resnet_classwise_acc = train_and_evaluate_with_tensorboard(
    resnet18, train_loader, val_loader, test_loader, optimizer_sgd, criterion,
    num_epochs=10, device=device, num_classes=num_classes, writer=writer
)


Training ResNet18 with SGD...
Epoch [1/10]
  Train Loss: 1.5923, Train Accuracy: 0.4212
  Validation Accuracy: 0.3764, Validation F1-Score: 0.3336
Epoch [2/10]
  Train Loss: 1.3292, Train Accuracy: 0.5292
  Validation Accuracy: 0.5028, Validation F1-Score: 0.4640
Epoch [3/10]
  Train Loss: 1.2135, Train Accuracy: 0.5514
  Validation Accuracy: 0.5154, Validation F1-Score: 0.4943
Epoch [4/10]
  Train Loss: 1.1254, Train Accuracy: 0.5803
  Validation Accuracy: 0.6011, Validation F1-Score: 0.5809
Epoch [5/10]
  Train Loss: 1.0264, Train Accuracy: 0.6302
  Validation Accuracy: 0.6067, Validation F1-Score: 0.5978
Epoch [6/10]
  Train Loss: 1.0034, Train Accuracy: 0.6290
  Validation Accuracy: 0.6503, Validation F1-Score: 0.6410
Epoch [7/10]
  Train Loss: 0.9552, Train Accuracy: 0.6584
  Validation Accuracy: 0.6306, Validation F1-Score: 0.6304
Epoch [8/10]
  Train Loss: 0.9118, Train Accuracy: 0.6651
  Validation Accuracy: 0.5688, Validation F1-Score: 0.5778
Epoch [9/10]
  Train Loss: 0.8640,

In [28]:
vgg16 = VGG16(num_classes=num_classes).to(device)
optimizer_adam = torch.optim.Adam(vgg16.parameters(), lr=0.001)

print("Training VGG16 with Adam...")
vgg_test_acc, vgg_test_f1, vgg_classwise_acc = train_and_evaluate_with_tensorboard(
    vgg16, train_loader, val_loader, test_loader, optimizer_adam, criterion,
    num_epochs=10, device=device, num_classes=num_classes, writer=writer
)


Training VGG16 with Adam...
Epoch [1/10]
  Train Loss: 6.5228, Train Accuracy: 0.2384
  Validation Accuracy: 0.3638, Validation F1-Score: 0.3189
Epoch [2/10]
  Train Loss: 2.1899, Train Accuracy: 0.3349
  Validation Accuracy: 0.3553, Validation F1-Score: 0.3289
Epoch [3/10]
  Train Loss: 2.1029, Train Accuracy: 0.3533
  Validation Accuracy: 0.3975, Validation F1-Score: 0.3341
Epoch [4/10]
  Train Loss: 2.0136, Train Accuracy: 0.3695
  Validation Accuracy: 0.4593, Validation F1-Score: 0.4253
Epoch [5/10]
  Train Loss: 1.9131, Train Accuracy: 0.3876
  Validation Accuracy: 0.4747, Validation F1-Score: 0.4555
Epoch [6/10]
  Train Loss: 1.8547, Train Accuracy: 0.3996
  Validation Accuracy: 0.4438, Validation F1-Score: 0.4199
Epoch [7/10]
  Train Loss: 1.8774, Train Accuracy: 0.3812
  Validation Accuracy: 0.4326, Validation F1-Score: 0.4009
Epoch [8/10]
  Train Loss: 1.7730, Train Accuracy: 0.4146
  Validation Accuracy: 0.4565, Validation F1-Score: 0.4346
Epoch [9/10]
  Train Loss: 1.7657, T