In [1]:
# This serves as a template which will guide you through the implementation of this task.  It is advised
# to first read the whole template and get a sense of the overall structure of the code before trying to fill in any of the TODO gaps
# First, we import necessary libraries:
import numpy as np
from torchvision import transforms
from torch.utils.data import DataLoader, TensorDataset
import os
import torch
from torchvision import transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torch.optim as optim



device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

def generate_embeddings():
    """
    Transform, resize and normalize the images and then use a pretrained model to extract 
    the embeddings.
    """
    # TODO: define a transform to pre-process the images
    #1st resize it to a gerneral size, then converts the image into a PyTorch tensor and finally normalize it
    train_transforms = transforms.Compose([transforms.Resize((224,224)),
                                          transforms.ToTensor(),
                                          transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]) 

    train_dataset = datasets.ImageFolder(root="dataset/", transform=train_transforms)
    # Hint: adjust batch_size and num_workers to your PC configuration, so that you don't run out of memory
    train_loader = DataLoader(dataset=train_dataset,
                              batch_size=64, #change: ?
                              shuffle=False,
                              pin_memory=True, num_workers=8) #change:was 16

    # TODO: define a model for extraction of the embeddings (Hint: load a pretrained model,
    #  more info here: https://pytorch.org/vision/stable/models.html)
    model = models.resnet50(pretrained=True)
    
    embeddings = []
    embedding_size = 2048
    
    num_images = len(train_dataset)
    embeddings = np.zeros((num_images, embedding_size))
    # TODO: Use the model to extract the embeddings. Hint: remove the last layers of the model to access the embeddings the model generates. 
    model = nn.Sequential(*list(model.children())[:-1])  # remove the last layer
    model.eval()
    
    with torch.no_grad(): #disable gradiant computation to speed up computation
        batch_size = train_loader.batch_size
        for i, (image, _) in enumerate(train_loader):
            print('batch size:', batch_size)
            this_batch_size = image.shape[0]
            embeddings[i*batch_size:(i*batch_size)+this_batch_size] = model(image).detach().numpy().reshape(this_batch_size,embedding_size)#gibt mir ein vektor von embeddings von unserem bild welcher horizontal eingefügt wird
    
    print("embeddings done")
    np.save('/Users/ewern/workspace/IML/iml_wpower/eva/Task 3/dataset/embeddings.npy', embeddings)

def get_data(file, train=True):
    """
    Load the triplets from the file and generate the features and labels.

    input: file: string, the path to the file containing the triplets
          train: boolean, whether the data is for training or testing

    output: X: numpy array, the features
            y: numpy array, the labels
    """
    triplets = []  #here we have a triplets list with triplets
    with open(file) as f:
        for line in f:
            triplets.append(line)

    # generate training data from triplets
    train_dataset = datasets.ImageFolder(root="/Users/ewern/workspace/IML/iml_wpower/eva/Task 3/dataset/",
                                         transform=None)
    filenames = [s[0].split('/')[-1].replace('.jpg', '') for s in train_dataset.samples]
    filenames = [s.replace('food\\','') for s in filenames]
    embeddings = np.load('/Users/ewern/workspace/IML/iml_wpower/eva/Task 3/dataset/embeddings.npy')
    # TODO: Normalize the embeddings across the dataset
    embeddings /= np.linalg.norm(embeddings, axis=1, keepdims=True)

    file_to_embedding = {}
    for i in range(len(filenames)):
        file_to_embedding[filenames[i]] = embeddings[i]
    X = []
    y = []
    # use the individual embeddings to generate the features and labels for triplets
    for t in triplets:  #wir gehen durch jedes triplet
        emb = [file_to_embedding[a] for a in t.split()]
        X.append(np.hstack([emb[0], emb[1], emb[2]])) #stacks arrays in one array so all our embeddings are now in one array
        y.append(1)
        # Generating negative samples (data augmentation)
        if train:
            X.append(np.hstack([emb[0], emb[2], emb[1]]))
            y.append(-1)
    X = np.vstack(X)
    y = np.hstack(y)
    return X, y

# Hint: adjust batch_size and num_workers to your PC configuration, so that you don't run out of memory
def create_loader_from_np(X, y = None, train = True, batch_size=32, shuffle=True, num_workers = 4):#change of b_s 64
    """
    Create a torch.utils.data.DataLoader object from numpy arrays containing the data.
    input: X: numpy array, the features
           y: numpy array, the labels
    output: loader: torch.data.util.DataLoader, the object containing the data
    """
    if train:
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float), 
                                torch.from_numpy(y).type(torch.long))
    else:
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float))
    loader = DataLoader(dataset=dataset,
                        batch_size=batch_size,
                        shuffle=shuffle,
                        pin_memory=True, num_workers=num_workers)
    return loader

