## Demo notebook to detect whether people are using safety gear or not

The data is taken from <br>


imports ...

In [25]:
import os 
import torch
from torch import nn, optim
import torchvision
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
import torchvision.transforms.v2 as v2
import math
import time 

Parametes 

In [26]:
dataset_folder  = "../Dataset"
batch_size = 32
train_ratio = 0.85
valid_ratio = 0.0
test_ratio = 0.15
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

print (f"Settings ... \n dataset path \t\t {dataset_folder} \n batch_size \t\t {batch_size}" +
       f"\n train_ratio \t\t {train_ratio} \n validation ratio \t {valid_ratio} \n test ratio \t\t {test_ratio} \n device \t\t {device}")

Using device: cuda
Settings ... 
 dataset path 		 ../Dataset 
 batch_size 		 32
 train_ratio 		 0.85 
 validation ratio 	 0.0 
 test ratio 		 0.15 
 device 		 cuda


Check the classes in the folder

In [27]:
class_labels = [name for name in os.listdir(dataset_folder) if not name.startswith(".DS_Store")]
print(class_labels)
num_classes = len(class_labels)
print(num_classes)

['Not Wearing Safety Gear', 'Wearing Safety Gear']
2


Creating dataset from the image folder 

In [28]:
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

transformations = v2.Compose([
    v2.ToImage(),
    v2.RandomHorizontalFlip(),
    v2.RandomVerticalFlip(),
    v2.RandomResizedCrop(size=(256, 256), antialias=True),
    v2.RandomRotation(degrees = (0, 170)),
    v2.ToDtype(dtype = torch.float32, scale = True),
    v2.Resize(size = (256, 256), antialias = True),
    v2.Normalize(mean, std)
])

dataset = ImageFolder(dataset_folder, transform = transformations)


probe stats from the Image folder 

In [5]:
print (f"classes {dataset.classes}")
d = dataset.class_to_idx

classes ['Not Wearing Safety Gear', 'Wearing Safety Gear']


splitting the dataset <br>
this is a small dataset, we ar eonly using train and test dataset 

In [38]:
train_size = math.ceil(len(dataset) * train_ratio)
test_szie = math.floor(len(dataset) * test_ratio)
print(f"dataset {len(dataset)}, train dataset {train_size}, test size {test_szie}")

train_dataset, test_dataset = random_split(dataset,[train_size, test_szie])

train_dataloader = DataLoader(train_dataset,shuffle=True,batch_size=batch_size)
test_dataloader = DataLoader(test_dataset,shuffle=True, batch_size=batch_size,pin_memory=True)

print(f"Lenght of training dataset {len(train_dataloader.dataset)}")
print(f"Lenght of test dataset {len(test_dataloader.dataset)}")

dataset 355, train dataset 302, test size 53
Lenght of training dataset 302
Lenght of test dataset 53


try ResNet 

In [30]:
res_net = torchvision.models.resnet50(weights='IMAGENET1K_V1')

for param in res_net.parameters():
    param.requires_grad = False

res_net.fc  = torch.nn.Sequential(
    nn.Linear(2048,128),
    nn.ReLU(),
    nn.Linear(
        in_features=128,
        out_features=num_classes
    ),
)



ANy other model(s) to evalaute can go here ... 

In [31]:
model_to_evaluate = res_net

In [32]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(res_net.fc.parameters())

training method 

In [33]:
def train_model(model,model_name,criterion,optimier, data_loader,device, num_epochs=0):
    model.to(device)
    for epoch in range(num_epochs):
        #loss_batches = 0
        loss_epoch=0;
        corrects_batches = 0
        count = 0
        start = time.time()
        correct = 0
        total = 0
        data_loading_begin = time.time()
        data_loading_time = 0
        data_processing_time = 0 
        for x,y in data_loader:
            
            x,y = x.to(device), y.to(device)
            data_loading_time += (time.time()-data_loading_begin)
            processing_time_begin = time.time()
            outputs = model(x)
            loss = criterion(outputs,y)
            optimier.zero_grad()
            loss.backward()
            optimier.step()
            data_processing_time += (time.time()-processing_time_begin)
            _,preds = torch.max(outputs,1)

            correct += (preds == y).sum().item()
            total += preds.size(0)
            loss_epoch += loss.item()
            count += 1
            data_loading_begin = time.time()
        #epoch_loss = loss_batches / len(data_loader)
        epoch_acc = correct / total
        print(f"\n epoch {epoch} Loss : {(loss_epoch/count):.4f} Accuracy {epoch_acc:.2f}, time : {time.time()-start} secs")
        print(f"data loading time {data_loading_time} secs, data processing time {data_processing_time} secs")
        if (epoch % 3 == 0 ):
          torch.save(model.state_dict(),f"{model_name}_{epoch:02d}_{epoch_acc:.2f}.h5")\
          
    return model



In [39]:
train_model(model_to_evaluate,"res_net",criterion=criterion, optimier=optimizer,
            data_loader=train_dataloader, device=device, num_epochs=30)

torch.save(model_to_evaluate.state_dict(),'resNet.h5')


 epoch 0 Loss : 0.2938 Accuracy 0.86, time : 16.442638397216797 secs
data loading time 15.544307947158813 secs, data processing time 0.2253870964050293 secs

 epoch 1 Loss : 0.2740 Accuracy 0.86, time : 15.586276054382324 secs
data loading time 14.887279748916626 secs, data processing time 0.23354411125183105 secs

 epoch 2 Loss : 0.2504 Accuracy 0.89, time : 15.689343690872192 secs
data loading time 15.03190803527832 secs, data processing time 0.2397916316986084 secs

 epoch 3 Loss : 0.2521 Accuracy 0.90, time : 16.6470787525177 secs
data loading time 15.990078926086426 secs, data processing time 0.25426149368286133 secs

 epoch 4 Loss : 0.1956 Accuracy 0.92, time : 16.844648838043213 secs
data loading time 16.025765657424927 secs, data processing time 0.24703550338745117 secs

 epoch 5 Loss : 0.2050 Accuracy 0.92, time : 16.114879608154297 secs
data loading time 15.459449529647827 secs, data processing time 0.24773955345153809 secs

 epoch 6 Loss : 0.2704 Accuracy 0.91, time : 15.92

In [12]:
def calculate_accuracy(loader, model):
    correct = 0
    total = 0
    model.eval()
    with torch.no_grad():
        for x, y in loader:
            x,y = x.to(device), y.to(device)
            scores = model(x)
            _, predictions = scores.max(1)
            correct += (predictions == y).sum().item()
            total += predictions.size(0)
    model.train()
    return correct/total

In [24]:
print(f"Training Accuracy is {calculate_accuracy(train_dataloader, res_net)*100}")

print(f"Testing Accuracy is {calculate_accuracy(test_dataloader, res_net)*100}")

Training Accuracy is 95.03311258278146
Testing Accuracy is 86.79245283018868
