In [4]:
import torch
from torch import nn

In [5]:
import zipfile
zip_path = "data/fruits9.zip"

extracted_to = "data/fruits9"

with zipfile.ZipFile(zip_path, "r") as f:
    f.extractall(extracted_to)

print(f"Zip file extracted successfully. path: {extracted_to}")

Zip file extracted successfully. path: data/fruits9


In [6]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

In [7]:
import os, shutil
from sklearn.model_selection import train_test_split

data_dir = "data/fruits9/images"
output_dir = "data/split"

categories = os.listdir(data_dir)

for category in categories:
    img_dir = os.path.join(data_dir, category)
    images = os.listdir(img_dir)

    train_imgs, test_imgs = train_test_split(images, test_size=0.2, random_state=42)

    for phase in ['train', 'test']:
        os.makedirs(os.path.join(output_dir, phase, category), exist_ok=True)

    for img in train_imgs:
        shutil.copy(os.path.join(img_dir, img), os.path.join(output_dir, 'train', category, img))

    for img in test_imgs:
        shutil.copy(os.path.join(img_dir, img), os.path.join(output_dir, 'test', category, img))


In [8]:
from pathlib import Path

data_path = Path("data/split")

train_dir = data_path / "train"
test_dir = data_path / "test"

train_dir, test_dir

(PosixPath('data/split/train'), PosixPath('data/split/test'))

In [9]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(), 
    transforms.Normalize([0.5]*3, [0.5]*3) 
])

test_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.CenterCrop((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])


In [10]:
train_data = datasets.ImageFolder(root=train_dir, transform=train_transform)
test_data = datasets.ImageFolder(root=test_dir, transform=test_transform)

In [11]:
train_dataloader = DataLoader(train_data,
                              batch_size=8,
                              num_workers=2,
                              shuffle=True,
                              pin_memory=False,
                              persistent_workers=False)

test_dataloader = DataLoader(test_data,
                             batch_size=8,
                             shuffle=False,
                             num_workers=2,
                             pin_memory=False,
                             persistent_workers=False)

In [12]:
print(f"Train data size: {len(train_data)}")
print(f"Test data size: {len(test_data)}")

Train data size: 287
Test data size: 72


In [13]:
class Food9(nn.Module):
    def __init__(self, num_classes=9, in_channels=3):
        super().__init__()

        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        self.conv_block_3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128*28*28, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.conv_block_3(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

### resnet18

In [14]:
import torchvision.models as models

model = models.resnet18(pretrained=True)

for param in model.parameters():
    param.requires_grad = False

model.fc = nn.Sequential(
    nn.Linear(model.fc.in_features, 128),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(128, 9)
)



### vgg16

In [15]:
vgg16 = models.vgg16(pretrained=True)
for param in vgg16.parameters():
    param.requires_grad = False

vgg16.classifier[6] = nn.Linear(in_features=4096, out_features=9)
vgg16.to(device)



VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [16]:
torch.manual_seed(42)
model_0 = Food9().to(device)
model_0

Food9(
  (conv_block_1): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv_block_2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv_block_3): Sequential(
    (0):

In [17]:
def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               device: torch.device):
    model.train()
    running_loss, running_correct = 0.0, 0
    total = 0

    for X, y in dataloader:
        X, y = X.to(device), y.to(device)

        optimizer.zero_grad()
        y_pred = model(X)
        loss = loss_fn(y_pred, y)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * X.size(0)
        preds = torch.argmax(y_pred, dim=1)
        running_correct += (preds == y).sum().item()
        total += y.size(0)

    avg_loss = running_loss / total
    avg_acc = running_correct / total
    return avg_loss, avg_acc

In [18]:
def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              device: torch.device):
    
    model.eval()
    running_loss, running_correct, total = 0.0, 0, 0.0

    with torch.inference_mode():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)

            y_pred = model(X)
            loss = loss_fn(y_pred, y)

            running_loss += loss.item() * X.size(0)
            preds = y_pred.argmax(dim=1)
            running_correct += (preds == y).sum().item()
            total += y.size(0)

        avg_loss = running_loss / total
        avg_acc = running_correct / total
        return avg_acc, avg_loss

In [19]:
def save_checkpoint(model, optimizer, epoch, train_loss, train_acc, test_loss, test_acc, checkpoint_filename="checkpoint.pth"):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'train_loss': train_loss,
        'train_acc': train_acc,
        'test_loss': test_loss,
        'test_acc': test_acc,
    }
    torch.save(checkpoint, checkpoint_filename)
    print(f"Checkpoint saved at epoch {epoch}")

