<a href="https://colab.research.google.com/github/dysonspheres/FRI-ML-Door-Detection-Model/blob/main/Door_Detection_Model_Test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# General Information for this project

* When building file-path strings, do not include the `/content` folder: the Google Colab runtime's working directory is already `/content`.
* Run the cells sequentially so that the model is trained correctly.

In [None]:
%matplotlib inline

In [None]:
import os
import glob

# Folder holding the raw images, given relative to the current working directory
data_dir = './drive/MyDrive/image_dump'

In [None]:
local_data_dir = './data'

# Build the folder layout used by the rest of the notebook in the LOCAL
# Google Colab file system:
#   ./data/training/closed/      ./data/training/open/
#   ./data/validation/closed/    ./data/validation/open/
training_dir = os.path.join(local_data_dir, 'training')
validation_dir = os.path.join(local_data_dir, 'validation')

# The class folders keep their trailing slash on purpose: later cells build
# file paths from them with plain string concatenation.
closed_training_dir = os.path.join(training_dir, 'closed/')
open_training_dir = os.path.join(training_dir, 'open/')
closed_validation_dir = os.path.join(validation_dir, 'closed/')
open_validation_dir = os.path.join(validation_dir, 'open/')

# makedirs(exist_ok=True) creates any missing parent folders and does nothing
# when the folder is already there, so re-running this cell is safe and there
# is no check-then-create race.
for _class_dir in (closed_training_dir, open_training_dir,
                   closed_validation_dir, open_validation_dir):
  os.makedirs(_class_dir, exist_ok=True)

In [None]:
import shutil
import random

# This cell sorts the images from Casey's Drive folder (image_dump) into the
# local training and validation data sets with an 80-20 split.
# Images are COPIED, not moved, so the Drive folder stays intact. An image that
# is already present in either split is skipped, so re-running the cell neither
# duplicates an image nor leaks it from one split into the other.

data_dir = './drive/MyDrive/image_dump'
split = 0.8
closed_img_dir = data_dir + '/closed/closed*'
print(closed_img_dir) # debug, remove later
open_img_dir = data_dir + '/open/open*'

# Glob once per class so the reported sizes and the lists being split agree.
closed_list = glob.glob(closed_img_dir)
open_list = glob.glob(open_img_dir)
closed_imgs_size = len(closed_list)
open_imgs_size = len(open_list)
print(closed_imgs_size) # debug, remove later

random.shuffle(closed_list)
random.shuffle(open_list)


def _distribute_images(img_paths, train_target, val_target, train_fraction):
  """Copy the leading train_fraction of img_paths into train_target, the rest into val_target.

  Args:
    img_paths: list of source image paths, already in the desired (shuffled) order.
    train_target: destination folder for the training share.
    val_target: destination folder for the validation share.
    train_fraction: share of img_paths (0..1) that goes to train_target.

  An image whose file name already exists in train_target or val_target is
  left where it is and not copied again.
  """
  cutoff = len(img_paths) * train_fraction
  for i, img in enumerate(img_paths):
    name = os.path.basename(img)
    already_sorted = (os.path.exists(os.path.join(train_target, name))
                      or os.path.exists(os.path.join(val_target, name)))
    if already_sorted:
      continue
    shutil.copy2(img, train_target if i < cutoff else val_target)


_distribute_images(closed_list, closed_training_dir, closed_validation_dir, split)
_distribute_images(open_list, open_training_dir, open_validation_dir, split)

./drive/MyDrive/image_dump/closed/closed*
51


In [None]:
# Mount Google Drive at /content/drive so the relative './drive/MyDrive/...'
# paths used in this notebook can resolve.
# NOTE(review): a cell earlier in the notebook already reads
# './drive/MyDrive/image_dump'; this mount presumably has to be executed
# before that cell — confirm the intended cell order.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# This code blocks serves to show you a random sample of the pulled images. It has no other purpose for the actual model itself

import matplotlib.pyplot as plt
import numpy as np
import cv2
from scipy import ndimage

from IPython.core.pylabtools import figsize

# Draw 8 random file names (with replacement) from each training class folder.
samples_closed = []
for _ in range(8):
  chosen = np.random.choice(os.listdir(closed_training_dir), 1)[0]
  samples_closed.append(os.path.join(closed_training_dir, chosen))

