<a href="https://colab.research.google.com/github/felipecacique/Pytorch-Tutorial-Youtube/blob/main/Copy_of_Animal_Image_Dataset_(90_Different_Animals).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES
# TO THE CORRECT LOCATION (/kaggle/input) IN YOUR NOTEBOOK,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

CHUNK_SIZE = 40960
DATA_SOURCE_MAPPING = 'animal-image-dataset-90-different-animals:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F1554380%2F3952946%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240430%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240430T220144Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D1c18538cfaf7f49648831da23772732d12f2eb637af47eff3aadf483bd520f3c693653d34acb2e2379f256fe8ecb6a65aba83fc6c1152fa01d90112bdba9cb4ba58b92615573f0a8814bc6c9605f1ad19795ca53e3c25b81b985e31f09d9f758f3b6f664691807fb0c9147d5f233acfdb321d93eff4480755594728717c7ee5b19815e2e829987c3b98f48f94ee1cbfaf15be4c92e464ac706523ba0122687b149983140fe7760c60cf3d319eb810d7cb631cdecfc0b77c36960170590cf072c52c85a37e6ff20d4f3f953ee3181dbbebc82422aa4a9f7e506c0f372939fc3786726176c88a0bd9e29350b5960ca6ec2e2aa8585b5df924227fd7bb421eb0e2a'

KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    directory, download_url_encoded = data_source_mapping.split(':')
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            total_length = fileres.headers['content-length']
            print(f'Downloading {directory}, {total_length} bytes compressed')
            dl = 0
            data = fileres.read(CHUNK_SIZE)
            while len(data) > 0:
                dl += len(data)
                tfile.write(data)
                done = int(50 * dl / int(total_length))
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                sys.stdout.flush()
                data = fileres.read(CHUNK_SIZE)
            if filename.endswith('.zip'):
              with ZipFile(tfile) as zfile:
                zfile.extractall(destination_path)
            else:
              with tarfile.open(tfile.name) as tarfile:
                tarfile.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError as e:
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError as e:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')


In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from PIL import Image
from sklearn.metrics import classification_report, f1_score, confusion_matrix
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from torchvision.utils import make_grid
%matplotlib inline

In [None]:
# Set the device
device = "cuda" if torch.cuda.is_available() else "cpu"

# Set the device globally
torch.set_default_device(device)

### Loading Dataset

In [None]:
# Loading Dataset
path = "/kaggle/input/animal-image-dataset-90-different-animals/animals/animals"

data = {"imgpath": [], "labels": []}

category = os.listdir(path)
for folder in category:
  folderpath = os.path.join(path, folder)
  filelist = os.listdir(folderpath)
  for file in filelist:
    fpath = os.path.join(folderpath, file)
    data["imgpath"].append(fpath)
    data["labels"].append(folder)

df = pd.DataFrame(data)


In [None]:
# from sklearn.utils import shuffle
# df = shuffle(df)
# df = df[:300]
# df

In [None]:
# Convert labels to numbers
lb = LabelEncoder()
df["encoded_labels"] = lb.fit_transform(df['labels'])
num_classes = len(df["encoded_labels"].unique())
num_classes

# # decode
# df["decoded_labels"] = lb.inverse_transform(df['encoded_labels'])
# df

### Show sample from data


In [None]:
plt.figure(figsize=(15,12))
for i, row in df.sample(n=16).reset_index().iterrows():
  plt.subplot(4, 4, i+1)
  image_path = row["imgpath"]
  image = Image.open(image_path)
  plt.imshow(image)
  plt.title(row["labels"])
  plt.axis("off")
plt.show()

### Split the dataset into train and test

In [None]:
# Split into train, valid and test
train_df, temp_df = train_test_split(df, train_size=0.70, shuffle=True, random_state=42)
valid_df, test_df = train_test_split(temp_df, train_size=0.70, shuffle=True, random_state=42)
train_df = train_df.reset_index(drop=True)
valid_df = valid_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)
train_df

### Image augmentation


In [None]:
# Define data augmentation transformations
augmentation_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomRotation(20),
    transforms.RandomAffine(0, shear=10, scale=(0.8, 1.2)),
    transforms.RandomPerspective(distortion_scale=0.2, p=0.5, interpolation=3),
    transforms.ToTensor(),
])

