<a href="https://colab.research.google.com/github/Miyazono45/learn-pytorch/blob/main/intro_to_torch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Simple NN (Iris-style example using a small flower dataset: `flower_dataset.csv`)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

# ct = torch.randn(2,3,4)
# print(ct)

# cr = torch.randn(2,2,15)
# print(cr)
# cr = cr.reshape(-1,3)
# print(cr)
# crx = cr[:,-1:]
# print(crx)



In [None]:
class Model(nn.Module):
  """Small fully connected classifier.

  Architecture: ``input_feature -> fh1 -> fh2 -> output_feature`` with a ReLU
  after each of the two hidden layers. The final layer returns raw scores
  (no activation is applied in ``forward``).
  """

  def __init__(self, input_feature=3, fh1=6, fh2=6, output_feature=3):
    super().__init__()
    # Creation order (fc1, fc2, out) is kept so seeded initialisation is reproducible.
    self.fc1 = nn.Linear(in_features=input_feature, out_features=fh1)
    self.fc2 = nn.Linear(in_features=fh1, out_features=fh2)
    self.out = nn.Linear(in_features=fh2, out_features=output_feature)

  def forward(self, x):
    """Run ``x`` through both hidden layers and return the output layer's scores."""
    hidden = x
    for layer in (self.fc1, self.fc2):
      hidden = F.relu(layer(hidden))
    return self.out(hidden)


# Seed the RNG so the initial weights are the same on every run
torch.manual_seed(22)
model = Model()

In [None]:
# Load the flower dataset and encode its categorical columns numerically
url_dataset = "/content/dataset/flower_dataset.csv"
df = pd.read_csv(url_dataset)

# Ordinal feature columns become floats; any unlisted value falls back to 0.0
fragrance_levels = {"strong": 1.0, "mild": 0.5}
size_levels = {"large": 1.0, "medium": 0.5}
df['fragrance'] = df['fragrance'].map(fragrance_levels).fillna(0.0)
df['size'] = df['size'].map(size_levels).fillna(0.0)

# The target must be an integer class index (0..2) for nn.CrossEntropyLoss.
# The previous 1.0 / 0.5 / 0.0 encoding truncated 0.5 to 0 when cast to a
# LongTensor, which merged "shoeblack plant" with the fallback class.
species_classes = {"rose": 2, "shoeblack plant": 1}
df['species'] = df['species'].map(species_classes).fillna(0).astype('int64')

# NOTE: despite the names, `train_data` holds the feature columns and
# `test_data` holds the label column; the real train/test split happens later.
train_data = df.drop('species', axis=1)
test_data = df['species']

train_data_value = train_data.values
test_data_value = test_data.values

In [None]:
# Hold out 25% of the rows for evaluation (fixed seed for a repeatable split)
split_parts = train_test_split(train_data_value, test_data_value, test_size=0.25, random_state=22)
x_train, x_test, y_train, y_test = split_parts

# Features become float tensors, labels become long (integer) tensors
x_train, x_test = (torch.FloatTensor(features) for features in (x_train, x_test))
y_train, y_test = (torch.LongTensor(labels) for labels in (y_train, y_test))

In [None]:
# Loss: cross-entropy between the model's scores and the integer class labels
criterion = nn.CrossEntropyLoss()
# Optimizer: Adam over all model parameters.
# lr is the learning rate; lower it if the loss stops decreasing between iterations.
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01)

In [None]:
# Train the model with full-batch gradient descent and record the loss per epoch
epochs = 500
losses = []  # one Python float per epoch

for i in range(epochs):
  # Forward pass on the training set. Call the module itself (not .forward)
  # so nn.Module.__call__ runs any registered hooks.
  # NOTE: the name is historical; these are predictions for x_train.
  test_pred = model(x_train)
  # Measure loss
  loss = criterion(test_pred, y_train)
  # Track loss as a plain float (detached from the autograd graph)
  losses.append(loss.item())

  # Print progress every 10 epochs
  if i % 10 == 0:
    print(f'Epoch {i} with loss {loss}')

  # Backpropagation: clear old gradients, compute new ones, update the weights
  optimizer.zero_grad()
  loss.backward()
  optimizer.step()

