In [None]:
import os
import pandas as pd
import numpy
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import imagesize
from torchvision import datasets, models, transforms
import seaborn as sns
from PIL import Image
from PIL.ImageFile import ImageFile
import torch
from torch import nn, optim
from torch.optim import lr_scheduler
from torch.profiler import profile, record_function, ProfilerActivity
import optuna
from efficientnet_pytorch import EfficientNet

In [None]:
# Run configuration: everything a reader might want to tune lives here.

# Let PIL load image files that are truncated instead of raising on them.
ImageFile.LOAD_TRUNCATED_IMAGES = True

# The batch size is not fixed here; it is sampled per trial inside objective().
n_epochs = 15             # training epochs per Optuna trial
num_trials = 70           # number of Optuna trials to run
validation_split = .25    # fraction of the kept images held out for validation
images_for_train = 30000  # number of shuffled images kept before the train/validation split
print_every = 100         # step interval of the (currently commented-out) progress print in train_model

# Output directory for the checkpoint, the best-hyperparameter report and the plots.
name_training = "efficient3_hyperparameters_wi"
# exist_ok=True keeps the cell re-runnable: os.mkdir would raise FileExistsError
# as soon as the directory exists from a previous run.
os.makedirs(name_training, exist_ok=True)

In [None]:
# Train on GPU index 3 when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda:3" if torch.cuda.is_available() else "cpu")

print(device)
# Only ask for the GPU name when a CUDA device was actually selected: querying
# device 3 on a CPU-only machine raises instead of printing anything.
if device.type == "cuda":
    print(torch.cuda.get_device_name(device))

In [None]:
data_path = "data"

# Sub-folders of data_path that are indexed; the folder name doubles as the label.
class_names = ('glas', 'organic', 'paper', 'restmuell', 'wertstoff')

image = []
labels = []
width = []
height = []

# Iterate over the directory listing (not over class_names) so that rows keep
# the order in which os.listdir() returns the class folders; any other entry
# in data_path is ignored.
for file in os.listdir(data_path):
    if file not in class_names:
        continue
    class_dir = os.path.join(data_path, file)
    for c in os.listdir(class_dir):
        w, h = imagesize.get(os.path.join(class_dir, c))
        image.append(c)
        labels.append(file)
        width.append(w)
        height.append(h)

# One row per image file: file name, class label and pixel dimensions.
data = pd.DataFrame({'Images': image, 'labels': labels, 'width': width, 'height': height})
# Integer class ids derived from the label strings.
lb = LabelEncoder()
data['encoded_labels'] = lb.fit_transform(data['labels'])

In [None]:
# Shuffle every row, then keep at most images_for_train of them.
# NOTE(review): neither the shuffle nor the split is seeded, so every run
# produces a different train/validation partition.
data = data.sample(frac=1).reset_index(drop=True)
data = data.head(images_for_train)
# Stratify on the label column so both splits keep the class proportions.
tr, val = train_test_split(data, stratify=data.labels, test_size=validation_split)
# reset_index returns a new frame, so the result has to be assigned back;
# calling it as a bare statement leaves the shuffled index in place.
tr = tr.reset_index(drop=True)
val = val.reset_index(drop=True)
print(val)

In [None]:
# Distinct (encoded_labels, labels) pairs of the training split, ordered by the encoded id.
waste_types_df = (
    tr[['encoded_labels', 'labels']]
    .drop_duplicates()
    .sort_values(by='encoded_labels')
    .reset_index(drop=True)
)
# Lookup from row position in waste_types_df to the label string.
garbage_types = dict(enumerate(waste_types_df['labels']))
print(garbage_types)

In [None]:
# Show the image table as this cell's output.
data

In [None]:
from torchvision import transforms

def _base_steps():
    """Return fresh resize / to-tensor / normalize steps used by both pipelines."""
    return [
        # Bicubic resize to a fixed 256x256 input.
        transforms.Resize((256, 256), interpolation=transforms.InterpolationMode.BICUBIC),
        transforms.ToTensor(),
        # Per-channel mean and std; img_display() inverts exactly these values.
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
    ]


# Training pipeline: random horizontal flip followed by the shared steps.
# Rotation, center-crop and 300x300 resize variants are currently disabled.
transform_tr = transforms.Compose([transforms.RandomHorizontalFlip(), *_base_steps()])

# Validation pipeline: the shared steps only, without augmentation.
transform_val = transforms.Compose(_base_steps())

In [None]:
from torch.utils.data import Dataset

