In [1]:
import os, time
import numpy as np
import random
random.seed(42)
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, classification_report

import torch
torch.manual_seed(42)
from torch import nn
from torch.optim import SGD, Adam
from torch.utils.data import DataLoader, RandomSampler
from torch.utils.data.dataset import Dataset
from torchvision.models import resnet
from torchvision import transforms, datasets, models
from torch.optim.lr_scheduler import ReduceLROnPlateau

print('PyTorch %s %s' % (torch.__version__, torch.cuda.get_device_properties(0) if torch.cuda.is_available() else 'CPU'))

PyTorch 1.11.0 _CudaDeviceProperties(name='NVIDIA GeForce RTX 3080', major=8, minor=6, total_memory=10239MB, multi_processor_count=68)


In [2]:
#https://www.kaggle.com/code/halfendt/dog-breed-classifier-pytorch-resnet-152
def load_transform_images(images_path, presplit, train_split, test_split, val_split, batch_size, threads, mean, std):
    train_transform = transforms.Compose([
                                         #transforms.RandomRotation(degrees=15),
                                         #transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
                                         #transforms.RandomResizedCrop((224,224)),
                                         transforms.Resize((224,224)),
                                         transforms.RandomHorizontalFlip(),
                                         transforms.ToTensor(),
                                         transforms.Normalize(torch.Tensor(mean),
                                                              torch.Tensor(std))])

    test_transform = transforms.Compose([
                                        transforms.Resize((224,224)),
                                        #transforms.CenterCrop((224,224)),
                                        transforms.ToTensor(),
                                        transforms.Normalize(torch.Tensor(mean),
                                                             torch.Tensor(std))])

    val_transform = transforms.Compose([
                                       transforms.Resize((224,224)),
                                       #transforms.CenterCrop((224,224)),
                                       transforms.ToTensor(),
                                       transforms.Normalize(torch.Tensor(mean),
                                                            torch.Tensor(std))])

    if presplit:
        try:
            training_set = datasets.ImageFolder(root=images_path+'/train', transform=train_transform)
            validation_set = datasets.ImageFolder(root=images_path+'/val', transform=val_transform)
        except FileNotFoundError:
            raise Exception('Not presplit into Training and Validation sets')
        try:
            testing_set = datasets.ImageFolder(root=images_path+'/test', transform=test_transform)
        except:
            testing_set = validation_set
        dataset = training_set
    else:
        dataset = datasets.ImageFolder(root=images_path, transform=train_transform)
        train_size = int(train_split * len(dataset))
        test_size = int(test_split * len(dataset))
        val_size = len(dataset) - train_size - test_size
        training_set, testing_set, validation_set = torch.utils.data.random_split(dataset, [train_size, test_size, val_size])

    training_set_loader = DataLoader(training_set, batch_size=batch_size, num_workers=threads, shuffle=True)
    validation_set_loader = DataLoader(validation_set, batch_size=batch_size, num_workers=threads, shuffle=True)
    testing_set_loader = DataLoader(testing_set, batch_size=batch_size, num_workers=threads, shuffle=False)

    return training_set_loader, testing_set_loader, validation_set_loader, dataset, training_set, testing_set, validation_set

In [3]:
images_path = './data/images/Images/'
results_path = images_path+'_results'
presplit = False
train_split = 0.7
val_split = 0.2
test_split = 0.1
batch_size = 32
threads = 0
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

training_set_loader, testing_set_loader, validation_set_loader, dataset, training_set, testing_set, validation_set = \
                  load_transform_images(images_path, presplit, train_split, test_split, val_split, batch_size, threads, mean, std)

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from collections import OrderedDict
from torch.nn import init
import math

def conv_bn(inp, oup, stride):
	return nn.Sequential(
		nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
		nn.BatchNorm2d(oup),
		nn.ReLU(inplace=True)
	)


def conv_1x1_bn(inp, oup):
	return nn.Sequential(
		nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
		nn.BatchNorm2d(oup),
		nn.ReLU(inplace=True)
	)

def channel_shuffle(x, groups):
	batchsize, num_channels, height, width = x.data.size()

	channels_per_group = num_channels // groups

	# reshape
	x = x.view(batchsize, groups,
		channels_per_group, height, width)

	x = torch.transpose(x, 1, 2).contiguous()

	# flatten
	x = x.view(batchsize, -1, height, width)

	return x