In [None]:
# Plot the recorded training loss (y axis) against the epoch index (x axis)
epoch_axis = range(epochs)
plt.plot(epoch_axis, losses)
plt.ylabel("Loss")
plt.xlabel("Epoch")

In [None]:
# Evaluate the model on the held-out split
with torch.no_grad():
  # One batched forward pass; the per-sample loop below reuses these scores
  # instead of running the model again for every row.
  y_eval = model(x_test)
  loss = criterion(y_eval, y_test)  # loss between the predictions and the held-out labels

# Count correct predictions, printing each sample's scores next to its label
correct = 0
for i, (y_val, target) in enumerate(zip(y_eval, y_test)):
  print(f'{i+1}. {str(y_val)} {target}')

  # The predicted class is the index of the highest score
  if y_val.argmax().item() == target.item():
    correct += 1


print(f'Accuracy: {correct/len(y_test)}')

# CNN (Convolutional Neural Network) with the Kaggle Flowers Dataset

In [None]:
import torch, os
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from torchvision.utils import make_grid

import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Remove the locally cached copy of the dataset (presumably so that the
# download below fetches it again - confirm against kagglehub's cache behaviour)
!rm -rf '/root/.cache/kagglehub/datasets/imsparsh/flowers-dataset'

import kagglehub

# Download latest version; `path` is whatever location kagglehub returns
path = kagglehub.dataset_download("imsparsh/flowers-dataset")

print("Path to dataset files:", path)

In [None]:
# Per-image preprocessing applied by the datasets below:
# convert the loaded image to a tensor, then resize it to 224x224.
# (The batch dimension is added later by the DataLoader, not here.)
preprocess_steps = [
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
]
transform = transforms.Compose(preprocess_steps)

In [None]:
# Custom Dataset Class
class ImageTrainDataset(Dataset):
  """Labelled image dataset built from a ``root_dir/<class_name>/<image>`` folder tree.

  The dataset has no label CSV, so labels are derived from the folder names:
  sub-directories of ``root_dir`` are sorted alphabetically and numbered from 0.

  Args:
    root_dir: directory whose immediate sub-directories are the classes
      (the "train" folder).
    transform: optional callable applied to each RGB PIL image in ``__getitem__``.

  Attributes (names kept for backward compatibility):
    classFolder: sorted list of class folder names.
    classIndex: mapping ``class_name -> integer label`` (e.g. ``{"daisy": 0}``).
    img_path: list of image file paths.
    label_img: list of integer labels, parallel to ``img_path``.
  """

  def __init__(self, root_dir=None, transform=None):
    self.root_dir = root_dir
    self.transform = transform

    # Close the scandir iterator deterministically instead of leaking it
    with os.scandir(self.root_dir) as entries:
      self.classFolder = sorted(entry.name for entry in entries if entry.is_dir())
    self.classIndex = {class_name: index for index, class_name in enumerate(self.classFolder)}

    # Collect every image path together with its class label
    self.img_path = []
    self.label_img = []

    for class_name in self.classFolder:
      class_path = os.path.join(self.root_dir, class_name)  # e.g. "train/daisy"
      # Sorted so sample order does not depend on the filesystem's listing order
      for img_name in sorted(os.listdir(class_path)):
        file_path = os.path.join(class_path, img_name)      # e.g. "train/daisy/img_a.jpg"
        if not os.path.isfile(file_path):
          continue  # nested directories are not images
        self.img_path.append(file_path)
        self.label_img.append(self.classIndex[class_name])

  def __len__(self):
    """Return the number of collected images."""
    return len(self.img_path)

  def __getitem__(self, index):
    """Return ``(image, label)`` for ``index``; the image is RGB, transformed if a transform is set."""
    image_path = self.img_path[index]
    y_label = self.label_img[index]
    # The context manager closes the file; convert() returns an independent loaded copy
    with Image.open(image_path) as raw_image:
      images = raw_image.convert("RGB")

    if self.transform:
      images = self.transform(images)

    return images, y_label

In [None]:
# Custom Dataset Class
class ImageTestDataset(Dataset):
  """Image dataset for a flat test folder whose labels come from a DataFrame.

  Args:
    csv_file: DataFrame whose column ``0`` holds image file names and whose
      column ``1`` holds the class name for that file.
    train_dir: training folder; its sub-directory names (sorted) define the
      ``class_name -> integer label`` mapping, so labels match the training set.
    test_dir: folder containing the test images.
    transform: optional callable applied to each RGB PIL image in ``__getitem__``.

  Raises:
    IndexError: a file in ``test_dir`` has no row in ``csv_file`` (same
      exception type the previous row-filter lookup raised).
    KeyError: a label in ``csv_file`` is not a class folder of ``train_dir``.
  """

  def __init__(self, csv_file, train_dir, test_dir, transform=None):
    self.csv_file = csv_file
    self.train_dir = train_dir
    self.test_dir = test_dir
    self.transform = transform

    # Close the scandir iterator deterministically instead of leaking it
    with os.scandir(self.train_dir) as entries:
      self.classTrainFolder = sorted(entry.name for entry in entries if entry.is_dir())
    self.classTrainIndex = {class_name: index for index, class_name in enumerate(self.classTrainFolder)}

    # Build the file-name -> label lookup once instead of scanning the whole
    # DataFrame for every image. setdefault keeps the FIRST row for a
    # duplicated file name, matching the previous `.iloc[0]` behaviour.
    label_by_file = {}
    for file_name, label_name in zip(self.csv_file[0], self.csv_file[1]):
      label_by_file.setdefault(file_name, label_name)

    # Collect every test image path together with its label
    self.test_img_path = []
    self.label_img = []

    # Sorted so sample order does not depend on the filesystem's listing order
    for img_file in sorted(os.listdir(self.test_dir)):
      if img_file not in label_by_file:
        raise IndexError(f"no label row found in csv_file for test image {img_file!r}")
      self.test_img_path.append(os.path.join(self.test_dir, img_file))   # e.g. "test/Image_1.jpg"
      self.label_img.append(self.classTrainIndex[label_by_file[img_file]])

  def __len__(self):
    """Return the number of collected test images."""
    return len(self.test_img_path)

  def __getitem__(self, index):
    """Return ``(image, label)`` for ``index``; the image is RGB, transformed if a transform is set."""
    test_image_path = self.test_img_path[index]
    # The context manager closes the file; convert() returns an independent loaded copy
    with Image.open(test_image_path) as raw_image:
      test_images = raw_image.convert("RGB")
    y_label = self.label_img[index]

    if self.transform:
      test_images = self.transform(test_images)

    return test_images, y_label

In [None]:
# Datasets and DataLoaders
# The dataset already has separate "train" and "test" folders, so sklearn's
# train_test_split is not needed here.
train_dir = "/kaggle/input/flowers-dataset/train"
test_dir = "/kaggle/input/flowers-dataset/test"

dataset_train = ImageTrainDataset(root_dir=train_dir, transform=transform)
loader_train = DataLoader(dataset_train, batch_size=16, shuffle=True)

# Place the two CSVs side by side; ignore_index=True relabels the columns 0, 1, ...
# NOTE(review): the label column comes from sample_submission.csv - confirm
# that it holds real labels rather than placeholder predictions.
csv_sources = ['/kaggle/input/flowers-dataset/Testing_set_flower.csv', '/kaggle/input/flowers-dataset/sample_submission.csv']
csv_frames = [pd.read_csv(csv_source) for csv_source in csv_sources]
df_test = pd.concat(csv_frames, ignore_index=True, axis=1)

dataset_test = ImageTestDataset(csv_file=df_test, train_dir=train_dir, test_dir=test_dir, transform=transform)
loader_test = DataLoader(dataset_test, batch_size=8, shuffle=False)

In [None]:
# Scratch cell (safe to ignore): push the first two training batches through
# two conv -> ReLU -> 2x2 max-pool stages and print the resulting shapes.
z = 0
conv1 = nn.Conv2d(3, 6, 3)
conv2 = nn.Conv2d(6, 12, 3)
for z, (a, b) in enumerate(loader_train, start=1):
  print(f'a = {a.shape}')
  print(f'b = {b}')

  a = F.max_pool2d(F.relu(conv1(a)), 2, 2)
  a = F.max_pool2d(F.relu(conv2(a)), 2, 2)
  print(a.shape)

  # Stop once two batches have been inspected
  if z >= 2:
    break

In [None]:
# Model Class
class ConvolutionalNetwork(nn.Module):
  """Three conv/pool stages followed by two fully connected layers.

  Expects 3-channel 224x224 inputs: ``fc1``'s input size is hard-wired to the
  feature map those dimensions produce (see the size walk-through below).

  Args:
    num_classes: number of output scores from the last layer. Defaults to 5,
      the value that was previously hard-coded.
  """

  def __init__(self, num_classes=5):
    super().__init__()

    # Convolution stages. Each conv is 3x3, stride 1, no padding, and doubles
    # the channel count; each nn.MaxPool2d(2, 2) halves height and width.
    # (Module-style pooling is used so the layers are declared here;
    # F.max_pool2d is the functional form used inside forward passes.)
    self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1)
    self.pool1 = nn.MaxPool2d(2, 2)
    self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1)
    self.pool2 = nn.MaxPool2d(2, 2)
    self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1)
    self.pool3 = nn.MaxPool2d(2, 2)

    # Fully connected head.
    # Spatial size walk-through for a 224x224 input, conv: (size - kernel) / stride + 1
    #   224 -> conv1 222 -> pool1 111 -> conv2 109 -> pool2 54 -> conv3 52 -> pool3 26
    # so the flattened feature vector has 128 * 26 * 26 values.
    self.fc1 = nn.Linear(128 * 26 * 26, 512)
    # 512 hidden units is an arbitrary width (1024, 2048, ... would also work)
    self.fc2 = nn.Linear(512, num_classes)

  def forward(self, x):
    """Return ``(batch, num_classes)`` scores for a ``(batch, 3, 224, 224)`` input."""
    x = self.pool1(F.relu(self.conv1(x)))
    x = self.pool2(F.relu(self.conv2(x)))
    x = self.pool3(F.relu(self.conv3(x)))

    # Flatten: lay each sample's feature maps out as one row, keeping the batch dimension
    x = x.view(x.size(0), -1)
    x = F.relu(self.fc1(x))
    return self.fc2(x)