In [20]:
def load_checkpoint(model, optimizer, checkpoint_filename, device):
    if os.path.exists(checkpoint_filename):
        print(f"Loading checkpoint from {checkpoint_filename}")
        checkpoint = torch.load(checkpoint_filename, map_location=device)

        # Load model state
        model.load_state_dict(checkpoint['model_state_dict'])

        # Load optimizer state
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

        # Load epoch and loss values if you want to resume from that epoch
        epoch = checkpoint['epoch']
        train_loss = checkpoint['train_loss']
        train_acc = checkpoint['train_acc']
        test_loss = checkpoint['test_loss']
        test_acc = checkpoint['test_acc']

        print(f"Checkpoint loaded. Resuming from epoch {epoch}.")
        return epoch, train_loss, train_acc, test_loss, test_acc
    else:
        print("No checkpoint found. Starting training from scratch.")
        return 0, 0, 0, 0, 0

In [21]:
from tqdm.auto import tqdm

def train(model, train_dataloader, test_dataloader, optimizer, loss_fn, epochs, checkpoint_filename, device):
    model.to(device)

    results = {
        "train_loss": [],
        "train_acc": [],
        "test_loss": [],
        "test_acc": []
    }

    epoch, train_loss, train_acc, test_loss, test_acc = load_checkpoint(model, optimizer, checkpoint_filename, device)
    print(f"Previous checkpoint data -> epoch: {epoch} | train_loss: {train_loss:.4f} | train_acc: {train_acc*100:.4f} | test_loss: {test_loss:.4f} | test_acc: {test_acc*100:.4f}")
    for epoch in tqdm(range(epoch, epochs)):
        train_loss, train_acc = train_step(model, train_dataloader, loss_fn, optimizer, device)

        test_acc, test_loss = test_step(model, test_dataloader, loss_fn, device)

        print(f"Epoch {epoch+1}/{epochs} | "
              f"TRain loss: {train_loss:.4f}, train acc: {train_acc*100:.2f} | "
              f"test loss: {test_loss:.4f}, test acc: {test_acc*100:.2f}")
        
        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)

        # save_checkpoint(model, optimizer, epoch+1, train_loss, train_acc, test_loss, test_acc, checkpoint_filename)
        save_checkpoint(epoch=epoch,model=model, optimizer=optimizer, train_acc=train_acc, train_loss=train_loss, test_acc=test_acc, test_loss=test_loss, checkpoint_filename=checkpoint_filename)
    return results

In [25]:
checkpoint_filename = "checkpoint/model_0.pth"
# model = Food9().to(device)
optimizer = torch.optim.Adam(params=vgg16.parameters(), lr=0.001)

In [27]:
results = train(vgg16, train_dataloader, test_dataloader, optimizer, loss_fn=nn.CrossEntropyLoss(), epochs=20, checkpoint_filename=checkpoint_filename, device=device)

No checkpoint found. Starting training from scratch.
Previous checkpoint data -> epoch: 0 | train_loss: 0.0000 | train_acc: 0.0000 | test_loss: 0.0000 | test_acc: 0.0000


  0%|          | 0/20 [00:00<?, ?it/s]

Epoch 1/20 | TRain loss: 1.3822, train acc: 57.49 | test loss: 0.5382, test acc: 77.78
Checkpoint saved at epoch 0
Epoch 2/20 | TRain loss: 0.4337, train acc: 86.76 | test loss: 0.4778, test acc: 83.33
Checkpoint saved at epoch 1
Epoch 3/20 | TRain loss: 0.3472, train acc: 89.90 | test loss: 0.4019, test acc: 86.11
Checkpoint saved at epoch 2
Epoch 4/20 | TRain loss: 0.2062, train acc: 94.77 | test loss: 0.4002, test acc: 84.72
Checkpoint saved at epoch 3
Epoch 5/20 | TRain loss: 0.2296, train acc: 94.08 | test loss: 0.3945, test acc: 86.11
Checkpoint saved at epoch 4
Epoch 6/20 | TRain loss: 0.1921, train acc: 94.43 | test loss: 0.3863, test acc: 86.11
Checkpoint saved at epoch 5


KeyboardInterrupt: 

### testing on real data

In [59]:
from PIL import Image

img_path = "/home/chetan/Desktop/ai_learning/deeplearning/foodcnn1/image copy 3.png"

img = Image.open(img_path).convert('RGB')
transform = test_transform
img_tensor = transform(img).unsqueeze(dim=0).to(device)


In [60]:
# loading model
checkpoint = torch.load("checkpoint/model_0.pth")
vgg16.load_state_dict(checkpoint['model_state_dict'])
print("Model loaded successfully!")

Model loaded successfully!


In [61]:
vgg16.eval()
with torch.inference_mode():
    output = vgg16(img_tensor)
    pred_class = torch.argmax(output, dim=1).item()

class_names = train_data.classes
print("Predicted class: ", class_names[pred_class])

Predicted class:  grapes fruit


In [48]:
class_names = train_data.classes
class_names

['apple fruit',
 'banana fruit',
 'cherry fruit',
 'chickoo fruit',
 'grapes fruit',
 'kiwi fruit',
 'mango fruit',
 'orange fruit',
 'strawberry fruit']

In [49]:
pred_class

8