class InvertedResidual(nn.Module):
	def __init__(self, inp, oup, stride, benchmodel):
		super(InvertedResidual, self).__init__()
		self.benchmodel = benchmodel
		self.stride = stride
		assert stride in [1, 2]

		oup_inc = oup//2

		if self.benchmodel == 1:
			#assert inp == oup_inc
			self.banch2 = nn.Sequential(
				# pw
				nn.Conv2d(oup_inc, oup_inc, 1, 1, 0, bias=False),
				nn.BatchNorm2d(oup_inc),
				nn.ReLU(inplace=True),
				# dw
				nn.Conv2d(oup_inc, oup_inc, 3, stride, 1, groups=oup_inc, bias=False),
				nn.BatchNorm2d(oup_inc),
				# pw-linear
				nn.Conv2d(oup_inc, oup_inc, 1, 1, 0, bias=False),
				nn.BatchNorm2d(oup_inc),
				nn.ReLU(inplace=True),
			)
		else:
			self.banch1 = nn.Sequential(
				# dw
				nn.Conv2d(inp, inp, 3, stride, 1, groups=inp, bias=False),
				nn.BatchNorm2d(inp),
				# pw-linear
				nn.Conv2d(inp, oup_inc, 1, 1, 0, bias=False),
				nn.BatchNorm2d(oup_inc),
				nn.ReLU(inplace=True),
			)

			self.banch2 = nn.Sequential(
				# pw
				nn.Conv2d(inp, oup_inc, 1, 1, 0, bias=False),
				nn.BatchNorm2d(oup_inc),
				nn.ReLU(inplace=True),
				# dw
				nn.Conv2d(oup_inc, oup_inc, 3, stride, 1, groups=oup_inc, bias=False),
				nn.BatchNorm2d(oup_inc),
				# pw-linear
				nn.Conv2d(oup_inc, oup_inc, 1, 1, 0, bias=False),
				nn.BatchNorm2d(oup_inc),
				nn.ReLU(inplace=True),
			)

	@staticmethod
	def _concat(x, out):
		# concatenate along channel axis
		return torch.cat((x, out), 1)

	def forward(self, x):
		if 1==self.benchmodel:
			x1 = x[:, :(x.shape[1]//2), :, :]
			x2 = x[:, (x.shape[1]//2):, :, :]
			out = self._concat(x1, self.banch2(x2))
		elif 2==self.benchmodel:
			out = self._concat(self.banch1(x), self.banch2(x))

		return channel_shuffle(out, 2)


class ShuffleNetV2(nn.Module):
	def __init__(self, n_class=120, input_size=224, width_mult=1.):
		super(ShuffleNetV2, self).__init__()

		assert input_size % 32 == 0

		self.stage_repeats = [4, 8, 4]
		# index 0 is invalid and should never be called.
		# only used for indexing convenience.
		if width_mult == 0.5:
			self.stage_out_channels = [-1, 24,  48,  96, 192, 1024]
		elif width_mult == 1.0:
			self.stage_out_channels = [-1, 24, 116, 232, 464, 1024]
		elif width_mult == 1.5:
			self.stage_out_channels = [-1, 24, 176, 352, 704, 1024]
		elif width_mult == 2.0:
			self.stage_out_channels = [-1, 24, 224, 488, 976, 2048]
		else:
			raise ValueError(
				"""{} groups is not supported for
					   1x1 Grouped Convolutions""".format(3))

		# building first layer
		input_channel = self.stage_out_channels[1]
		self.conv1 = conv_bn(3, input_channel, 2)
		self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
		self.features = []
		# building inverted residual blocks
		for idxstage in range(len(self.stage_repeats)):
			numrepeat = self.stage_repeats[idxstage]
			output_channel = self.stage_out_channels[idxstage+2]
			for i in range(numrepeat):
				if i == 0:
				#inp, oup, stride, benchmodel):
					self.features.append(InvertedResidual(input_channel, output_channel, 2, 2))
				else:
					self.features.append(InvertedResidual(input_channel, output_channel, 1, 1))
				input_channel = output_channel


		# make it nn.Sequential
		self.features = nn.Sequential(*self.features)

		# building last several layers
		self.conv_last      = conv_1x1_bn(input_channel, self.stage_out_channels[-1])
		self.globalpool = nn.Sequential(nn.AvgPool2d(int(input_size/32)))

	# building classifier
		self.classifier = nn.Sequential(nn.Linear(self.stage_out_channels[-1], n_class))


	def forward(self, x):
		x = self.conv1(x)
		x = self.maxpool(x)
		x = self.features(x)
		x = self.conv_last(x)
		x = self.globalpool(x)
		x = x.view(-1, self.stage_out_channels[-1])
		x = self.classifier(x)
		return x

def shufflenetv2(width_mult=1.):
	model = ShuffleNetV2(width_mult=width_mult)
	return model

In [5]:
def validate():
	correct = 0
	total = 0
	model.eval()
	with torch.no_grad():
		for imgs, labels in validation_set_loader:
			imgs = imgs.to(device)
			labels = labels.to(device)
			batch_size=imgs.shape[0]
			outputs = model(imgs)
			_, predicted = torch.max(outputs, dim=1)
			total += labels.shape[0]
			correct += int((predicted==labels).sum())
		print("Accuracy ", correct/total)

In [6]:
import time
def training(model, optimizer, loss_fn, n_epochs, device, l2_lambda, train_loader):
	start = time.time()

	for epoch in range(n_epochs):
		if epoch == 75:
			learning_rate = 1e-5
			validate()
			model.train()
		elif epoch == 100:
			learning_rate = 1e-6
			validate()
			model.train()
		elif epoch == 10:
			learning_rate = 1e-4
			validate()
			model.train()
		elif epoch == 50:
			learning_rate = 1e-4
			validate()
			model.train()
		for imgs, labels in train_loader:
			imgs = imgs.to(device)
			labels = labels.to(device)
			batch_size = imgs.shape[0]
			outputs = model(imgs)
			loss = loss_fn((outputs), labels)
			l2_norm = sum(p.pow(2.0).sum() for p in model.parameters())
			loss = loss + l2_lambda* l2_norm

			optimizer.zero_grad()
			loss.backward()
			optimizer.step()
		print("Epoch: %d, Loss: %f" % (epoch, float(loss)))
	end = time.time()
	print(end - start)

In [7]:
device = torch.device('cuda:0')
model = shufflenetv2(2.0).to(device)
loss_function = nn.CrossEntropyLoss()
learning_rate = 1e-4
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
n_epochs = 150
l2_lambda = .001

In [None]:
training(model, optimizer, loss_function, n_epochs, device, l2_lambda, training_set_loader)

In [None]:
torch.save(model.state_dict(), "./models/150epochStandardAdaptiveLRWide.pt")

In [None]:
validate()

In [None]:
from ptflops import get_model_complexity_info


macs, params = get_model_complexity_info(model, ( 3, 224,224), as_strings=True, print_per_layer_stat=False, verbose=False)
print('{:<30}  {:<8}'.format('Computational complexity: ', macs))
print('{:<30}  {:<8}'.format('Number of parameters: ', params))