In [14]:
import torch
from torch import nn

torch.__version__

'2.0.0+cu117'

In [15]:
from torchvision import transforms

batch_size = 32
img_size = 224

device = 'cuda'

train_transform = transforms.Compose([
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop(img_size, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

test_transform = transforms.Compose([
    # transforms.Resize(230),
    # transforms.CenterCrop(img_size),
    # transforms.ToTensor(),
    # transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    transforms.ToTensor()
    ])

In [16]:
from torchvision import datasets
from torch.utils.data import DataLoader

train_set = datasets.ImageFolder("./Dataset/train", transform=train_transform)
trainloader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=2)

test_set = datasets.ImageFolder("./Dataset/test", transform=test_transform)
testloader = DataLoader(test_set, shuffle=True)

In [17]:
len(train_set), len(test_set)


(1310, 371)

In [18]:
feature, target = next(iter(trainloader))
feature.shape

torch.Size([32, 3, 224, 224])

In [19]:
from torchvision.models import mobilenet_v2

class CustomMobileNetv2(nn.Module):
  def __init__(self, output_size):
    super().__init__()
    self.mnet = mobilenet_v2(pretrained=True)
    self.freeze()

    self.mnet.classifier = nn.Sequential(
        nn.Linear(1280, output_size),
        nn.LogSoftmax(1)
    )

  def forward(self, x):
    return self.mnet(x)
  
  def freeze(self):
    for param in self.mnet.parameters():
      param.requires_grad = False

  def unfreeze(self):
    for param in self.mnet.parameters():
      param.requires_grad = True

In [20]:
model = CustomMobileNetv2(10).to(device)



In [21]:
loss_fn = nn.CrossEntropyLoss() # this is also called "criterion"/"cost function" in some places
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-5)

In [22]:
from timeit import default_timer as timer 
def print_train_time(start: float, end: float, device: torch.device = None):
    """Prints difference between start and end time.

    Args:
        start (float): Start time of computation (preferred in timeit format). 
        end (float): End time of computation.
        device ([type], optional): Device that compute is running on. Defaults to None.

    Returns:
        float: time between start and end in seconds (higher is longer).
    """
    total_time = end - start
    print(f"Train time on {device}: {total_time:.3f} seconds")
    return total_time

In [23]:
def accuracy_fn(y_true, y_pred):
    """Calculates accuracy between truth labels and predictions.
    Args:
        y_true (torch.Tensor): Truth labels for predictions.
        y_pred (torch.Tensor): Predictions to be compared to predictions.
    Returns:
        [torch.float]: Accuracy value between y_true and y_pred, e.g. 78.45
    """
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / float(len(y_pred))) * 100
    return acc

In [24]:
# from tqdm.auto import tqdm
# from timeit import default_timer as timer 

# # Set the seed and start the timer
# torch.manual_seed(42)
# train_time_start_on_cpu = timer()

# # Set the number of epochs (we'll keep this small for faster training times)
# epochs = 25

# # Create training and testing loop
# for epoch in tqdm(range(epochs)):
#     print(f"Epoch: {epoch}\n-------")
#     ### Training
#     train_loss = 0
#     # Add a loop to loop through training batches
#     for batch, (X, y) in enumerate(trainloader):
#         model.train() 
#         # 1. Forward pass
#         y_pred = model(X.to(device))

#         # 2. Calculate loss (per batch)
#         loss = loss_fn(y_pred.to(device), y.to(device))
#         train_loss += loss # accumulatively add up the loss per epoch 

#         # 3. Optimizer zero grad
#         optimizer.zero_grad()

#         # 4. Loss backward
#         loss.backward()

#         # 5. Optimizer step
#         optimizer.step()

#         # Print out how many samples have been seen
#         if batch % 400 == 0:
#             print(f"Looked at {batch * len(X)}/{len(trainloader.dataset)} samples")

#     # Divide total train loss by length of train dataloader (average loss per batch per epoch)
#     train_loss /= len(trainloader)
    
#     ### Testing
#     # Setup variables for accumulatively adding up loss and accuracy 
#     test_loss, test_acc = 0, 0 
#     model.eval()
#     with torch.inference_mode():
#         for X, y in testloader:
#             # 1. Forward pass
#             test_pred = model(X.to(device))
           
#             # 2. Calculate loss (accumatively)
#             test_loss += loss_fn(test_pred.to(device), y.to(device)) # accumulatively add up the loss per epoch

#             # 3. Calculate accuracy (preds need to be same as y_true)
#             test_acc += accuracy_fn(y_true=y.to(device), y_pred=test_pred.argmax(dim=1).to(device))
        
#         # Calculations on test metrics need to happen inside torch.inference_mode()
#         # Divide total test loss by length of test dataloader (per batch)
#         test_loss /= len(testloader)

#         # Divide total accuracy by length of test dataloader (per batch)
#         test_acc /= len(testloader)

#     ## Print out what's happening
#     print(f"\nTrain loss: {train_loss:.5f} | Test loss: {test_loss:.5f}, Test acc: {test_acc:.2f}%\n")

# # Calculate training time      
# train_time_end_on_cpu = timer()
# total_train_time_model_0 = print_train_time(start=train_time_start_on_cpu, 
#                                            end=train_time_end_on_cpu,
#                                            device=str(next(model.parameters()).device))

# def train_step(model: torch.nn.Module,
#                data_loader: torch.utils.data.DataLoader,
#                loss_fn: torch.nn.Module,
#                optimizer: torch.optim.Optimizer,
#                accuracy_fn,
#                device: torch.device = device):
#     train_loss, train_acc = 0, 0
#     for batch, (X, y) in enumerate(data_loader):
#         # Send data to GPU
#         X, y = X.to(device), y.to(device)