In [None]:
import torch

# Select the compute device: the CUDA GPU when one is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print("GPU is available and being used.")
else:
    print("GPU is not available, using CPU.")

In [None]:
# Instantiate the CNN with a fixed seed (reproducible initial weights).
# It is moved to the selected device here, before any optimizer is built from its parameters.
torch.manual_seed(22)
cnnModel = ConvolutionalNetwork().to(device)
cnnModel

In [None]:
# Loss function and optimizer for the CNN
# Cross-entropy compares the model's class scores with the integer class labels.
cnnCriterion = nn.CrossEntropyLoss()
# Adam updates every model parameter with learning rate 0.01.
cnnOptimizer = torch.optim.Adam(cnnModel.parameters(), lr=0.01)

# No `cnnOptimizer.to(device)` here: torch.optim.Optimizer has no `.to()`
# method, so that call raised AttributeError. The optimizer holds references to
# the model's parameters, so it follows whatever device the model was moved to.

In [None]:
# # Train and Testing Model
# import time
# start_time = time.time()

# # Declare variable to track epoch, loss, valid
# epoch = 100
# train_loss = []
# test_loss = []
# train_correct = []
# test_correct = []

# # Loop through epochs
# for i in range(epoch):
#   train_correct_count = 0
#   test_correct_count = 0

