In [None]:
# Mount Google Drive so files stored there are reachable under /content/gdrive
# (this cell only works inside a Google Colab runtime).
from google.colab import drive
drive.mount("/content/gdrive")

In [None]:
# Extract the dataset archive from the mounted Drive into ./dataset (-q keeps unzip quiet).
!unzip -q /content/gdrive/MyDrive/Диплом/Dataset.zip -d dataset

In [None]:
import torch
import os
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random
import copy

from matplotlib import colors, pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.preprocessing import LabelEncoder
from PIL import Image
from torchvision import models
from tqdm import tqdm_notebook
from matplotlib.patches import Rectangle
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
# Seed Python's, PyTorch's and NumPy's random generators so repeated runs
# start from the same random state.
random.seed(0)
torch.manual_seed(0)
np.random.seed(0)

In [None]:
# Locations inside the extracted archive: images live in <main>/image/<class>,
# bounding-box annotation files in <main>/label.
main_path = "dataset/Dataset"
images_path = os.path.join(main_path, "image")
labels_path = os.path.join(main_path, "label")

# Class folder names; the position in this list is used as the class index.
images = ['apple', 'bottle', 'yogurt', 'empty']
# One annotation file per class that has boxes (the last class has none).
bboxes = [f"{class_name}_bbox.txt" for class_name in images[:3]]

In [None]:
# Display the list of annotation file names as the cell output.
bboxes

In [None]:
def upload_bbox(l_path:str, item):
  """Read a bounding-box annotation file into a list of float lists.

  Each line of the file is split on whitespace and every token is parsed
  as a float.

  Args:
    l_path: Directory that contains the annotation file.
    item: Name of the annotation file inside ``l_path``.

  Returns:
    A list with one list of floats per line of the file, in file order
    (a blank line yields an empty list).
  """
  path = os.path.join(l_path,item)
  with open(path,"r") as f:
    # str.split() already ignores the trailing newline. Slicing off the last
    # character instead would truncate the final number whenever the file
    # does not end with a newline.
    return [list(map(float, line.split())) for line in f]

def upload_image(im_path, item):
    """Build the list of image paths for one class folder.

    The folder is only used to count its entries; the returned paths are
    generated as ``1.jpg`` .. ``N.jpg`` where N is that count.

    Args:
        im_path: Directory that contains the class folders.
        item: Name of the class folder.

    Returns:
        List of N paths ``<im_path>/<item>/<k>.jpg`` for k = 1..N.
    """
    print(f"upload: {item}")
    folder = os.path.join(im_path, item)
    entry_count = len(os.listdir(folder))
    paths = []
    for number in range(1, entry_count + 1):
        paths.append(os.path.join(folder, f"{number}.jpg"))
    return paths

In [None]:
# Sanity check for the first class: the annotation file must contain exactly
# one box per image in the class folder.
a = upload_bbox(labels_path,bboxes[0])
b = upload_image(images_path,images[0])
assert len(a) == len(b)

In [None]:
def normalize(image: Image, mean: list, std : list):
    """Convert an image to a tensor and standardize it channel-wise.

    Args:
        image: Image accepted by ``transforms.ToTensor``.
        mean: Per-channel means passed to ``transforms.Normalize``.
        std: Per-channel standard deviations passed to ``transforms.Normalize``.

    Returns:
        The normalized image tensor.
    """
    as_tensor = transforms.ToTensor()(image)
    return transforms.Normalize(mean, std)(as_tensor)

def create_dataset(im_list, mean, std):
    """Load every image of every class into memory as training samples.

    Class folders are looked up under the module-level ``images_path``; the
    position of a class in ``im_list`` is its class index. Classes whose index
    has an entry in the module-level ``bboxes`` list read their boxes from
    that annotation file (line ``i`` belongs to ``<i+1>.jpg``). Classes beyond
    that list have no annotation file and get an all-zero box.

    Args:
        im_list: Class folder names, ordered by class index.
        mean: Per-channel means used for normalization.
        std: Per-channel standard deviations used for normalization.

    Returns:
        List of ``(image_tensor, class_index, bbox_tensor)`` tuples.

    Raises:
        ValueError: If an annotation file has fewer boxes than the class
            folder has entries.
    """
    new_dataset = []
    for index, item in enumerate(im_list):
        path = os.path.join(images_path,item)
        if index >= len(bboxes):
            # No annotation file for this class, so file numbering does not
            # matter: take whatever is in the folder. Sorting keeps the sample
            # order (and therefore the later random split) reproducible.
            for file_name in sorted(os.listdir(path)):
                # Open the file being iterated. The previous code opened a
                # stale `image_name` left over from the preceding class.
                with Image.open(os.path.join(path, file_name)) as picture:
                    image = normalize(picture, mean, std)
                new_dataset.append((image, index, torch.zeros(4)))
        else:
            bbox = upload_bbox(labels_path,bboxes[index])
            image_count = len(os.listdir(path))
            if len(bbox) < image_count:
                raise ValueError(
                    f"{bboxes[index]} has {len(bbox)} boxes but {path} "
                    f"contains {image_count} files"
                )
            for i in range(image_count):
                image_name = os.path.join(path,f"{str(i+1)}.jpg")
                # The context manager closes the file handle once the pixels
                # have been converted to a tensor.
                with Image.open(image_name) as picture:
                    image = normalize(picture, mean, std)
                t_bbox = torch.Tensor(bbox[i])
                new_dataset.append((image,index,t_bbox))
    return new_dataset


