In [None]:
import pandas as pd
import numpy as np
import os, sys
import tqdm
import matplotlib.pyplot as plt

# Label table for the training images. The path is relative, so it is resolved
# against the kernel's working directory. ImageDataset (below) expects a
# 'filename' column and treats every other column as a label.
trainLabelPath = '../../hfactory_magic_folders/colas_data_challenge/computer_vision_challenge/dataset/labels_train.csv'
trainLabels = pd.read_csv(trainLabelPath)

In [None]:
# Class names used when printing predictions: the columns at positions 1..5.
# NOTE(review): presumably column 0 is 'filename' and there are exactly five
# label columns -- TODO confirm against the CSV header.
categories = trainLabels.columns[1:6]

In [None]:
# Directory read by ImageDataset.__getitem__. The trailing slash is kept so that
# plain string concatenation with a file name still yields a valid path.
imageTrainPath = '../../hfactory_magic_folders/colas_data_challenge/computer_vision_challenge/dataset/train/'

In [None]:
! pip install -r ../../colasproject/modelResNet/requirements.txt

In [None]:
import torch
import cv2
import torchvision.transforms as transforms
from torch.utils.data import Dataset
from keras.preprocessing import image
from torchvision.transforms.autoaugment import AutoAugmentPolicy

class ImageDataset(Dataset):
    """Labelled images split by row position into train / validation / test.

    The rows of ``csv`` are used in file order (no shuffling):

    * train      -- the first ``int(TRAIN_FRACTION * len(csv))`` rows,
    * test       -- the last ``TEST_HOLDOUT`` rows (never more than the rows
                    left over after the training split),
    * validation -- everything in between.

    Each item is a dict with the keys ``'image'`` (float32 tensor produced by
    the transform pipeline), ``'label'`` (float32 tensor built from every
    column of ``csv`` except ``'filename'``) and ``'filename'`` (str).

    Args:
        csv: table with a ``'filename'`` column plus the label columns.
        train: select the training split (takes precedence over ``test``).
        test: select the held-out test split when ``train`` is false.
        image_dir: directory containing the images; when omitted the
            module-level ``imageTrainPath`` is used.
    """

    TRAIN_FRACTION = 0.85
    TEST_HOLDOUT = 10
    IMAGE_SIZE = (512, 512)

    def __init__(self, csv: pd.DataFrame, train: bool, test: bool, image_dir: str = None) -> None:
        self.csv = csv
        self.train = train
        self.test = test
        self.image_dir = image_dir
        self.all_image_names = self.csv['filename']
        self.all_labels = np.array(self.csv.drop(['filename'], axis=1))
        # Kept for backward compatibility. valid_ratio counts every row outside
        # the training split, i.e. validation *and* held-out test rows.
        self.train_ratio = int(self.TRAIN_FRACTION * len(self.csv))
        self.valid_ratio = len(self.csv) - self.train_ratio

        # One slice drives both names and labels so they can never disagree.
        rows = self._split_slice(len(self.csv), train, test)
        self.image_names = list(self.all_image_names.iloc[rows])
        self.labels = list(self.all_labels[rows])

        if train:
            print(f"Number of training images: {len(self.image_names)}")
        elif not test:
            print(f"Number of validation images: {len(self.image_names)}")

        # Every split uses the same deterministic preprocessing.
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize(self.IMAGE_SIZE),
            transforms.ToTensor(),
        ])

    @classmethod
    def _split_slice(cls, n_rows: int, train: bool, test: bool) -> slice:
        """Return the positional row range of the requested split."""
        n_train = int(cls.TRAIN_FRACTION * n_rows)
        # Never let the test hold-out reach back into the training rows.
        n_test = min(cls.TEST_HOLDOUT, n_rows - n_train)
        if train:
            return slice(0, n_train)
        if test:
            return slice(n_rows - n_test, n_rows)
        return slice(n_train, n_rows - n_test)

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, index):
        name = self.image_names[index]
        base_dir = imageTrainPath if self.image_dir is None else self.image_dir
        path = os.path.join(base_dir, name)
        image = cv2.imread(path)
        # cv2.imread signals failure by returning None instead of raising.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {path}")
        # convert the image from BGR to RGB color format
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = self.transform(image)

        return {
            # as_tensor avoids the copy-construct warning torch.tensor() emits
            # when handed an existing tensor.
            'image': torch.as_tensor(image, dtype=torch.float32),
            'label': torch.tensor(self.labels[index], dtype=torch.float32),
            'filename': str(name)
        }
    