#   # Train
#   for batch, (x_train_b, y_train_b) in enumerate(loader_train):
#     batch += 1
#     # Move data to the device
#     x_train_b, y_train_b = x_train_b.to(device), y_train_b.to(device)

#     y_pred = cnnModel(x_train_b)
#     loss = cnnCriterion(y_pred, y_train_b)

#     predicted = torch.max(y_pred.data, 1)[1]
#     batch_correct = (predicted == y_train_b).sum()
#     # train_loss.append(loss.item())
#     train_correct_count += batch_correct

#     # Update Parameter
#     cnnOptimizer.zero_grad()
#     loss.backward()
#     cnnOptimizer.step()

#     # Print result
#     # if batch%600 == 0:
#     print(f'Epoch: {i} Batch: {batch} Loss: {loss.item()}')

#   train_loss.append(loss.item())
#   train_correct.append(train_correct_count)

#   # Test
#   with torch.no_grad():
#     for batch, (x_test_b, y_test_b) in enumerate(loader_test):
#       # Move data to the device
#       x_test_b, y_test_b = x_test_b.to(device), y_test_b.to(device)

#       y_val = cnnModel(x_test_b)
#       predicted = torch.max(y_val.data, 1)[1]
#       test_correct_count += (predicted == y_test_b).sum()