samples_open = []
for _ in range(8):
  chosen = np.random.choice(os.listdir(open_training_dir), 1)[0]
  samples_open.append(os.path.join(open_training_dir, chosen))

nrows = 4
ncols = 4

fig, ax = plt.subplots(nrows, ncols, figsize=(10, 10))
ax = ax.flatten()

# First 8 panels show the closed samples, the remaining 8 the open samples.
for panel, img_path in zip(ax, samples_closed + samples_open):
  panel.imshow(plt.imread(img_path))
  panel.set_axis_off()
plt.show()

In [None]:
import torch
import torchvision
from torchvision import datasets, transforms

train_dir = './data/training'
test_dir = './data/validation'

# Per-channel statistics handed to Normalize; both pipelines use the same values.
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]

# Both pipelines: resize to 224x224, convert to a tensor, normalise per channel.
train_transforms = transforms.Compose([
  transforms.Resize((224, 224)),
  transforms.ToTensor(),
  transforms.Normalize(mean=channel_mean, std=channel_std),
])
test_transforms = transforms.Compose([
  transforms.Resize((224, 224)),
  transforms.ToTensor(),
  transforms.Normalize(mean=channel_mean, std=channel_std),
])

# Datasets: one ImageFolder per split directory
train_data = datasets.ImageFolder(train_dir, transform=train_transforms)
test_data = datasets.ImageFolder(test_dir, transform=test_transforms)

# Data loaders: shuffled batches of 16 for both splits
loader_options = {'shuffle': True, 'batch_size': 16}
trainloader = torch.utils.data.DataLoader(train_data, **loader_options)
testloader = torch.utils.data.DataLoader(test_data, **loader_options)

In [None]:
# compile everything needed to train the model into one function

def make_train_step(model, optimizer, loss_fn):
  """Build a function that performs one optimisation step on one batch.

  Args:
    model: the network to train; called as model(x).
    optimizer: optimizer whose step()/zero_grad() are applied after backward().
    loss_fn: callable taking (prediction, target) and returning a scalar loss.

  Returns:
    train_step(x, y): runs forward, backward and the optimizer update for the
    batch (x, y) and returns the loss tensor of that batch.
  """
  def train_step(x, y):
    # Enter train mode BEFORE the forward pass; otherwise a batch that follows
    # a model.eval() call would be pushed through the network in eval mode.
    model.train()
    # make prediction
    yhat = model(x)
    # compute loss
    loss = loss_fn(yhat, y)

    loss.backward()
    optimizer.step()
    # clear the gradients so they do not accumulate into the next step
    optimizer.zero_grad()

    return loss
  return train_step

In [None]:
# Code for the model itself

from torchvision import datasets, models, transforms
from torchvision.models import ResNet18_Weights
import torch.nn as nn

# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = models.resnet18(weights=ResNet18_Weights.DEFAULT)

# Freeze all pretrained parameters.
# `requires_grad` is the flag autograd reads; assigning to `requires_grad_`
# (the in-place setter METHOD) would only overwrite that method with False
# and leave every parameter trainable.
for params in model.parameters():
  params.requires_grad = False

# Replace the final layer with a fresh single-output layer. It is created after
# the freeze loop, so its parameters keep the default requires_grad=True.
nr_filters = model.fc.in_features  # number of input features of the last layer
model.fc = nn.Linear(nr_filters, 1)

model = model.to(device)

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:01<00:00, 25.4MB/s]


In [None]:
# Code for the loss, optimizer, and train step

from torch.nn.modules.loss import BCEWithLogitsLoss
from torch.optim import lr_scheduler

# Loss: binary cross entropy that applies the sigmoid itself, so the model
# must output raw logits (no sigmoid layer in the model).
loss_fn = BCEWithLogitsLoss()

# Optimizer: Adam with its default settings, given only the parameters of the
# final layer (model.fc).
optimizer = torch.optim.Adam(model.fc.parameters()) 

# Train step: closure that runs one optimisation step with the objects above.
train_step = make_train_step(model, optimizer, loss_fn)

In [None]:
### ACTUAL MODEL TRAINING CODE WILL TAKE FOREVER (play chess while waiting)