# augmentation_transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.RandomHorizontalFlip(),
#     transforms.RandomVerticalFlip(),
#     transforms.RandomRotation(15),
#     transforms.ColorJitter(brightness=0.15, contrast=0.15, saturation=0.15, hue=0.15),
#     transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
#     transforms.ToTensor(),
# ])


data_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

### Create the data loaders

In [None]:
# Define the dataset class - encapsulate in a class allows for memory data efficiency during training
class AnimalDataset(Dataset):
  def __init__(self, df, transform=None):
    self.df = df
    self.transform = transform

  def __len__(self):
    return len(self.df)

  def __getitem__(self, idx):
    img_path = self.df.iloc[idx]['imgpath']
    label = self.df.iloc[idx]['encoded_labels']
    img = Image.open(img_path)
    if self.transform:
      img = self.transform(img)
    label = torch.tensor(label)

    # # Plot the image
    # plt.figure(figsize=(4, 4))
    # plt.imshow(img.permute(1, 2, 0))  # Assuming images are in format (C, H, W)
    # plt.title(f"Label: {label.item()}")
    # plt.axis('off')
    # plt.show()

    return img, label

#     def shuffle(self):
#             random.shuffle(self.indices)

In [None]:
# Create custom datasets with data augmentation
train_dataset = AnimalDataset(train_df, transform = augmentation_transform)
valid_dataset = AnimalDataset(valid_df, transform = data_transform)
test_dataset = AnimalDataset(test_df, transform = data_transform)

In [None]:
# Create DataLoader instances for training and test
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

### Model Structure

In [None]:
# Model Class
class ConvolutionalNetwork(nn.Module):
  def __init__(self):
    super().__init__()
    self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
    self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
    self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
    # Fully connected layer
    self.fc1 = nn.Linear(256 * 28 * 28, 512)
    self.fc2 = nn.Linear(512, 256)
    self.fc3 = nn.Linear(256, num_classes)

  def forward(self, X):
    X = F.relu(self.conv1(X))
    X = F.max_pool2d(X,2,2) # 2x2 kernel and stride 2
    # Second Pass
    X = F.relu(self.conv2(X))
    X = F.max_pool2d(X,2,2) # 2x2 kernel and stride 2
    # Third Pass
    X = F.relu(self.conv3(X))
    X = F.max_pool2d(X,2,2) # 2x2 kernel and stride 2

    # Re-View to flatten it out
    X = X.view(-1, 256 * 28 * 28)

    # Fully connected layers
    X = F.relu(self.fc1(X))
    X = F.relu(self.fc2(X))
    X = self.fc3(X)
    return F.log_softmax(X, dim=1)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models, transforms