#     loss = cnnCriterion(y_val, y_test_b)
#     test_loss.append(loss.item())
#     test_correct.append(test_correct_count)


# current_time = time.time()
# total = current_time - start_time
# print(f'Total Time: {total/60} minutes')

First, let's check if a GPU is available in your environment.

Now, let's move your model and data to the GPU (or CPU if GPU is not available).

In [None]:
import torch

# Select the compute device: the CUDA GPU when one is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print("GPU is available and being used.")
else:
    print("GPU is not available, using CPU.")

# Put the model on the selected device
cnnModel.to(device)

# Every batch must be moved to the same device inside the training and
# testing loops as well, e.g.:
#
#   for batch, (x_train_b, y_train_b) in enumerate(loader_train):
#       x_train_b, y_train_b = x_train_b.to(device), y_train_b.to(device)
#       ...
#
#   with torch.no_grad():
#       for batch, (x_test_b, y_test_b) in enumerate(loader_test):
#           x_test_b, y_test_b = x_test_b.to(device), y_test_b.to(device)
#           ...

print("Model and data moved to:", device)

In [None]:
# Train and evaluate the CNN, recording per-epoch loss and correct-prediction counts
import time
start_time = time.time()

# Per-epoch history. Losses are sample-weighted means over the whole epoch
# (Python floats); the *_correct lists hold plain ints so they can be plotted
# directly and do not keep device tensors alive.
epoch = 100
train_loss = []
test_loss = []
train_correct = []
test_correct = []

# Loop through epochs
for i in range(epoch):
  train_correct_count = 0
  test_correct_count = 0
  train_loss_sum = 0.0
  test_loss_sum = 0.0
  train_seen = 0
  test_seen = 0

  # Train
  cnnModel.train()
  for batch, (x_train_b, y_train_b) in enumerate(loader_train, start=1):
    # Move data to the device
    x_train_b, y_train_b = x_train_b.to(device), y_train_b.to(device)

    y_pred = cnnModel(x_train_b)
    loss = cnnCriterion(y_pred, y_train_b)

    # Predicted class = index of the highest score in each row
    predicted = y_pred.argmax(dim=1)
    train_correct_count += (predicted == y_train_b).sum().item()

    # The criterion returns the batch mean; weight it by the batch size so the
    # epoch average stays correct when the last batch is smaller.
    batch_size = y_train_b.size(0)
    train_loss_sum += loss.item() * batch_size
    train_seen += batch_size

    # Update Parameter
    cnnOptimizer.zero_grad()
    loss.backward()
    cnnOptimizer.step()

    # Print result
    print(f'Epoch: {i} Batch: {batch} Loss: {loss.item()}')

  train_loss.append(train_loss_sum / max(train_seen, 1))
  train_correct.append(train_correct_count)

  # Test
  cnnModel.eval()
  with torch.no_grad():
    for x_test_b, y_test_b in loader_test:
      # Move data to the device
      x_test_b, y_test_b = x_test_b.to(device), y_test_b.to(device)

      y_val = cnnModel(x_test_b)
      predicted = y_val.argmax(dim=1)
      test_correct_count += (predicted == y_test_b).sum().item()

      # Accumulate the loss over every test batch (previously only the final
      # batch contributed to the recorded test loss).
      batch_size = y_test_b.size(0)
      test_loss_sum += cnnCriterion(y_val, y_test_b).item() * batch_size
      test_seen += batch_size

  test_loss.append(test_loss_sum / max(test_seen, 1))
  test_correct.append(test_correct_count)


current_time = time.time()
total = current_time - start_time
print(f'Total Time: {total/60} minutes')