In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from tqdm import tqdm
from PIL import Image

In [2]:

# Step 1: Data Augmentation and Loading
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

dataset_dir = 'dataset'  # Replace with your dataset directory
full_dataset = datasets.ImageFolder(root=dataset_dir, transform=transform)

# Split dataset into training (80%) and testing (20%)
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

In [3]:
class BasicBlock(nn.Module):
    expansion = 1  # This attribute helps in adjusting the number of output features

    def __init__(self, in_planes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        # First convolution
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        # Second convolution
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        # Shortcut connection
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x  # Save the input value for the shortcut

        # Forward pass through the first conv layer
        out = self.conv1(x)
        out = self.bn1(out)
        out = nn.ReLU(inplace=True)(out)

        # Forward pass through the second conv layer
        out = self.conv2(out)
        out = self.bn2(out)

        # If downsample is provided, apply it to the shortcut path
        if self.downsample is not None:
            identity = self.downsample(x)

        # Add the shortcut to the output of the second conv layer
        out += identity
        out = nn.ReLU(inplace=True)(out)  # Apply ReLU after adding the shortcut

        return out

class DiffusionBlock(nn.Module):
    def __init__(self, channels):
        super(DiffusionBlock, self).__init__()
        self.noise_level = nn.Parameter(torch.rand(1))  # Learnable noise level parameter
        self.conv = nn.Conv2d(channels, channels, kernel_size=1, padding=0)  # 1x1 convolution to mix features

    def forward(self, x):
        noise = torch.randn_like(x) * self.noise_level
        return self.conv(x + noise)
    
class CustomResNetDiffusion(nn.Module):
    def __init__(self, block, layers, num_classes=1000):
        super(CustomResNetDiffusion, self).__init__()
        self.in_planes = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0], diffusion=True)
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, diffusion=True)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, diffusion=True)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, diffusion=True)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, blocks, stride=1, diffusion=False):
        downsample = None
        layers = []
        if stride != 1 or self.in_planes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_planes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * block.expansion),
            )
        if diffusion:
            layers.append(DiffusionBlock(self.in_planes))  # Assuming DiffusionBlock is defined
        layers.append(block(self.in_planes, planes, stride, downsample))
        self.in_planes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_planes, planes))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x



def model_diffusion(num_classes=1000):
    return CustomResNetDiffusion(BasicBlock, [3,6,6,3], num_classes)


# Instantiate the model
num_classes = len(full_dataset.classes)  # Number of classes in your dataset
model = model_diffusion(num_classes=num_classes)


In [4]:

# Step 3: Define Loss Function, Optimizer, and Learning Rate Scheduler
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)



In [5]:
import time

# Step 4: Training the Model
num_epochs = 20