class GarbageDataset(Dataset):
    """Dataset that loads images listed in a DataFrame from class sub-folders.

    Args:
        img_data: DataFrame with one row per image. The columns read are
            ``labels`` (sub-folder name), ``Images`` (file name) and
            ``encoded_labels`` (value turned into the label tensor).
        img_path: Root directory that contains the class sub-folders.
        transform: Optional callable applied to the loaded RGB image.
    """

    def __init__(self, img_data, img_path, transform=None):
        self.img_path = img_path
        self.transform = transform
        self.img_data = img_data

    def __len__(self):
        """Return the number of rows in ``img_data``."""
        return len(self.img_data)

    def __getitem__(self, index):
        """Return ``(image, label, img_name)`` for the row at position ``index``.

        ``image`` is the RGB image after ``transform`` (if one was given),
        ``label`` a tensor built from ``encoded_labels`` and ``img_name`` the
        path the image was read from.
        """
        # Positional lookup, done once instead of once per column.
        row = self.img_data.iloc[index]
        img_name = os.path.join(self.img_path, row["labels"], row["Images"])
        # convert() returns an image that no longer depends on the open file,
        # so the handle can be released right away instead of waiting for
        # garbage collection.
        with Image.open(img_name) as img_file:
            image = img_file.convert('RGB')
        label = torch.tensor(row["encoded_labels"])
        if self.transform is not None:
            image = self.transform(image)
        return image, label, img_name

In [None]:
from torch.utils.data import DataLoader

# Training split with augmentation.
train_dataset = GarbageDataset(img_data=tr, img_path=data_path, transform=transform_tr)
# Held-out validation split (named "test" throughout the notebook), no augmentation.
test_dataset = GarbageDataset(img_data=val, img_path=data_path, transform=transform_val)


In [None]:
import numpy as np

def img_display(img):
    """Undo the normalization of an image tensor and return it channels-last.

    Args:
        img: Image tensor with the three channels on the first axis,
            normalized with the mean/std used by the transform pipelines.
            It may require grad or live on a GPU.

    Returns:
        NumPy array with the axes reordered from (0, 1, 2) to (1, 2, 0),
        holding ``img * std + mean`` per channel.
    """
    MEAN = torch.tensor([0.485, 0.456, 0.406])
    STD = torch.tensor([0.229, 0.224, 0.225])

    # .numpy() only works on CPU tensors that do not require grad, and MEAN/STD
    # live on the CPU, so bring the input there first.
    img = img.detach().cpu()
    # Broadcast the per-channel statistics over the two spatial axes.
    img = img * STD[:, None, None] + MEAN[:, None, None]
    return np.transpose(img.numpy(), (1, 2, 0))


In [None]:
def accuracy(out, labels):
    """Return how many rows of ``out`` have their maximum at the index in ``labels``.

    Despite the name this is the count of correct predictions as a Python
    number, not a ratio.
    """
    predicted = torch.max(out, dim=1).indices
    hits = predicted == labels
    return hits.sum().item()