class ConvolutionalNetwork(nn.Module):
    def __init__(self):
        super(ConvolutionalNetwork, self).__init__()
        # Load the pretrained model
        self.pretrained_model = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.IMAGENET1K_V1)
        # Freezing the layers of the pretrained model
        for param in self.pretrained_model.parameters():
            param.requires_grad = False
        # Modify the final layers
        self.pretrained_model._fc = nn.Identity()  # Remove the final fully connected layer

        # # Data Augmentation Layers
        # self.augment = nn.Sequential(
        #     transforms.RandomHorizontalFlip(),
        #     transforms.RandomVerticalFlip(),
        #     transforms.RandomRotation(15),
        #     transforms.ColorJitter(brightness=0.15, contrast=0.15, saturation=0.15, hue=0.15),
        #     transforms.RandomResizedCrop(224, scale=(0.8, 1.0))
        # )

        # New classifier layers
        self.fc1 = nn.Linear(1000, 256)  # Adjusted input size to match the output size of the backbone
        self.relu = nn.ReLU()
        self.batch_norm = nn.BatchNorm1d(256)  # Adjusted batch norm size
        self.dropout = nn.Dropout(0.45)
        self.fc2 = nn.Linear(256, num_classes)  # Adjusted output size to match the number of classes

    def forward(self, x):
        # x = self.augment(x)
        x = self.pretrained_model(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.batch_norm(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:
# Create an instance of our model
torch.manual_seed(42)
model = ConvolutionalNetwork().to(device)
# model

### Training the model

In [None]:
# Loss function optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
import time
start_time = time.time()

# Create Variable to track things
epochs = 5
train_losses = []
valid_losses = []
test_losses = []
train_correct = []
valid_correct = []
test_correct = []

# For Loop Epochs
for i in range(epochs):
  trn_corr = 0
  vld_corr = 0
  tst_corr = 0

  # Train
  for b, (X_train, y_train) in enumerate(train_loader):
    b += 1 # start out batches at 1
    X_train, y_train = X_train.to(device), y_train.to(device)
    y_pred = model.forward(X_train)
    loss = criterion(y_pred, y_train)

    predicted = torch.max(y_pred.data, 1)[1]
    batch_corr = (predicted == y_train).sum()
    trn_corr += batch_corr

    # Update our parameters
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    train_losses.append(loss)
    # train_correct.append(trn_corr)
    train_correct.append(batch_corr/batch_size)


    # Test/Validation
    with torch.no_grad():
      for b_valid, (X_valid, y_valid) in enumerate(valid_loader):
        X_valid, y_valid = X_valid.to(device), y_valid.to(device)
        y_val = model.forward(X_valid)
        predicted = torch.max(y_val.data, 1)[1]
        # vld_corr += (predicted == y_valid).sum()
        vld_corr = (predicted == y_valid).sum()
        break

      loss_valid = criterion(y_val, y_valid)
      valid_losses.append(loss_valid)
      valid_correct.append(vld_corr/batch_size)

    # Print out some results
    if b%10==0:
      print(f'Epoch: {i} Batch: {b} Loss: {loss.item()} Corr: {batch_corr/batch_size} | LossValid: {loss_valid.item()} CorrValid: {vld_corr/batch_size}')

print(f'Trainig took: {(time.time() - start_time)/60} minutes')

In [None]:
# Graph the loss at epoch
train_losses = [tl.item() for tl in train_losses]
valid_losses = [vl.item() for vl in valid_losses]  # Move to CPU
plt.plot(train_losses, label="Training loss")
plt.plot(valid_losses, label="Validation loss")
plt.title("Loss at Epoch")
plt.legend()

In [None]:
# graph the accuracy at the end of each epoch
train_correct = [tl.item() for tl in train_correct]
valid_correct = [vl.item() for vl in valid_correct]  # Move to CPU
plt.plot(train_correct, label="Training Accuracy")
plt.plot(valid_correct, label="Validation Accuracy")
plt.title("Accuracy at the end of each Epoch")
plt.legend()


In [None]:
# Graph the loss at epoch
fig, axs = plt.subplots(1, 2, figsize=(15, 5))

# Plot Loss
axs[0].plot(train_losses, label="Training loss")
axs[0].plot(valid_losses, label="Validation loss")
axs[0].set_title("Loss at Epoch")
axs[0].legend()

# Plot Accuracy
axs[1].plot(train_correct, label="Training Accuracy")
axs[1].plot(valid_correct, label="Validation Accuracy")
axs[1].set_title("Accuracy at the end of each Epoch")
axs[1].legend()

plt.tight_layout()
plt.show()

### Model Evaluate

In [None]:
valid_predictions = []
correct = 0
# test_load_all =  DataLoader(valid_dataset, batch_size=100000, shuffle=False)
with torch.no_grad():
  for X_test, y_test in valid_loader:
    X_test, y_test = X_test.to(device), y_test.to(device)
    y_val = model.forward(X_test)
    predicted = torch.max(y_val, 1)[1]
    correct += (predicted == y_test).sum()
    valid_predictions.append(predicted)

# Correct
test_acc = correct.item() / len(valid_dataset)
print(f"Test accuracy: {test_acc}")

Get Predictions

In [None]:
# Concatenate the tensors
valid_predictions = torch.cat(valid_predictions, dim=0)
print(len(valid_df), len(valid_predictions))

In [None]:
predictions = valid_df
predictions['prediction'] = valid_predictions.to('cpu')
predictions['prediction_labels'] = lb.inverse_transform(predictions['prediction'])
predictions

In [None]:
# Plot some predictions
plt.figure(figsize=(15,12))
for i, row in predictions.head(20).reset_index().iterrows():
  plt.subplot(5,4,i+1)
  image = Image.open(row['imgpath'])
  plt.imshow(image)
  plt.title(f'label: {row["labels"]} | pred: {row["prediction_labels"]}', fontsize=8)
  plt.axis("off")
plt.show()