# Record the overall start time
overall_start_time = time.time()

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    # Record the start time of the epoch
    epoch_start_time = time.time()

    for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Learning rate scheduler step
    scheduler.step()

    # Calculate and print the average loss per epoch
    average_loss = running_loss / len(train_loader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {average_loss:.4f}')

    # Record the end time of the epoch
    epoch_end_time = time.time()
    epoch_duration = epoch_end_time - epoch_start_time
    print(f'Epoch {epoch+1} completed in {epoch_duration:.2f} seconds')

# Record the overall end time
overall_end_time = time.time()
total_training_time = overall_end_time - overall_start_time

print(f"Training completed in {total_training_time:.2f} seconds")

# Save the trained model
model_path = 'custom_model_diffusion_copy.pth'
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")


Epoch 1/20:   0%|          | 0/516 [00:00<?, ?it/s]

Epoch 1/20: 100%|██████████| 516/516 [16:01<00:00,  1.86s/it]


Epoch [1/20], Loss: 1.8115
Epoch 1 completed in 961.99 seconds


Epoch 2/20: 100%|██████████| 516/516 [16:06<00:00,  1.87s/it]


Epoch [2/20], Loss: 1.2639
Epoch 2 completed in 966.26 seconds


Epoch 3/20: 100%|██████████| 516/516 [16:06<00:00,  1.87s/it]


Epoch [3/20], Loss: 1.0252
Epoch 3 completed in 966.60 seconds


Epoch 4/20: 100%|██████████| 516/516 [15:20<00:00,  1.78s/it]


Epoch [4/20], Loss: 0.8863
Epoch 4 completed in 920.35 seconds


Epoch 5/20: 100%|██████████| 516/516 [15:00<00:00,  1.75s/it]


Epoch [5/20], Loss: 0.7875
Epoch 5 completed in 900.64 seconds


Epoch 6/20: 100%|██████████| 516/516 [14:30<00:00,  1.69s/it]


Epoch [6/20], Loss: 0.7157
Epoch 6 completed in 870.68 seconds


Epoch 7/20: 100%|██████████| 516/516 [14:27<00:00,  1.68s/it]


Epoch [7/20], Loss: 0.6499
Epoch 7 completed in 867.38 seconds


Epoch 8/20: 100%|██████████| 516/516 [14:28<00:00,  1.68s/it]


Epoch [8/20], Loss: 0.4283
Epoch 8 completed in 868.74 seconds


Epoch 9/20: 100%|██████████| 516/516 [15:01<00:00,  1.75s/it]


Epoch [9/20], Loss: 0.3714
Epoch 9 completed in 901.41 seconds


Epoch 10/20: 100%|██████████| 516/516 [19:09<00:00,  2.23s/it]


Epoch [10/20], Loss: 0.3412
Epoch 10 completed in 1149.95 seconds


Epoch 11/20: 100%|██████████| 516/516 [16:46<00:00,  1.95s/it]


Epoch [11/20], Loss: 0.3193
Epoch 11 completed in 1006.73 seconds


Epoch 12/20: 100%|██████████| 516/516 [15:43<00:00,  1.83s/it]


Epoch [12/20], Loss: 0.3044
Epoch 12 completed in 943.83 seconds


Epoch 13/20: 100%|██████████| 516/516 [15:42<00:00,  1.83s/it]


Epoch [13/20], Loss: 0.2803
Epoch 13 completed in 942.99 seconds


Epoch 14/20: 100%|██████████| 516/516 [15:10<00:00,  1.76s/it]


Epoch [14/20], Loss: 0.2682
Epoch 14 completed in 910.33 seconds


Epoch 15/20: 100%|██████████| 516/516 [22:03<00:00,  2.57s/it]


Epoch [15/20], Loss: 0.2327
Epoch 15 completed in 1323.65 seconds


Epoch 16/20: 100%|██████████| 516/516 [19:55<00:00,  2.32s/it]


Epoch [16/20], Loss: 0.2184
Epoch 16 completed in 1195.71 seconds


Epoch 17/20: 100%|██████████| 516/516 [15:28<00:00,  1.80s/it]


Epoch [17/20], Loss: 0.2180
Epoch 17 completed in 928.70 seconds


Epoch 18/20: 100%|██████████| 516/516 [15:31<00:00,  1.81s/it]


Epoch [18/20], Loss: 0.2063
Epoch 18 completed in 931.79 seconds


Epoch 19/20: 100%|██████████| 516/516 [15:28<00:00,  1.80s/it]


Epoch [19/20], Loss: 0.2041
Epoch 19 completed in 928.78 seconds


Epoch 20/20: 100%|██████████| 516/516 [15:32<00:00,  1.81s/it]


Epoch [20/20], Loss: 0.2065
Epoch 20 completed in 932.19 seconds
Training completed in 19418.74 seconds
Model saved to custom_model_diffusion_copy.pth


In [6]:
# Step 5: Evaluate the Model
model.eval()
correct = 0
total = 0

with torch.no_grad():
    for images, labels in tqdm(test_loader, desc="Evaluating"):
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# Calculate accuracy
accuracy = 100 * correct / total
print(f'Accuracy on the test images: {accuracy:.2f}%')

Evaluating: 100%|██████████| 129/129 [01:58<00:00,  1.09it/s]

Accuracy on the test images: 91.72%





In [7]:
# Step 6: Use the Model to Predict a Single Image
def predict_image(image_path):
    image = Image.open(image_path).convert('RGB')  # Convert to RGB in case of transparency channel
    image = transform(image).unsqueeze(0)  # Apply transformations and add batch dimension

    model.eval()
    with torch.no_grad():
        outputs = model(image)
        _, predicted = torch.max(outputs.data, 1)
    
    class_index = predicted.item()
    class_label = full_dataset.classes[class_index]  # Map the index to the class label
    return class_label

# Example usage:
image_path = '1.png'  # Replace with the path to your image
predicted_label = predict_image(image_path)
print(f'The predicted class for the image is: {predicted_label}')


The predicted class for the image is: Tomato_Spider_mites_Two_spotted_spider_mite


In [8]:
import torch.nn as nn
from sklearn.metrics import f1_score
import torch.nn.functional as F

class BasicBlock(nn.Module):
    expansion = 1  # This attribute helps in adjusting the number of output features

    def __init__(self, in_planes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        # First convolution
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        # Second convolution
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        # Shortcut connection
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x  # Save the input value for the shortcut

        # Forward pass through the first conv layer
        out = self.conv1(x)
        out = self.bn1(out)
        out = nn.ReLU(inplace=True)(out)

        # Forward pass through the second conv layer
        out = self.conv2(out)
        out = self.bn2(out)

        # If downsample is provided, apply it to the shortcut path
        if self.downsample is not None:
            identity = self.downsample(x)

        # Add the shortcut to the output of the second conv layer
        out += identity
        out = nn.ReLU(inplace=True)(out)  # Apply ReLU after adding the shortcut

        return out

class DiffusionBlock(nn.Module):
    def __init__(self, channels):
        super(DiffusionBlock, self).__init__()
        self.noise_level = nn.Parameter(torch.rand(1))  # Learnable noise level parameter
        self.conv = nn.Conv2d(channels, channels, kernel_size=1, padding=0)  # 1x1 convolution to mix features

    def forward(self, x):
        noise = torch.randn_like(x) * self.noise_level
        return self.conv(x + noise)
    
class CustomResNetDiffusion(nn.Module):
    def __init__(self, block, layers, num_classes=1000):
        super(CustomResNetDiffusion, self).__init__()
        self.in_planes = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0], diffusion=True)
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, diffusion=True)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, diffusion=True)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, diffusion=True)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, blocks, stride=1, diffusion=False):
        downsample = None
        layers = []
        if stride != 1 or self.in_planes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_planes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * block.expansion),
            )
        if diffusion:
            layers.append(DiffusionBlock(self.in_planes))  # Assuming DiffusionBlock is defined
        layers.append(block(self.in_planes, planes, stride, downsample))
        self.in_planes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_planes, planes))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x


def model_diffusion(num_classes=1000):
    return CustomResNetDiffusion(BasicBlock, [3,6,6,3], num_classes)

# Load the saved model
model.load_state_dict(torch.load('custom_model_diffusion_copy.pth'))
model.eval()  # Set the model to evaluation mode

# Define the test dataset and data loader
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Adjust if your model uses a different input size
    transforms.ToTensor()
])


# Collect predictions and true labels
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)  # Get predicted class
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate the F1 score
f1 = f1_score(all_labels, all_preds, average='weighted')  # Choose 'macro', 'micro', or 'weighted' depending on your needs
print(f"F1 Score: {f1}")

  model.load_state_dict(torch.load('custom_model_diffusion_copy.pth'))


F1 Score: 0.9142904212803759