In [None]:
# Build the full in-memory dataset. The mean/std values are the channel
# statistics commonly used with ImageNet-pretrained torchvision models.
data = create_dataset(images,[0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

In [None]:
# Show the Python types of the three fields of the first sample.
for image, label, bbox in data[:1]:
  print(type(image), type(label), type(bbox), sep="\n")

In [None]:
from sklearn.model_selection import train_test_split

# Shuffle the samples and hold out 35% of them; random_state fixes the split.
train_dataset, val_test_dataset = train_test_split(
    data,
    test_size=0.35,
    shuffle=True,
    random_state=42,
)
print(f"Размер тренировочного датасета {len(train_dataset)} изображений")
print(f"Размер валидационного датасета {len(val_test_dataset)} изображений")

In [None]:
class ForDataLoader():
    """Map-style wrapper that exposes a sequence of samples to a DataLoader.

    Each stored sample is indexed as ``(image, class_label, bbox)``.
    """

    def __init__(self,dataset):
        # Keep a reference to the sequence; nothing is copied.
        self.data = dataset

    def __len__(self):
        """Return the number of stored samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the first three fields of sample ``idx`` as a tuple."""
        sample = self.data[idx]
        return sample[0], sample[1], sample[2]

In [None]:
# Wrap both splits so DataLoader can index them.
k1 = ForDataLoader(train_dataset)
k2 = ForDataLoader(val_test_dataset)

# Training batches are reshuffled every epoch; validation keeps a fixed order.
train_loader = DataLoader(k1, shuffle=True, batch_size=16)
val_loader = DataLoader(k2, batch_size=8)

In [None]:
# Print the tensor shapes of the first training batch, then stop iterating.
for image, y, bbox in train_loader:
  print(image.shape, y.shape, bbox.shape, sep="\n")
  break

In [None]:
class BB_model(nn.Module):
    """ResNet-34 feature extractor with a class head and a bounding-box head.

    ``forward`` returns a pair ``(class_scores, bbox)``; both heads are a
    BatchNorm1d + Linear stack producing 4 values per sample from the pooled
    512-dimensional feature vector.
    """

    def __init__(self):
        super(BB_model, self).__init__()
        resnet = models.resnet34(pretrained=True)
        # Keep only the first eight child modules of the pretrained network
        # and split them into two sequential stages.
        layers = list(resnet.children())[:8]
        self.features1 = nn.Sequential(*layers[:6])
        self.features2 = nn.Sequential(*layers[6:])
        # NOTE(review): the class head ends in Sigmoid. If it is trained with
        # F.cross_entropy (which applies log-softmax itself), the Sigmoid only
        # squashes the logits - confirm whether it is intended.
        self.classifier = nn.Sequential(nn.BatchNorm1d(512), nn.Linear(512, 4),nn.Sigmoid())
        self.bb = nn.Sequential(nn.BatchNorm1d(512), nn.Linear(512,4))

    def forward(self, x):
        """Return ``(class_scores, bbox)`` for a batch of images ``x``.

        Outputs live on whatever device the module and its input are on; the
        module no longer moves activations to a notebook-global ``device``,
        which failed whenever that global disagreed with the model's device.
        """
        x = self.features1(x)
        x = self.features2(x)
        x = F.relu(x)
        # Functional pooling avoids constructing a new module on every call.
        x = F.adaptive_avg_pool2d(x, (1, 1))
        # Flatten (batch, channels, 1, 1) to (batch, channels) for the heads.
        x = x.view(x.shape[0], -1)
        return self.classifier(x), self.bb(x)
# Instantiate the network once (this loads the pretrained ResNet-34 weights).
# NOTE(review): `model` is assigned again in a later cell before training.
model = BB_model()

In [None]:
#from torchsummary import summary
#summary(model.cuda(), (3, 256, 256))

In [None]:
def update_optimizer(optimizer, lr):
    """Set the learning rate of every parameter group of ``optimizer`` to ``lr``.

    The optimizer is modified in place; nothing is returned.
    """
    for group in optimizer.param_groups:
        group["lr"] = lr

def show_graph(hist):
    """Plot loss and accuracy curves from a training history.

    Args:
        hist: Iterable of ``(train_loss, val_loss, train_acc, val_acc)``
            tuples, one per epoch.

    Draws two stacked panels sharing the x axis: losses on top, accuracies
    below, then shows the figure.
    """
    series = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}
    for train_loss, val_loss, train_acc, val_acc in hist:
        epoch_values = (train_loss, val_loss, train_acc, val_acc)
        for key, value in zip(series, epoch_values):
            series[key].append(value)

    plt.figure(figsize=(16,16))
    loss_ax = plt.subplot2grid((2,1), (0,0))
    acc_ax = plt.subplot2grid((2,1), (1,0), sharex=loss_ax)

    panels = (
        (loss_ax, ("train_loss", "val_loss")),
        (acc_ax, ("train_acc", "val_acc")),
    )
    for axis, keys in panels:
        for key in keys:
            axis.plot(series[key], label=key)
        axis.legend(loc=2)
    plt.show()

# Create the model on the device selected at the top of the notebook instead
# of calling .cuda() unconditionally, so the cell also runs on CPU-only runtimes.
model = BB_model().to(device)
# Only parameters that require gradients are handed to the optimizer.
parameters = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(parameters, lr=0.003)

In [None]:
def save_model(model,acc,path):
    """Save the model's ``state_dict`` to ``<path>/<acc>.pth``.

    Args:
        model: Object exposing ``state_dict()``.
        acc: Value used (via ``str``) as the checkpoint file name.
        path: Target directory; created, including missing parents, if needed.
    """
    # makedirs with exist_ok handles nested paths and avoids the
    # check-then-create race of os.path.exists + os.mkdir.
    os.makedirs(path, exist_ok=True)
    torch.save(model.state_dict(),os.path.join(path,str(acc)+".pth"))
    

def val_metrics(model, class_loss, bbox_loss, valid_dl, C=1000, const = -1,flag = False):
    """Evaluate ``model`` on a validation loader.

    The per-batch loss is ``class_loss(sum) + bbox_loss(sum) / C``.

    Args:
        model: Network returning ``(class_scores, bbox)`` for a batch.
        class_loss: Functional loss called as
            ``class_loss(out, target, reduction="sum")``.
        bbox_loss: Functional loss called as
            ``bbox_loss(out, target, reduction="none")``.
        valid_dl: Iterable of ``(x, y_class, y_bb)`` batches.
        C: Divisor applied to the summed box loss.
        const: Accuracy threshold for saving a checkpoint.
        flag: When true, the model is saved to ``saved_models`` if the
            validation accuracy is at least ``const``.

    Returns:
        ``(mean_loss_per_sample, accuracy)``.

    Raises:
        ValueError: If ``valid_dl`` yields no samples.
    """
    model.eval()
    # Run on the device the model actually lives on, not on a notebook global
    # that other cells rebind.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    total, sum_loss, correct = 0, 0, 0
    with torch.no_grad():
        for x, y_class, y_bb in valid_dl:
            batch = y_class.shape[0]
            x = x.to(model_device).float()
            y_class = y_class.to(model_device)
            y_bb = y_bb.to(model_device).float()

            out_class, out_bb = model(x)
            loss_class = class_loss(out_class, y_class, reduction="sum")
            # Sum the element-wise box loss over coordinates, then over the batch.
            loss_bb = bbox_loss(out_bb, y_bb, reduction="none").sum(1).sum()
            loss = loss_class + loss_bb/C

            _, pred = torch.max(out_class, 1)
            correct += pred.eq(y_class).sum().item()
            sum_loss += loss.item()
            total += batch

    if total == 0:
        raise ValueError("valid_dl produced no samples; cannot compute validation metrics")

    val_correct = correct/total
    # `const` is a plain argument, so the best-so-far value has to be tracked
    # by the caller and passed in on each call.
    if flag and val_correct >= const:
        save_model(model,val_correct,path = "saved_models")
    return sum_loss/total, val_correct

def train_epocs(model, optimizer,class_loss, bbox_loss, train_dl, val_dl, epochs=10,C=1000,flag = False):
    """Train ``model`` for a number of epochs and validate after each one.

    The per-batch loss is ``class_loss(sum) + bbox_loss(sum) / C``.

    Args:
        model: Network returning ``(class_scores, bbox)`` for a batch.
        optimizer: Optimizer already bound to the model's parameters.
        class_loss: Functional loss called as
            ``class_loss(out, target, reduction="sum")``.
        bbox_loss: Functional loss called as
            ``bbox_loss(out, target, reduction="none")``.
        train_dl: Iterable of ``(x, y_class, y_bb)`` training batches.
        val_dl: Iterable of validation batches, passed to ``val_metrics``.
        epochs: Number of passes over ``train_dl``.
        C: Divisor applied to the summed box loss.
        flag: Forwarded to ``val_metrics``; enables checkpoint saving.

    Returns:
        List with one ``(train_loss, val_loss, train_acc, val_acc)`` tuple
        per epoch.

    Raises:
        ValueError: If ``train_dl`` yields no samples.
    """
    hist = []
    # Best validation accuracy seen so far; passed to val_metrics as the
    # checkpoint threshold so that only non-worse models are saved.
    best_val_acc = -1
    # Run on the device the model actually lives on, not on a notebook global
    # that other cells rebind.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    for i in range(epochs):
        model.train()
        total, sum_loss, correct = 0, 0, 0
        for x, y_class, y_bb in train_dl:
            batch = y_class.shape[0]
            x = x.to(model_device).float()
            y_class = y_class.to(model_device)
            y_bb = y_bb.to(model_device).float()
            out_class, out_bb = model(x)

            # The classification loss, the combined loss and the predictions
            # were never computed here before, so the loop raised NameError.
            loss_class = class_loss(out_class, y_class, reduction="sum")
            loss_bb = bbox_loss(out_bb, y_bb, reduction="none").sum(1)
            loss_bb = loss_bb.sum()
            loss = loss_class + loss_bb/C

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            _, pred = torch.max(out_class, 1)
            total += batch
            sum_loss += loss.item()
            correct += pred.eq(y_class).sum().item()

        if total == 0:
            raise ValueError("train_dl produced no samples; nothing to train on")

        train_loss = sum_loss/total
        train_acc = correct/total
        val_loss, val_acc = val_metrics(model,class_loss, bbox_loss, val_dl, C, const=best_val_acc, flag=flag)
        best_val_acc = max(best_val_acc, val_acc)
        hist.append((train_loss,val_loss,train_acc,val_acc))
        print("train_loss %.3f train_acc %.3f val_loss %.3f val_acc %.3f" % (train_loss, train_acc, val_loss, val_acc))
    return hist

In [None]:
# Stage 1: cross-entropy for the class head, L1 for the box head.
# The losses must be the functional forms because train_epocs / val_metrics
# call them with an explicit `reduction=` keyword. The previous lines used
# `==` (a comparison, not an assignment) with nn.MSELoss()/nn.L1Loss()
# instances, which left both names undefined.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class_loss = F.cross_entropy
bbox_loss = F.l1_loss
history = train_epocs(model.to(device), optimizer, class_loss, bbox_loss,
                      train_loader, val_loader, C=250, epochs=10)
show_graph(history)

In [None]:
# Stage 2: lower the learning rate and switch the box loss to MSE.
update_optimizer(optimizer, 0.0003)
bbox_loss = F.mse_loss
# Keep `device` a torch.device with a CPU fallback instead of the hard-coded
# string "cuda", which fails on runtimes without a GPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
history = train_epocs(model.to(device), optimizer, class_loss, bbox_loss,
                      train_loader, val_loader, C=100, epochs=100)
show_graph(history)

In [None]:
# Stage 3: same learning rate, larger weight on the box loss (C=10), and
# checkpoint saving enabled through flag=True.
update_optimizer(optimizer, 0.0003)
#bbox_loss = F.l1_loss
# model.to(device) instead of model.cuda() so the cell follows the notebook's
# `device` setting rather than requiring a GPU.
history = train_epocs(model.to(device), optimizer,class_loss,
                      bbox_loss, train_loader, val_loader,
                      flag = True,C=10,epochs=10)
show_graph(history)

In [None]:
# Stage 4: fine-tune with a much smaller learning rate, still saving checkpoints.
update_optimizer(optimizer, 0.00001)
#bbox_loss = F.l1_loss
# model.to(device) instead of model.cuda() so the cell follows the notebook's
# `device` setting rather than requiring a GPU.
history = train_epocs(model.to(device), optimizer,class_loss,
                      bbox_loss, train_loader, val_loader,
                      flag = True,C=10,epochs=25)

In [None]:
def transform_image(image:torch.Tensor, mean :list, std:list):
  """Undo channel-wise normalization and convert the tensor to an RGB PIL image.

  Args:
    image: Normalized image tensor with 3 channels in the first dimension.
    mean: Per-channel means that were subtracted during normalization.
    std: Per-channel standard deviations that were divided by.

  Returns:
    The de-normalized image as a PIL image in RGB mode.
  """
  # Reshape the statistics to (3, 1, 1) so they broadcast over height and width.
  channel_std = torch.tensor(std).view(3, 1, 1)
  channel_mean = torch.tensor(mean).view(3, 1, 1)
  restored = image * channel_std
  restored = restored + channel_mean
  to_pil = transforms.ToPILImage(mode='RGB')
  return to_pil(restored)

In [None]:
def show_bbox(bbox_array,color = "red"):
    """Build an unfilled matplotlib Rectangle for a ``[x0, y0, x1, y1]`` box.

    The first two values anchor the rectangle; width and height are the
    differences ``x1 - x0`` and ``y1 - y0``.
    """
    anchor = bbox_array[0:2]
    box_width = bbox_array[2] - bbox_array[0]
    box_height = bbox_array[3] - bbox_array[1]
    return Rectangle(anchor, width=box_width, height=box_height,
                     color=color, fill=False, lw=3)

In [None]:
def IOU(boxA, boxB):
    """Intersection-over-union of two ``[x0, y0, x1, y1]`` boxes.

    Side lengths are computed as ``max - min + 1`` (both end coordinates are
    counted), and a non-overlapping pair yields an intersection of 0.
    """
    def _area(box):
        # Box area using the same "+ 1" side-length convention.
        return (box[2] - box[0] + 1) * (box[3] - box[1] + 1)

    # Corners of the overlap rectangle.
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[2], boxB[2])
    bottom = min(boxA[3], boxB[3])

    overlap_w = max(0, right - left + 1)
    overlap_h = max(0, bottom - top + 1)
    interArea = overlap_w * overlap_h

    union = _area(boxA) + _area(boxB) - interArea
    return interArea / float(union)