#         # 1. Forward pass
#         y_pred = model(X)

#         # 2. Calculate loss
#         loss = loss_fn(y_pred, y)
#         train_loss += loss
#         train_acc += accuracy_fn(y_true=y,
#                                  y_pred=y_pred.argmax(dim=1)) # Go from logits -> pred labels

#         # 3. Optimizer zero grad
#         optimizer.zero_grad()

#         # 4. Loss backward
#         loss.backward()

#         # 5. Optimizer step
#         optimizer.step()

#     # Calculate loss and accuracy per epoch and print out what's happening
#     train_loss /= len(data_loader)
#     train_acc /= len(data_loader)
#     print(f"Train loss: {train_loss:.5f} | Train accuracy: {train_acc:.2f}%")

# def test_step(data_loader: torch.utils.data.DataLoader,
#               model: torch.nn.Module,
#               loss_fn: torch.nn.Module,
#               accuracy_fn,
#               device: torch.device = device):
#     test_loss, test_acc = 0, 0
#     model.eval() # put model in eval mode
#     # Turn on inference context manager
#     with torch.inference_mode(): 
#         for X, y in data_loader:
#             # Send data to GPU
#             X, y = X.to(device), y.to(device)
            
#             # 1. Forward pass
#             test_pred = model(X)
            
#             # 2. Calculate loss and accuracy
#             test_loss += loss_fn(test_pred, y)
#             test_acc += accuracy_fn(y_true=y,
#                 y_pred=test_pred.argmax(dim=1) # Go from logits -> pred labels
#             )
        
#         # Adjust metrics and print out
#         test_loss /= len(data_loader)
#         test_acc /= len(data_loader)
#         print(f"Test loss: {test_loss:.5f} | Test accuracy: {test_acc:.2f}%\n")

from tqdm.auto import tqdm
from timeit import default_timer as timer 
import gc
# Set the seed and start the timer
torch.manual_seed(42)
train_time_start_on_cpu = timer()

# Set the number of epochs (we'll keep this small for faster training times)
epochs = 5

# Create training and testing loop
for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n-------")
    ### Training
    train_loss = 0
    # Add a loop to loop through training batches
    for batch, (X, y) in enumerate(trainloader):
        model.train() 
        # 1. Forward pass
        y_pred = model(X.to(device))

        # 2. Calculate loss (per batch)
        loss = loss_fn(y_pred.to(device), y.to(device))
        train_loss += loss # accumulatively add up the loss per epoch 

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Print out how many samples have been seen
        if batch % 400 == 0:
            print(f"Looked at {batch * len(X)}/{len(trainloader.dataset)} samples")

    # Divide total train loss by length of train dataloader (average loss per batch per epoch)
    train_loss /= len(trainloader)
    
    ### Testing
    # Setup variables for accumulatively adding up loss and accuracy 
    test_loss, test_acc = 0, 0 
    model.eval()
    with torch.inference_mode():
        for X, y in testloader:
            # 1. Forward pass
            test_pred = model(X.to(device))
           
            # 2. Calculate loss (accumatively)
            test_loss += loss_fn(test_pred.to(device), y.to(device)) # accumulatively add up the loss per epoch

            # 3. Calculate accuracy (preds need to be same as y_true)
            test_acc += accuracy_fn(y_true=y.to(device), y_pred=test_pred.argmax(dim=1).to(device))
        
        # Calculations on test metrics need to happen inside torch.inference_mode()
        # Divide total test loss by length of test dataloader (per batch)
        test_loss /= len(testloader)

        # Divide total accuracy by length of test dataloader (per batch)
        test_acc /= len(testloader)
        torch.cuda.empty_cache()
        gc.collect()
    ## Print out what's happening
    print(f"\nTrain loss: {train_loss:.5f} | Test loss: {test_loss:.5f}, Test acc: {test_acc:.2f}%\n")
# Calculate training time      
train_time_end_on_cpu = timer()
total_train_time_model_0 = print_train_time(start=train_time_start_on_cpu, 
                                           end=train_time_end_on_cpu,
                                           device=str(next(model.parameters()).device))

  0%|          | 0/5 [00:00<?, ?it/s]

Epoch: 0
-------
Looked at 0/1310 samples

Train loss: 2.48058 | Test loss: 2.32051, Test acc: 5.12%

Epoch: 1
-------
Looked at 0/1310 samples

Train loss: 2.22037 | Test loss: 2.08153, Test acc: 20.75%

Epoch: 2
-------
Looked at 0/1310 samples

Train loss: 1.98478 | Test loss: 1.89615, Test acc: 56.87%

Epoch: 3
-------
Looked at 0/1310 samples

Train loss: 1.80080 | Test loss: 1.72675, Test acc: 63.61%

Epoch: 4
-------
Looked at 0/1310 samples

Train loss: 1.65576 | Test loss: 1.62433, Test acc: 63.61%

Train time on cuda:0: 116.682 seconds


In [25]:
# import tqdm
# torch.manual_seed(42)

# # Measure time
# from timeit import default_timer as timer
# train_time_start_on_gpu = timer()

# epochs = 3
# for epoch in range(epochs):
#     print(f"Epoch: {epoch}\n---------")
#     train_step(data_loader=trainloader, 
#         model=model, 
#         loss_fn=loss_fn,
#         optimizer=optimizer,
#         accuracy_fn=accuracy_fn
#     )
#     test_step(data_loader=testloader,
#         model=model,
#         loss_fn=loss_fn,
#         accuracy_fn=accuracy_fn
#     )

# train_time_end_on_gpu = timer()
# total_train_time_model_1 = print_train_time(start=train_time_start_on_gpu,
#                                             end=train_time_end_on_gpu,
#                                             device=device)