class TestDataset(Dataset):
    """Unlabelled images read from a single directory.

    Each item is a dict with the keys ``'image'`` (float32 tensor produced by
    the transform pipeline) and ``'filename'`` (str, the bare file name).

    Args:
        path: directory whose regular files are treated as images.
    """

    IMAGE_SIZE = (512, 512)

    def __init__(self, path: str) -> None:
        self.path = path
        self.fileName = self._list_files(path)
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize(self.IMAGE_SIZE),
            transforms.ToTensor(),
        ])

    @staticmethod
    def _list_files(path: str) -> list:
        """Return the regular files directly inside ``path``, sorted by name.

        os.listdir gives no ordering guarantee, so sorting keeps the output
        row order reproducible; sub-directories are skipped because they
        cannot be decoded as images.
        """
        return sorted(
            entry for entry in os.listdir(path)
            if os.path.isfile(os.path.join(path, entry))
        )

    def __len__(self):
        return len(self.fileName)

    def __getitem__(self, index):
        name = self.fileName[index]
        full_path = os.path.join(self.path, name)
        image = cv2.imread(full_path)
        # cv2.imread signals failure by returning None instead of raising.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {full_path}")
        # convert the image from BGR to RGB color format
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = self.transform(image)

        return {
            # as_tensor avoids the copy-construct warning torch.tensor() emits
            # when handed an existing tensor.
            'image': torch.as_tensor(image, dtype=torch.float32),
            'filename': str(name)
        }


In [None]:
from torchvision import models as models
import torch.nn as nn
from torch.nn import functional as F
from torch.autograd import Variable

def model(pretrained, requires_grad, num_classes=5):
    """Build a ResNet-50 with a fresh linear classification head.

    Args:
        pretrained: forwarded unchanged to ``models.resnet50``.
        requires_grad: truthy to train the backbone, falsy to freeze it.
        num_classes: number of output logits of the new head (default 5).

    Returns:
        The network. The replaced ``fc`` layer is always trainable, whatever
        ``requires_grad`` says about the backbone.
    """
    net = models.resnet50(progress=True, pretrained=pretrained)
    # freeze or unfreeze the hidden layers in one pass
    backbone_trainable = bool(requires_grad)
    for param in net.parameters():
        param.requires_grad = backbone_trainable
    # Replace the classification layer *after* the loop so the new head keeps
    # the default requires_grad=True; read the input width from the existing
    # layer instead of hard-coding it.
    net.fc = nn.Linear(net.fc.in_features, num_classes)
    return net


In [None]:
import torch
from tqdm import tqdm
from torchmetrics import F1Score
from torchvision.ops import sigmoid_focal_loss

# The notebook only defines `device` in a later cell, so resolve the target
# device here; otherwise this cell raises NameError on a fresh kernel.
f1 = F1Score(num_classes=2, multiclass=True, average='weighted', threshold=0.3).to(
    torch.device('cuda' if torch.cuda.is_available() else 'cpu')
)

# training function
def train(model, dataloader, optimizer, criterion, trainData, device) -> tuple:
    """Run one training epoch and return ``(mean_loss, mean_f1)``.

    Args:
        model: network to optimise; switched to training mode.
        dataloader: iterable of batches, each a dict with the keys
            ``'image'`` and ``'label'``.
        optimizer: optimizer stepped once per batch.
        criterion: loss applied to the *sigmoid* of the model output, so it
            must expect probabilities rather than logits.
        trainData: unused; kept so existing calls keep working.
        device: device the batch tensors are moved to.

    Returns:
        Tuple ``(loss, f1)``: both are averages over batches. The per-batch
        F1 is itself the mean of the module-level ``f1`` metric evaluated
        sample by sample.

    Raises:
        ValueError: if ``dataloader`` yields no batch.
    """
    print('Training')
    model.train()
    n_batches = 0
    running_loss = 0.0
    running_f1 = 0.0
    # tqdm derives the total from len(dataloader) when it is available, which
    # also counts the final partial batch.
    for batch in tqdm(dataloader):
        n_batches += 1
        images, target = batch['image'].to(device), batch['label'].to(device)
        optimizer.zero_grad()
        # apply sigmoid activation to get all the outputs between 0 and 1
        outputs = torch.sigmoid(model(images))
        loss = criterion(outputs, target)
        running_loss += loss.item()

        # The metric is monitoring only: keep it out of the autograd graph.
        preds = outputs.detach()
        batch_f1 = sum(
            f1(preds[j], target[j].to(int)).item()
            for j in range(preds.shape[0])
        )
        running_f1 += batch_f1 / preds.shape[0]

        # backpropagation
        loss.backward()
        # update optimizer parameters
        optimizer.step()

    if n_batches == 0:
        raise ValueError("dataloader yielded no batches; nothing to train on")
    return running_loss / n_batches, running_f1 / n_batches