# TODO: define a model. Here, the basic structure is defined, but you need to fill in the details
class Net(nn.Module):
    """
    The model class, which defines our classifier.
    """
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(6144, 2048) #input: 3x2048
        self.bn1 = nn.BatchNorm1d(2048)
        self.fc2 = nn.Linear(2048, 512)
        #self.bn2 = nn.BatchNorm1d(512)
        self.fc3 = nn.Linear(512, 128)
        self.out = nn.Linear(128, 1) #output:1
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.bn1(x)
        x = F.relu(self.fc2(x))
        #x = self.bn2(x)
        x = F.relu(self.fc3(x))
        x = self.dropout(x)
        #x = torch.sigmoid(self.out(x))
        x = torch.tanh(self.out(x))
        return x

def train_model(train_loader):
    """
    The training procedure of the model; it accepts the training data, defines the model 
    and then trains it.
    input: train_loader: torch.data.util.DataLoader, the object containing the training data
    output: model: torch.nn.Module, the trained model
    """
    model = Net()
    model.train()
    model.to(device)
    n_epochs = 15
    # TODO: define a loss function, optimizer and proceed with training.  After each epoch, compute the loss on the 
    # validation split and print it out. This enables you to see how your model is performing 
    # on the validation data before submitting the results on the server. After choosing the 
    # best model, train it on the whole training data.
    #as seen in the demo
    #criterion = nn.CrossEntropyLoss() not working with nn output ==1
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters())
    
    print('starting model training')
    #"""
    num_samples = len(train_loader.dataset)
    num_train_samples = int(num_samples * 0.8)
    num_val_samples = num_samples - num_train_samples
    train_set, val_set = torch.utils.data.random_split(train_loader.dataset, [num_train_samples, num_val_samples])
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=train_loader.batch_size)
    val_loader = torch.utils.data.DataLoader(val_set, batch_size=train_loader.batch_size)

    best_val_loss = float('inf')
    best_model = model
    
    for epoch in range(n_epochs):
        train_loss = 0
        for [X, y] in train_loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            y_pred = model(X).squeeze()
            loss = criterion(y_pred.float(), y.float())
            train_loss += loss.item()
            loss.backward()
            optimizer.step()

        train_loss /= len(train_loader)
        
        val_loss = 0
        model.eval()
        with torch.no_grad():
            for [X, y] in val_loader:
                X, y = X.to(device), y.to(device)
                y_pred = model(X).squeeze()
                loss = criterion(y_pred.float(), y.float())
                val_loss += loss.item()

        val_loss /= len(val_loader)
        print(f'Epoch [{epoch+1}/{n_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model = model

    # Train the best model on the whole training set
    best_model.train()
    for [X, y] in train_loader:
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        y_pred = best_model(X).squeeze()
        loss = criterion(y_pred.float(), y.float())
        loss.backward()
        optimizer.step()
    
    """
    running_loss = 0
    for epoch in range(n_epochs):        
        for i,[X, y] in enumerate(train_loader):
            optimizer.zero_grad()
            y_pred = torch.squeeze(model(X))
            loss = criterion(y_pred.float(), y.float())
            loss.backward()
            optimizer.step() 
            running_loss += loss.item()
            if i % 200 == 199:
                print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 200:.3f}')
                running_loss = 0.0
    """
    return model

def test_model(model, loader):
    """
    The testing procedure of the model; it accepts the testing data and the trained model and 
    then tests the model on it.

    input: model: torch.nn.Module, the trained model
           loader: torch.data.util.DataLoader, the object containing the testing data
        
    output: None, the function saves the predictions to a results.txt file
    """
    model.eval()
    predictions = []
    # Iterate over the test data
    with torch.no_grad(): # We don't need to compute gradients for testing
        for [x_batch] in loader:
            x_batch= x_batch.to(device)
            predicted = model(x_batch)
            predicted = predicted.cpu().numpy()
            print(predicted)
            # Rounding the predictions to 0 or 1
            predicted[predicted >= 0.5] = 1
            predicted[predicted < 0.5] = 0
            
            predictions.append(predicted)
        predictions = np.vstack(predictions)
    np.savetxt("results.txt", predictions, fmt='%i')


# Main function. You don't have to change this
if __name__ == '__main__':
    TRAIN_TRIPLETS = 'train_triplets.txt'
    TEST_TRIPLETS = 'test_triplets.txt'
    """
    # generate embedding for each image in the dataset
    if(os.path.exists('dataset/embeddings.npy') == False):
        generate_embeddings()
    """
    # load the training and testing data
    X, y = get_data(TRAIN_TRIPLETS)
    X_test, _ = get_data(TEST_TRIPLETS, train=False)

    # Create data loaders for the training and testing data
    train_loader = create_loader_from_np(X, y, train = True, batch_size=64)
    test_loader = create_loader_from_np(X_test, train = False, batch_size=2048, shuffle=False)

    # define a model and train it
    model = train_model(train_loader)
    
    # test the model on the test data
    test_model(model, test_loader)
    print("Results saved to results.txt")


FileNotFoundError: [Errno 2] No such file or directory: '/Users/ewern/workspace/IML/iml_wpower/eva/Task 3/dataset/embeddings.npy'