In [None]:
def show_predict(model,loader):
  """Plot a 3x3 grid of random samples with the predicted box and class.

  Args:
    model: Network returning ``(class_scores, bbox)`` for a batch.
    loader: DataLoader whose ``dataset`` yields ``(image, label, bbox)``.

  Each panel shows the de-normalized image, the predicted box in red, and a
  title with the IoU against the ground-truth box and the predicted class.
  """
  # One label per class index. The model has four outputs, so the fourth
  # ("empty") class needs a label too, otherwise predicting it raises IndexError.
  predict_label = ["Яблоко", "Вода","Йогурт","Пусто"]
  num_image = 9
  dataset = loader.dataset

  # Feed inputs on whatever device the model lives on.
  first_param = next(model.parameters(), None)
  model_device = first_param.device if first_param is not None else torch.device("cpu")

  # BatchNorm cannot run in training mode on a single-sample batch, so switch
  # to eval mode for inference and restore the previous mode afterwards.
  was_training = model.training
  model.eval()

  fig = plt.figure(figsize=(16,16))
  # Sample indices from the dataset that is actually being shown.
  random_index = np.random.randint(0, len(dataset), size=num_image)
  for i in range(num_image):
    # part of predict neural network
    a = fig.add_subplot(3, 3, i + 1)
    image, label, bbox1 = dataset[random_index[i]]
    with torch.no_grad():
      pred, bbox2 = model(image.unsqueeze(0).to(model_device))
    predict = int(pred.argmax(1))
    predicted_box = bbox2[0].cpu().numpy()
    true_box = np.asarray(bbox1)
    # part of visualization
    image_for_graph = transform_image(image,[0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    a.add_patch(show_bbox(predicted_box))
    a.imshow(image_for_graph)
    # Round to two decimals: rounding to an integer displayed only 0 or 1.
    iou = float(IOU(true_box, predicted_box))
    a.set_title(f"IOU: {np.around(iou, 2)}\nPredict :{predict_label[predict]}")

  model.train(was_training)

    

In [None]:
# Move the model to the CPU and visualize predictions on validation samples.
# NOTE(review): this rebinds the notebook-global `device` to the string "cpu".
device = "cpu"
show_predict(model.to(device),val_loader)

In [None]:
# Persist the final weights (state_dict only) to the working directory.
torch.save(model.state_dict(),"last1.pth")

In [None]:
# List the working directory (IPython automagic for the shell `ls` command).
ls