# validation function
def validate(model, dataloader, criterion, valData, device) -> tuple:
    """Evaluate the model over one pass of ``dataloader``.

    Args:
        model: network to evaluate; switched to evaluation mode.
        dataloader: iterable of batches, each a dict with the keys
            ``'image'`` and ``'label'``.
        criterion: loss applied to the *sigmoid* of the model output, so it
            must expect probabilities rather than logits.
        valData: unused; kept so existing calls keep working.
        device: device the batch tensors are moved to.

    Returns:
        Tuple ``(loss, f1)``: both are averages over batches. The per-batch
        F1 is itself the mean of the module-level ``f1`` metric evaluated
        sample by sample.

    Raises:
        ValueError: if ``dataloader`` yields no batch.
    """
    print('Validating')
    model.eval()
    n_batches = 0
    running_loss = 0.0
    running_f1 = 0.0
    with torch.no_grad():
        # tqdm derives the total from len(dataloader) when it is available,
        # which also counts the final partial batch.
        for batch in tqdm(dataloader):
            n_batches += 1
            images, target = batch['image'].to(device), batch['label'].to(device)
            # apply sigmoid activation to get all the outputs between 0 and 1
            outputs = torch.sigmoid(model(images))
            running_loss += criterion(outputs, target).item()

            batch_f1 = sum(
                f1(outputs[j], target[j].to(int)).item()
                for j in range(outputs.shape[0])
            )
            running_f1 += batch_f1 / outputs.shape[0]

    if n_batches == 0:
        raise ValueError("dataloader yielded no batches; nothing to validate on")
    return running_loss / n_batches, running_f1 / n_batches

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
from torch.utils.data import DataLoader
# use the 'ggplot' style sheet for the figures drawn from here on
matplotlib.style.use('ggplot')
# initialize the computation device: CUDA when torch reports it as available,
# otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# learning parameters
lr, epochs, batch_size = 0.0001, 200, 256

# Initialize the network with a frozen, pretrained backbone.
# NOTE: this rebinds the name `model` from the factory function to the network
# instance, so the cell defining model() must be re-run before this cell can
# be executed a second time.
model = model(pretrained=True, requires_grad=False).to(device)

# BCELoss expects probabilities; train()/validate() apply the sigmoid first.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

In [None]:
# positional train / validation splits of the labelled data
trainData = ImageDataset(trainLabels, train=True, test=False)
valData = ImageDataset(trainLabels, train=False, test=False)

# only the training batches are reshuffled; validation keeps file order
trainLoader = DataLoader(trainData, batch_size=batch_size, shuffle=True)
validLoader = DataLoader(valData, batch_size=batch_size, shuffle=False)

In [None]:
import warnings
# silence every warning for the remainder of the session
warnings.filterwarnings("ignore")

# per-epoch history, consumed by the plotting cell
train_loss, valid_loss = [], []
train_f1_score, val_f1_score = [], []

for epoch in range(epochs):
    print(f"Epoch {epoch+1} of {epochs}")
    train_epoch_loss, train_f1 = train(model, trainLoader, optimizer, criterion, trainData, device)
    valid_epoch_loss, val_f1 = validate(model, validLoader, criterion, valData, device)

    # record this epoch's four numbers in their respective histories
    for history, value in (
        (train_loss, train_epoch_loss),
        (valid_loss, valid_epoch_loss),
        (train_f1_score, train_f1),
        (val_f1_score, val_f1),
    ):
        history.append(value)

    print(f'Train loss: {train_epoch_loss:.4f}')
    print(f'Val loss: {valid_epoch_loss:.4f}')
    print(f'F1-Score train: {train_f1}')
    print(f'F1-Score val: {val_f1}')


In [None]:
# Two stacked panels: F1 history on top, loss history below.
figure, axis = plt.subplots(2, 1)

figure.set_figheight(14)
figure.set_figwidth(15)

# Use the number of epochs actually recorded, so an interrupted training run
# can still be plotted (range(epochs) would not match the history length).
epochs_run = range(len(train_loss))

axis[0].plot(epochs_run, train_f1_score, label="Training F1")
axis[0].plot(epochs_run, val_f1_score, label="Validation F1")
# The F1 metric above is built with threshold=0.3, not 0.5.
axis[0].set_title(label='Weighted F1-scores, prob threshold=0.3')
axis[0].set_xlabel('Epochs')
axis[0].legend()

