In [None]:
from google.colab import drive
drive.mount('/content/drive')

#Architecture

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
import torch.optim as optim
import torchvision.models as models

#output - vector of vectors, labels - vactor
def accuracy(outputs, labels):
    return torch.tensor(torch.sum(outputs.argmax(1) == labels.argmax(1)).item() / len(labels))

#implement Base for Classification
class ImageClassificationBase(nn.Module):
    def training_step(self, batch):
        #print(batch.shape)
        #print()
        #batch - (img, label)
        self.train()
        images, labels = batch 
        out = self(images)                  # Generate predictions
        loss = F.cross_entropy(out, labels) # Calculate loss
        return loss
    
    def validation_step(self, batch):
        #batch - (img, label)
        self.eval() 
        images, labels = batch 
        out = self(images)                    # Generate predictions
        loss = F.cross_entropy(out, labels)   # Calculate loss
        acc = accuracy(out, labels)           # Calculate accuracy
        return {'val_loss': loss.detach(), 'val_acc': acc}
        

    def validation_epoch_end(self, outputs):
        #outputs - list of dicts of validations 
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()   # Combine losses
        batch_accs = [x['val_acc'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()      # Combine accuracies
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}
    
    def epoch_end(self, epoch, result):
        print(f"Epoch {epoch+1}: train_loss: {result['train_loss']}, val_loss: {result['val_loss']}, val_acc: {result['val_acc']}")

class ImageClassificationInception(nn.Module):
    def training_step(self, batch):
        #print(batch.shape)
        #print()
        #batch - (img, label)
        self.train() 
        images, labels = batch 
        outputs = self(images) 
        loss1 = F.cross_entropy(outputs, labels)
        loss = loss1
        return loss
    
    def validation_step(self, batch):
        #batch - (img, label)
        self.eval() 
        images, labels = batch 
        outputs = self(images) 
        loss1 = F.cross_entropy(outputs, labels)
        loss = loss1
        acc = accuracy(outputs, labels)           # Calculate accuracy
        return {'val_loss': loss.detach(), 'val_acc': acc}
        

    def validation_epoch_end(self, outputs):
        #outputs - list of dicts of validations 
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()   # Combine losses
        batch_accs = [x['val_acc'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()      # Combine accuracies
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}
    
    def epoch_end(self, epoch, result):
        print(f"Epoch {epoch+1}: train_loss: {result['train_loss']}, val_loss: {result['val_loss']}, val_acc: {result['val_acc']}")

In [2]:
classDecode = {1:"paper/cardboard", 2: "metal", 3: "plastic", 4: "glass"}
class ResNet(ImageClassificationBase):
    def __init__(self):
        super().__init__()
        # Use a pretrained model
        self.network = models.resnet50(pretrained=True)
        # Replace last layer
        num_ftrs = self.network.fc.in_features
        self.network.fc = nn.Linear(num_ftrs, len(classDecode.values()))
    
    def forward(self, xb):
        o = self.network(xb)
        return F.softmax(o)

class DenseNet(ImageClassificationBase):
    def __init__(self):
        super().__init__()
        self.network = models.densenet121(pretrained=True)
        self.network.classifier = nn.Linear(1024, len(classDecode.values()))
    
    def forward(self, xb):
        o = self.network(xb)
        return F.softmax(o)

class InceptionNet(ImageClassificationInception):
    def __init__(self):
        super().__init__()
        self.network = models.inception_v3(pretrained=True)
        self.network.aux_logits=False
        num_ftrs = self.network.AuxLogits.fc.in_features
        self.network.AuxLogits.fc = nn.Linear(num_ftrs, len(classDecode.values()))
        num_ftrs = self.network.fc.in_features
        self.network.fc = nn.Linear(num_ftrs,len(classDecode.values()))
    
    def forward(self, xb):
        o = self.network(xb)
        return F.softmax(o)


class VGGNet(ImageClassificationBase):
    def __init__(self):
        super().__init__()
        # Use a pretrained model
        self.network = models.vgg16(pretrained=True)
        # Replace last layer
        self.network.classifier[6] = torch.nn.Linear(4096, len(classDecode.values()))
    
    def forward(self, xb):
        o = self.network(xb)
        return F.softmax(o)

class Net(ImageClassificationBase):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, padding=(1,1))
        self.conv2 = nn.Conv2d(16, 16, 3, padding=(1,1))
        self.conv3 = nn.Conv2d(16, 32, 3, padding=(1,1))
        self.conv4 = nn.Conv2d(32, 32, 3, padding=1)
        self.conv5 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv6 = nn.Conv2d(64, 32, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.2)
        self.fc1 = nn.Linear(32 * 4 * 4 , 128)
        self.fc3 = nn.Linear(128, 6)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = self.pool(F.relu(self.conv4(x)))
        x = self.pool(F.relu(self.conv5(x)))
        x = self.pool(F.relu(self.conv6(x)))
        #print(x.shape)
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc3(x)
        return F.softmax(x)


# Model loading

In [3]:
def get_default_device():
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')

def to_device(data, device):
    if isinstance(data, (list,tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)


In [4]:
import cv2
import os
import shutil
import pandas as pd
import os.path
import time

def read_and_resize(filename, grayscale = False, fx= 1, fy=1):
    #read file
    if grayscale:
        img_result = cv2.imread(filename, cv2.IMREAD_GRAYSCALE)
    else:
        imgbgr = cv2.imread(filename, cv2.IMREAD_COLOR)
        img_result = cv2.cvtColor(imgbgr, cv2.COLOR_BGR2RGB)
    img_result = cv2.resize(img_result, None, fx=fx, fy=fy, interpolation = cv2.INTER_CUBIC)
    return img_result

def getImageFromFolderAndMove(pathToImage, pathNewFolderImages):
    #get image and move produced image to another folder
    out = []
    if not os.path.isdir(pathNewFolderImages):
        os.mkdir(pathNewFolderImages)
    if len(os.listdir(pathToImage)) == 0:
        print("Folder toSort is empty!")
    for filename in os.listdir(pathToImage):
        imgPath = (pathToImage+"/").replace("//", "/")+filename
        out = read_and_resize(imgPath)
        shutil.move(imgPath, (pathNewFolderImages+"/").replace("//", "/")+filename)
        return [out, filename]

def resizeImage(img, size):
    #reshape to the shape of (size, size)
    return cv2.resize(img, (256,256), interpolation = cv2.INTER_AREA).astype("float32")/255

def convertImg(img, device):
    #prepare image to pass it to the model
    return torch.tensor([img.swapaxes(2,1).swapaxes(1,0)]).to(device)

def passToModel(model, image):
    return model(image).argmax(1)

def loadModel(pathTo, device):
    model = to_device(DenseNet(), device)
    model.load_state_dict(torch.load(pathTo, map_location=device))
    return model

def logInfo(pathToCsv, imgName, predict, predictClass):
    if not os.path.isfile(pathToCsv): 
        df = pd.DataFrame({'name': [],
                   'class': [],
                   'classname': []})
    else:
        df = pd.read_csv(pathToCsv)
    new_row = pd.DataFrame({'name': [imgName],
                   'class': [predict],
                   'classname': [predictClass]})
    df = df.append(new_row, ignore_index=True)
    df.to_csv(pathToCsv, index=False)


def getResizePass(pathToModel, pathToImage, pathNewFolderImages, csvFilePath, classDecode):
    device = get_default_device()
    model = loadModel(pathToModel, device)
    model.eval()
    img, filename = getImageFromFolderAndMove(pathToImage, pathNewFolderImages)
    resImg = convertImg(resizeImage(img, 256), device)

    out = passToModel(model, resImg)
    cls = classDecode[int(out[0]+1)]
    numCls = int(out[0]+1)
    log = logInfo(csvFilePath, filename, numCls, cls)
    print(f"Model predicted class {numCls}, which is a {cls}")
    
def main():
    googlePath = "./drive/MyDrive/"
    localPath = "./processing/"
    path = googlePath
    pathToModel = path + "model/denceNetModel.pt"  # pretrained model. It takes RGB images with size 256x256
    imgPath = path + "toSort"  # folder with all images to sort
    imgNewPath = path + "sorted"  # folder with all sorted images
    csvPath = path + "logGarbage.csv"  # file for the logging
    classDecoder = {1: "paper/cardboard", 2: "metal", 3: "plastic", 4: "glass"}  # all classes

    start = time.time()
    getResizePass(pathToModel, imgPath, imgNewPath, csvPath, classDecoder)
    end = time.time()
    print(f"Time of execution is {end - start} seconds!")

In [5]:
main()



FileNotFoundError: [Errno 2] No such file or directory: './drive/MyDrive/model/denceNetModel.pt'