In [None]:
def get_model(model_name, num_classes=None):
    """Build a pretrained backbone whose output layer matches the class count.

    Args:
        model_name: One of "resnet18", "resnet34", "resnet50", "alexnet",
            "vgg16" or "efficientnet_b3".
        num_classes: Number of output classes. Defaults to the number of rows
            in the notebook-level ``waste_types_df``.

    Returns:
        The model with its classification head replaced.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    if num_classes is None:
        num_classes = len(waste_types_df)

    if model_name in ("resnet18", "resnet34", "resnet50"):
        # All ResNet variants expose the head as `fc`.
        model = getattr(models, model_name)(pretrained=True)
        model.fc = nn.Linear(model.fc.in_features, num_classes)
    elif model_name == "alexnet":
        model = models.alexnet(pretrained=True)
        # The whole classifier stack is replaced by one linear layer sized
        # from the input width of classifier[1].
        model.classifier = nn.Linear(model.classifier[1].in_features, num_classes)
    elif model_name == "vgg16":
        model = models.vgg16(pretrained=True)
        # Same approach as for alexnet, sized from classifier[0].
        model.classifier = nn.Linear(model.classifier[0].in_features, num_classes)
    elif model_name == "efficientnet_b3":
        model = EfficientNet.from_pretrained("efficientnet-b3", num_classes=num_classes)
    else:
        # Without this branch an unknown name falls through to `return model`
        # and surfaces as an UnboundLocalError.
        raise ValueError(f"Unsupported model_name: {model_name!r}")
    return model


def objective(trial):
    """Optuna objective: train EfficientNet-B3 with sampled hyperparameters.

    Samples the learning rate, the optimizer class, the batch size and, for
    SGD only, the momentum, then trains for ``n_epochs`` epochs.

    Args:
        trial: The Optuna trial used to sample values and report progress.

    Returns:
        The validation loss returned by ``train_model``; the study minimizes it.
    """
    # Hyperparameters to optimize. suggest_float(..., log=True) samples the
    # same log-uniform range as the deprecated suggest_loguniform.
    params = {
        "lr": trial.suggest_float('lr', 1e-4, 1e-2, log=True),
        "optimizer_name": trial.suggest_categorical('optimizer_name', ["SGD", "Adam", "Adagrad", "RMSprop"]),
        "batch_size": trial.suggest_categorical("batch_size", [16, 32, 64, 128]),
    }

    # Pretrained model with a head sized for the waste classes.
    model = get_model("efficientnet_b3").to(device)

    criterion = nn.CrossEntropyLoss()

    # Collect the optimizer arguments first so the optimizer is built exactly
    # once; momentum is only sampled (and passed) for SGD.
    optimizer_kwargs = {"lr": params["lr"]}
    if params["optimizer_name"] == "SGD":
        params["momentum"] = trial.suggest_float('momentum', 0.85, 0.95, log=True)
        optimizer_kwargs["momentum"] = params["momentum"]
    optimizer_cls = getattr(torch.optim, params["optimizer_name"])
    optimizer = optimizer_cls(model.parameters(), **optimizer_kwargs)

    # The trained model itself is not needed here, only its loss.
    _, best_loss = train_model(trial, model, criterion, optimizer, num_epochs=n_epochs,
                               b_size=params["batch_size"])

    # Objective value of the current trial (a loss, hence direction='minimize').
    return best_loss

In [None]:
def train_model(trial, net, criterion, optimizer, num_epochs, b_size, step_size=7, gamma=0.1):
    """Train ``net`` and track the epoch with the lowest validation loss.

    Every epoch runs one pass over ``train_dataset`` and one over
    ``test_dataset``, steps a StepLR scheduler, reports the best validation
    loss so far to ``trial`` and lets Optuna prune the trial.

    Args:
        trial: Optuna trial used for ``report`` / ``should_prune``.
        net: Model to train; expected to already be on ``device``.
        criterion: Loss called as ``criterion(outputs, targets)``.
        optimizer: Optimizer over the parameters of ``net``.
        num_epochs: Number of epochs to run.
        b_size: Batch size of both data loaders.
        step_size: StepLR period in epochs.
        gamma: StepLR decay factor.

    Returns:
        ``(net, best_loss)``: ``net`` with the weights of its best epoch
        loaded, and that epoch's mean validation loss per batch.
        ``best_loss`` is ``inf`` if no epoch produced a comparable loss
        (for example when the loss is NaN from the first epoch on).

    Raises:
        optuna.exceptions.TrialPruned: If the pruner stops the trial.
    """
    import copy  # local import: used only to snapshot the best weights

    train_dataloader = DataLoader(train_dataset, batch_size=b_size, shuffle=True, num_workers=7, pin_memory=True)
    # Validation metrics do not depend on sample order, so there is nothing to shuffle.
    test_dataloader = DataLoader(test_dataset, batch_size=b_size, shuffle=False, num_workers=7, pin_memory=True)
    exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)

    # Defined up front so they exist even if no epoch ever improves.
    best_loss = float('inf')
    best_state = None

    for epoch in range(1, num_epochs + 1):
        print(f'Epoch {epoch}\n')

        # ---- training pass ----
        net.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for data_, target_, _ in train_dataloader:
            data_ = data_.to(device, non_blocking=True)
            target_ = target_.long().to(device, non_blocking=True)

            # Drop the gradients of every parameter of the model.
            net.zero_grad(set_to_none=True)
            outputs = net(data_)
            loss = criterion(outputs, target_)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, pred = torch.max(outputs, dim=1)
            correct += torch.sum(pred == target_).item()
            total += target_.size(0)

        # Metrics of this epoch only, not an average over all epochs so far.
        epoch_train_loss = running_loss / len(train_dataloader)
        epoch_train_acc = 100 * correct / total
        print(f'\ntrain loss: {epoch_train_loss:.4f}, train acc: {epoch_train_acc:.4f}')

        # ---- validation pass ----
        net.eval()
        batch_loss = 0.0
        total_t = 0
        correct_t = 0
        with torch.no_grad():
            for data_t, target_t, _ in test_dataloader:
                data_t = data_t.to(device)
                target_t = target_t.long().to(device)
                outputs_t = net(data_t)
                batch_loss += criterion(outputs_t, target_t).item()
                _, pred_t = torch.max(outputs_t, dim=1)
                correct_t += torch.sum(pred_t == target_t).item()
                total_t += target_t.size(0)

        epoch_val_loss = batch_loss / len(test_dataloader)
        epoch_val_acc = 100 * correct_t / total_t
        print(f'validation loss: {epoch_val_loss:.4f}, validation acc: {epoch_val_acc:.4f}\n')

        # Keep the best weights. A NaN loss never compares as smaller, so it
        # cannot overwrite an earlier good epoch.
        if epoch_val_loss < best_loss:
            best_loss = epoch_val_loss
            # Snapshot a copy: the live state_dict keeps changing as training continues.
            best_state = copy.deepcopy(net.state_dict())
            # NOTE(review): every trial writes to this same file, so it ends up
            # holding the last improvement of the last trial, not of the best trial.
            torch.save(net.state_dict(), name_training + '/resnet.pt')
            print('Detected network improvement, saving current model')

        exp_lr_scheduler.step()

        trial.report(best_loss, epoch)

        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    # Hand back the best epoch's weights rather than whatever the last epoch left.
    if best_state is not None:
        net.load_state_dict(best_state)
    return net, best_loss

In [None]:
# TPE sampler proposes the hyperparameters; the median pruner is configured
# with 2 start-up trials, 5 warm-up steps and a check interval of 3 steps.
sampler = optuna.samplers.TPESampler()
pruner = optuna.pruners.MedianPruner(n_startup_trials=2, n_warmup_steps=5, interval_steps=3)

# The objective returns a validation loss, so the study minimizes it.
study = optuna.create_study(direction='minimize', sampler=sampler, pruner=pruner)
study.optimize(objective, n_trials=num_trials)

In [None]:
# Best trial of the finished study.
trial = study.best_trial

print('Loss: {}'.format(trial.value))
print("Best hyperparameters: {}".format(trial.params))
# The context manager closes the file even if the write fails.
with open(name_training + "/best_hyperparameters.txt", "w") as text_file:
    text_file.write(f'Loss: {trial.value}\nBest hyperparams: {trial.params}')

In [None]:
import matplotlib.pyplot as plt
# Hyperparameter importances, exported as PNG and PDF.
# plt.savefig writes pyplot's current figure, which is expected to be the one
# the plot call just drew on.
fig = optuna.visualization.matplotlib.plot_param_importances(study)
for file_name, file_format in (('/param_importances.png', 'png'), ('/param_importances.pdf', 'pdf')):
    plt.savefig(name_training + file_name, format=file_format, bbox_inches='tight')


In [None]:
# Optimization history of the study, exported as PNG and PDF.
# plt.savefig writes pyplot's current figure, which is expected to be the one
# the plot call just drew on.
fig = optuna.visualization.matplotlib.plot_optimization_history(study)
for file_name, file_format in (('/optim_history.png', 'png'), ('/optim_history.pdf', 'pdf')):
    plt.savefig(name_training + file_name, format=file_format, bbox_inches='tight')

In [None]:
# Intermediate values reported by each trial, exported as PNG and PDF.
# plt.savefig writes pyplot's current figure, which is expected to be the one
# the plot call just drew on.
fig = optuna.visualization.matplotlib.plot_intermediate_values(study)
for file_name, file_format in (('/intermediate_values.png', 'png'), ('/intermediate_values.pdf', 'pdf')):
    plt.savefig(name_training + file_name, format=file_format, bbox_inches='tight')

In [None]:
# Slice plot of the sampled hyperparameters, exported as PNG and PDF.
# plt.savefig writes pyplot's current figure, which is expected to be the one
# the plot call just drew on.
fig = optuna.visualization.matplotlib.plot_slice(study)
for file_name, file_format in (('/plot_slice.png', 'png'), ('/plot_slice.pdf', 'pdf')):
    plt.savefig(name_training + file_name, format=file_format, bbox_inches='tight')

In [None]:
# Contour plot of the hyperparameter interactions, exported as PNG and PDF.
# plt.savefig writes pyplot's current figure, which is expected to be the one
# the plot call just drew on.
fig = optuna.visualization.matplotlib.plot_contour(study)
for file_name, file_format in (('/plot_contour.png', 'png'), ('/plot_contour.pdf', 'pdf')):
    plt.savefig(name_training + file_name, format=file_format, bbox_inches='tight')