axis[1].plot(epochs_run, train_loss, label="Training Loss")
axis[1].plot(epochs_run, valid_loss, label="Validation Loss")
axis[1].set_title('Loss')
axis[1].set_xlabel('Epochs')
axis[1].legend();


In [None]:
# held-out test split of the labelled data, served one image at a time in file order
testData = ImageDataset(trainLabels, train=False, test=True)
testLoader = DataLoader(testData, batch_size=1, shuffle=False)

In [None]:
# Show every held-out image with its predicted and actual class names.
model.eval()  # inference mode: batch-norm layers use their running statistics
for data in testLoader:
    image, target = data['image'].to(device), data['label']
    # get all the index positions where value == 1
    target_indices = [i for i in range(len(target[0])) if target[0][i] == 1]
    # get the predictions by passing the image through the model
    with torch.no_grad():
        outputs = torch.sigmoid(model(image)).cpu()
    scores = outputs[0].tolist()
    # Rank the *full* score vector so every index still refers to a position
    # in `categories`; masking before sorting would renumber the entries and
    # attach the wrong class names.
    ranked = sorted(range(len(scores)), key=scores.__getitem__, reverse=True)
    # at most three classes, most confident first, each above the 0.3 threshold
    best = [i for i in ranked[:3] if scores[i] > 0.3]
    string_predicted = ''.join(f"{categories[i]}, " for i in best)
    string_actual = ''.join(f"{categories[i]}, " for i in target_indices)
    # CHW tensor -> HWC array for matplotlib
    image = image.squeeze(0)
    image = image.detach().cpu().numpy()
    image = np.transpose(image, (1, 2, 0))
    plt.imshow(image)
    plt.axis('off')
    plt.title(f"PREDICTED: {string_predicted}\nACTUAL: {string_actual}")
    plt.show()

In [None]:
# NOTE(review): despite its name, trainImagePath points at the *test* image
# folder; it is the directory handed to TestDataset further down.
trainImagePath = "../../hfactory_magic_folders/colas_data_challenge/computer_vision_challenge/dataset/test/"
# Presumably the expected submission layout -- TODO confirm. It is loaded here
# but not referenced by any cell visible below.
templateTest = pd.read_csv("../../hfactory_magic_folders/colas_data_challenge/computer_vision_challenge/dataset/template_test.csv")

In [None]:
import math
# Unlabelled images for the submission, one per batch and in dataset order.
# NOTE: this rebinds `testLoader`, replacing the held-out loader defined earlier.
realTest = TestDataset(trainImagePath)
testLoader = DataLoader(realTest, batch_size=1, shuffle=False)


    

In [None]:
def thresh_round(input, threshold: float = 0.3) -> int:
    """Binarise a score: 1 when ``input >= threshold``, otherwise 0.

    Args:
        input: score to binarise (a probability in normal use).
        threshold: decision boundary; scores equal to it count as positive.

    Returns:
        0 or 1. Unlike a floor/ceil formulation this holds for any input,
        not only for values inside [0, 1].
    """
    return int(input >= threshold)
    
# One record per test image: the file name plus a 0/1 flag for each class.
out = []
model.eval()  # inference mode: batch-norm layers use their running statistics
with torch.no_grad():  # no gradients needed, so skip building autograd graphs
    for data in testLoader:
        image, filename = data['image'].to(device), data['filename']
        outputs = torch.sigmoid(model(image)).cpu()
        scores = outputs[0].tolist()
        # NOTE(review): assumes the five model outputs follow this column
        # order -- TODO confirm against the label columns of the training CSV.
        out.append({
            'filename': filename[0],
            'FISSURE': thresh_round(scores[0]),
            'REPARATION': thresh_round(scores[1]),
            'FISSURE LONGITUDINALE': thresh_round(scores[2]),
            'FAÏENCAGE': thresh_round(scores[3]),
            'MISE EN DALLE': thresh_round(scores[4]),
        })


In [None]:
# Turn the list of per-image records into a table (one row per file).
# NOTE: `out` changes type here, from a list of dicts to a DataFrame.
out = pd.DataFrame(out)

In [None]:
# number of positive predictions per class, in submission column order
tuple(
    sum(out[column])
    for column in ('FISSURE', 'REPARATION', 'FISSURE LONGITUDINALE', 'FAÏENCAGE', 'MISE EN DALLE')
)

In [None]:
# Write the predictions to disk without the DataFrame index column.
# NOTE(review): the file name says "05thresh" while thresh_round defaults to a
# 0.3 threshold -- confirm which threshold this file is meant to reflect.
out.to_csv('submission05thresh.csv', index=False)