%%capture
!pip install tqdm
from tqdm import tqdm


import copy

# Per-batch loss history (plain floats)
losses = []
val_losses = []

# Per-epoch mean loss history (plain floats)
epoch_train_losses = []
epoch_test_losses = []

n_epochs = 10
early_stopping_tolerance = 3     # stop after this many epochs in a row without improvement
early_stopping_threshold = 0.03  # stop as soon as the best validation loss is this low

# Early-stopping state lives OUTSIDE the epoch loop so it survives across epochs.
best_loss = float('inf')
best_model_wts = copy.deepcopy(model.state_dict())
early_stopping_counter = 0

for epoch in range(n_epochs):
  epoch_loss = 0.0
  for x_batch, y_batch in tqdm(trainloader, total=len(trainloader)):  # iterate over batches
    x_batch = x_batch.to(device)  # move to gpu
    # convert target to the same shape/dtype as the network output, then move to gpu
    y_batch = y_batch.unsqueeze(1).float().to(device)

    # float() works whether train_step returns a 0-dim tensor or a number, and
    # keeps tensors (and their device memory) out of the history lists.
    loss = float(train_step(x_batch, y_batch))
    epoch_loss += loss / len(trainloader)
    losses.append(loss)

  epoch_train_losses.append(epoch_loss)
  print('\nEpoch : {}, train loss : {}'.format(epoch+1, epoch_loss))

  # validation: eval mode, and no gradients are needed
  model.eval()
  cum_loss = 0.0
  with torch.no_grad():
    for x_batch, y_batch in testloader:
      x_batch = x_batch.to(device)
      y_batch = y_batch.unsqueeze(1).float().to(device)

      yhat = model(x_batch)
      val_loss = loss_fn(yhat, y_batch).item()
      # accumulate the VALIDATION loss of this batch (not the last training loss)
      cum_loss += val_loss / len(testloader)
      val_losses.append(val_loss)

  epoch_test_losses.append(cum_loss)
  print('Epoch : {}, val loss : {}'.format(epoch+1, cum_loss))

  if cum_loss <= best_loss:
    # New best epoch: snapshot the weights. state_dict() returns references to
    # the live tensors, so a deep copy is needed to keep this epoch's values.
    best_loss = cum_loss
    best_model_wts = copy.deepcopy(model.state_dict())
    early_stopping_counter = 0
  else:
    early_stopping_counter += 1

  # early stopping
  if (early_stopping_counter == early_stopping_tolerance) or (best_loss <= early_stopping_threshold):
    print("\nTerminating: early stopping")
    break  # terminate training

# load best model
model.load_state_dict(best_model_wts)


In [None]:
from scipy import ndimage
import matplotlib.pyplot as plt 
from matplotlib import transforms

# Save the model's state dict into the image_dump folder under ./drive/MyDrive
torch.save(model.state_dict(), './drive/MyDrive/image_dump/model.pth')



def inference(test_data):
  """Classify one randomly chosen sample of test_data, print the result and plot the image.

  Args:
    test_data: indexable dataset whose items are (image_tensor, label) pairs;
      the image tensor is permuted (1, 2, 0) for display, i.e. it is expected
      channel-first.

  Uses the notebook-level `model` and `device`. The model is switched to eval
  mode and the forward pass runs without gradient tracking.
  """
  # randint's upper bound is exclusive, so the lower bound must be 0 for every
  # sample (including the first) to be eligible; .item() yields a plain int index.
  idx = torch.randint(0, len(test_data), (1,)).item()

  # Load the sample once and reuse it for both the prediction and the plot.
  image = test_data[idx][0]
  sample = torch.unsqueeze(image, dim=0).to(device)

  model.eval()
  with torch.no_grad():
    probability = torch.sigmoid(model(sample))

  # NOTE(review): assumes label 0 means "closed" and label 1 means "open" —
  # confirm against test_data.class_to_idx.
  if probability < 0.5:
    print("Prediction : Closed")
  else:
    print("Prediction : Open")

  # channel-first -> channel-last for plotting
  img = image.permute(1, 2, 0)
  rotated_img = ndimage.rotate(img, -90)

  plt.imshow(rotated_img)

# Run one demo prediction on a random sample of test_data
inference(test_data)