In [None]:
import numpy as np
import pandas as pd
import torch

In [None]:
class Expert:
    """Simulated expert that answers from one labeler column of a dataset.

    The expert reads the ``"Image ID"`` column and the column named
    ``str(labeler_id)`` from ``dataset.getData()``.

    Parameters
    ----------
    dataset : object
        Any object exposing ``getData()`` that returns a pandas DataFrame
        containing an ``"Image ID"`` column and a ``str(labeler_id)`` column.
    labeler_id : int or str
        Identifier of the labeler; ``str(labeler_id)`` is used as column name.
    modus : str
        Only ``"perfect"`` initialises ``self.predictions`` here; for any
        other value ``predict`` needs ``self.predictions`` to be set elsewhere.
    param : object
        Stored as-is; not used inside this class.
    nLabels : int
        Stored as-is; not used inside this class.
    prob : float
        Threshold used by ``predictUnlabeled``: a uniform draw that is not
        greater than ``prob`` produces a random guess instead of a prediction.
    """

    def __init__(self, dataset, labeler_id, modus="perfect", param=None, nLabels=800, prob=0.5):
        self.labelerId = labeler_id
        self.dataset = dataset
        self.data = dataset.getData()[["Image ID", str(self.labelerId)]]
        self.nLabels = nLabels
        self.param = param
        self.prob = prob
        self.modus = modus

        if self.modus == "perfect":
            self.predictions = self.data

    def setTrainTestVal(self, train_filenames, val_filenames, test_filenames):
        """
        Splits the expert's data into train, validation and test subsets.

        Each argument is a collection of image ids; a row is assigned to a
        subset when its "Image ID" is contained in the matching collection.
        """
        image_ids = self.data["Image ID"]
        # `Series in list` does not test element-wise membership; isin() does.
        self.train_data = self.data[image_ids.isin(train_filenames)]
        self.test_data = self.data[image_ids.isin(test_filenames)]
        self.val_data = self.data[image_ids.isin(val_filenames)]

    def setPredictions(self, ids):
        """
        Marks the positional indices `ids` of the train data as labeled.

        Every other position of the train data is recorded as unlabeled.
        Requires `setTrainTestVal` to have been called first.
        """
        labeled = list(ids)
        labeled_lookup = set(labeled)
        self.indices_labeled = labeled
        self.indices_unlabeled = [
            index for index in range(len(self.train_data)) if index not in labeled_lookup
        ]

    def initPredictions(self, n):
        """
        Randomly chooses n train images for labeling.

        `n` is clamped to the range [0, number of train rows].
        """
        total = len(self.train_data)
        n = max(0, min(int(n), total))
        # permutation() also works for an empty train set, unlike choice().
        chosen = np.random.permutation(total)[:n]
        self.setPredictions(sorted(chosen.tolist()))

    def setPredictionFunction(self, fn):
        """Replaces this instance's `predict` with the callable `fn`."""
        self.predict = fn

    def predictUnlabeled(self, img, target, fnames):
        """
        Returns one prediction per entry of `fnames`.

        For each entry a uniform number in [0, 1) is drawn: if it is greater
        than `self.prob` the expert's `predict` is used, otherwise a random
        label from {0, 1} is returned (as an array of length 1).
        """
        preds = []
        for i, x in enumerate(fnames):
            if np.random.uniform(0, 1) > self.prob:
                preds.append(self.predict(img[i], target[i], x))
            else:
                preds.append(np.random.randint(2, size=1))
        return preds

    def predict(self, img, target, fnames):
        """
        img: the input image (unused)
        target: the GT label (unused)
        fnames: image ids to look up; a single scalar id is also accepted

        Returns a flat numpy array with the labeler's stored values for the
        given ids.
        """
        if np.isscalar(fnames):
            # A bare string would otherwise be iterated character by character.
            fnames = [fnames]
        column = str(self.labelerId)
        id_column = self.predictions["Image ID"]
        return np.array(
            [self.predictions[id_column == image_id][column].values for image_id in fnames]
        ).ravel()

class NihExpert:
    """A class used to represent an Expert on NIH ChestX-ray data.

    Parameters
    ----------
    labeler_id : int
        the Reader ID to specify which radiologist the expert object represents
    target : str
        the target to make predictions for
    PATH : str
        prefix that is concatenated with "labels.csv" to locate the label file
    numLabels : int
        maximum number of true expert labels handed out by ``predictNew``
    prob : float
        threshold used by ``predictNew`` once the label budget is exhausted

    Attributes
    ----------
    labelerId : int
        the Reader ID to specify which radiologist the expert object represents
    target : str
        the target to make predictions for
    maxLabels : int
        the label budget (``numLabels``)
    prob : float
        the random-guess threshold
    predictions : int
        number of budgeted labels already handed out by ``predictNew``
    image_id_to_prediction : dict
        a dictionary that maps the image id to the label the radiologist gave for the specified target

    Methods
    -------
    predict(image_ids)
        makes a prediction for the given image ids
    predict_unlabeled_data(image_ids)
        like predict, but returns -1 for image ids without an expert label
    predictNew(image_ids)
        returns true labels while the budget lasts, noisy labels afterwards
    resetPredictionCount()
        resets the number of used budgeted labels to 0
    """

    def __init__(self, labeler_id: int, target: str, PATH, numLabels=800, prob=0.5):
        self.labelerId = labeler_id
        self.target = target
        self.maxLabels = numLabels
        self.prob = prob

        self.resetPredictionCount()

        individual_labels = pd.read_csv(PATH + "labels.csv")

        # Keep only the rows of this reader and the columns of this target.
        expert_labels = individual_labels[individual_labels["Reader ID"] == self.labelerId][
            ["Image ID", self.target + "_Expert_Label", self.target + "_GT_Label"]]
        # Missing labels are treated as 0.
        expert_labels = expert_labels.fillna(0)

        self.image_id_to_prediction = pd.Series(expert_labels[self.target + "_Expert_Label"].values,
                                                index=expert_labels["Image ID"]).to_dict()

    def predict(self, image_ids):
        """Returns the experts predictions for the given image ids. Works only for image ids that are labeled by the expert

        Parameters
        ----------
        image_ids : list
            the image ids to get the radiologists predictions for

        Returns
        -------
        list
            the stored expert label for every image id

        Raises
        ------
        KeyError
            if an image id was not labeled by the expert
        """
        return [self.image_id_to_prediction[image_id] for image_id in image_ids]

    def predict_unlabeled_data(self, image_ids):
        """Returns the experts predictions for the given image ids. Works for all image ids, returns -1 if not labeled by expert

        Parameters
        ----------
        image_ids : list
            the image ids to get the radiologists predictions for

        Returns
        -------
        list
            the stored expert label for every image id, or -1 if no prediction
        """
        return [self.image_id_to_prediction.get(image_id, -1) for image_id in image_ids]

    def predictNew(self, image_ids):
        """
        Returns the true expert labels while the label budget (maxLabels) lasts.

        Once the budget is used up, each remaining image draws a uniform
        number in [0, 1): if it is greater than `prob` the true expert label
        is returned, otherwise a random label from {0, 1}.

        Returns a list with one entry per image id, in input order.
        """
        # Compute the remaining budget BEFORE the counter is updated, so the
        # split between budgeted and noisy ids stays correct.
        remaining = max(self.maxLabels - self.predictions, 0)
        if len(image_ids) <= remaining:
            self.predictions += len(image_ids)
            return [self.image_id_to_prediction[image_id] for image_id in image_ids]

        results = [self.image_id_to_prediction[image_id] for image_id in image_ids[:remaining]]
        self.predictions = self.maxLabels
        for image_id in image_ids[remaining:]:
            if np.random.uniform(0, 1) > self.prob:
                results.append(self.image_id_to_prediction[image_id])
            else:
                # Plain int so the list stays homogeneous with the true labels.
                results.append(int(np.random.randint(2)))
        return results

    def resetPredictionCount(self):
        """Resets the number of budgeted labels handed out so far to 0."""
        self.predictions = 0

In [None]:
class Expert:
    """Expert backed by stored labels and, optionally, by a torch model.

    Parameters
    ----------
    dataset : object
        Any object exposing ``getData()`` that returns a pandas DataFrame
        containing an ``"Image ID"`` column and a ``str(labeler_id)`` column.
    labeler_id : int or str
        Identifier of the labeler; ``str(labeler_id)`` is used as column name
        and as suffix of saved model files.
    modus : str
        Only ``"perfect"`` initialises ``self.predictions`` here.
    param, nLabels, prob :
        Stored as-is; not used inside this class.
    """

    def __init__(self, dataset, labeler_id, modus="perfect", param=None, nLabels=800, prob=0.5):
        self.labelerId = labeler_id
        self.dataset = dataset
        # Explicit copy: the "Image ID" column is re-typed below and must not
        # be assigned on a slice of the dataset's frame.
        self.data = dataset.getData()[["Image ID", str(self.labelerId)]].copy()
        self.nLabels = nLabels
        self.param = param
        self.prob = prob
        self.modus = modus

        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        if self.modus == "perfect":
            self.predictions = self.data
            self.predictions["Image ID"] = self.predictions["Image ID"].astype('category')

        # Filled by init_model_predictions; parallel lists (same positions).
        self.prebuild_predictions = []
        self.prebuild_filenames = []

    def predict(self, img, target, fnames):
        """
        img: the input image (unused)
        target: the GT label (unused)
        fnames: iterable of filenames (ids of the images)

        Returns a numpy array with the first stored label of every id.
        Raises IndexError if an id has no stored label.
        """
        column = str(self.labelerId)
        id_column = self.predictions["Image ID"]
        return np.array(
            [self.predictions.loc[id_column == image_id, column].values[0] for image_id in fnames]
        )

    def setModel(self, model):
        """Stores `model` as the expert's model."""
        self.model = model

    def predictModel(self, img, target, fnames):
        """
        Runs the model on `img` and returns the index of the largest output
        along dimension 1 for every sample. `target` and `fnames` are unused.
        """
        if img.dim() == 3:
            # presumably a single image without batch dimension -- add one
            img = img.unsqueeze(0)
        with torch.no_grad():
            outputs = self.model(img)
            _, predicted = torch.max(outputs, 1)
        return predicted

    def predictImage(self, img):
        """Shortcut for predictModel when only the image is available."""
        return self.predictModel(img, None, None)

    def getModel(self):
        """Returns the stored model."""
        return self.model

    def _model_path(self, path, name):
        """Builds the file location '<path>/<name>_<labelerId>'."""
        return path + "/" + name + "_" + str(self.labelerId)

    def saveModel(self, path, name):
        """Saves the whole model object to '<path>/<name>_<labelerId>'."""
        torch.save(self.model, self._model_path(path, name))

    def loadModel(self, path, name):
        """
        Loads the model from '<path>/<name>_<labelerId>' and switches it to
        evaluation mode.
        """
        # NOTE(review): torch.load unpickles the file -- only load trusted
        # files. Recent torch versions may additionally need
        # weights_only=False to restore a whole model object; confirm against
        # the torch version in use.
        self.model = torch.load(self._model_path(path, name))
        self.model.eval()

    def predictWithModel(self, img, target, filename):
        """
        Checks with the model if the expert would be correct
        If it predicts 1 then it returns the true label
        If it predicts anything else it returns the opposite label (1 - target)
        """
        predicted = self.predictModel(img, target, filename)
        target = target.cpu().detach().numpy()
        return [
            target[i] if pred == 1 else 1 - target[i]
            for i, pred in enumerate(predicted)
        ]

    def init_model_predictions(self, train_dataloader):
        """
        Precomputes predictWithModel for every batch of `train_dataloader`.

        Each batch is expected to unpack into (images, targets, filenames);
        results and filenames are appended to the prebuild_* lists.
        """
        for images, target, filenames in train_dataloader:
            result = self.predictWithModel(images.to(self.device), target, filenames)
            self.prebuild_predictions += result
            self.prebuild_filenames += filenames

    def predict_model_predefined(self, img, target, filenames):
        """
        Returns the precomputed prediction for every filename.
        Raises ValueError if a filename was not seen by init_model_predictions.
        """
        return [self.prebuild_predictions[self.prebuild_filenames.index(filename)] for filename in filenames]


In [None]:
class Expert:
    """Expert backed by stored labels plus an SSL model and an AL model.

    Methods taking a ``mod`` argument dispatch on the strings ``"SSL"`` and
    ``"AL"``; any other value is silently ignored (the method returns None).

    Parameters
    ----------
    dataset : object
        Any object exposing ``getData()`` that returns a pandas DataFrame
        containing an ``"Image ID"`` column and a ``str(labeler_id)`` column.
    labeler_id : int or str
        Identifier of the labeler; ``str(labeler_id)`` is used as column name
        and as suffix of saved model files.
    modus, param, nLabels, prob :
        Stored as-is; not used inside this class.
    """

    def __init__(self, dataset, labeler_id, modus="perfect", param=None, nLabels=800, prob=0.5):
        self.labelerId = labeler_id
        self.dataset = dataset
        # Explicit copy: the "Image ID" column is re-typed below and must not
        # be assigned on a slice of the dataset's frame.
        self.data = dataset.getData()[["Image ID", str(self.labelerId)]].copy()
        self.nLabels = nLabels
        self.param = param
        self.prob = prob
        self.modus = modus

        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        self.predictions = self.data
        self.predictions["Image ID"] = self.predictions["Image ID"].astype('category')

        # Parallel lists (same positions), filled by init_model_predictions.
        self.prebuild_predictions = []
        self.prebuild_filenames = []

        self.prebuild_predictions_ssl = []
        self.prebuild_filenames_ssl = []

        self.prebuild_predictions_al = []
        self.prebuild_filenames_al = []

    def predict(self, img, target, fnames):
        """
        img: the input image (unused)
        target: the GT label (unused)
        fnames: iterable of filenames (ids of the images)

        Returns a numpy array with the first stored label of every id.
        Raises IndexError if an id has no stored label.
        """
        column = str(self.labelerId)
        id_column = self.predictions["Image ID"]
        return np.array(
            [self.predictions.loc[id_column == image_id, column].values[0] for image_id in fnames]
        )

    def predictSLL(self, img, target, fnames):
        """
        Returns, per sample, the index of the largest SSL model output along
        dimension 1. (The method name keeps its historic spelling.)
        """
        outputs = self.getSSLOutput(img, target, fnames)
        _, predicted = torch.max(outputs, 1)
        return predicted

    # Correctly spelled alias; the historic name stays available for callers.
    predictSSL = predictSLL

    def getSSLOutput(self, img, target, fnames):
        """
        Returns the raw output of `sslModel.predict` for `img`, computed
        without gradient tracking. `target` and `fnames` are unused.
        """
        if img.dim() == 3:
            # presumably a single image without batch dimension -- add one
            img = img.unsqueeze(0)
        with torch.no_grad():
            return self.sslModel.predict(img)

    # Alias for the plural spelling of the same method.
    getSSLOutputs = getSSLOutput

    def predictAL(self, img, target, fnames):
        """Returns predictWithModel(img, target, fnames) (uses the AL model)."""
        return self.predictWithModel(img, target, fnames)

    def getALOutputs(self, img, target, fnames):
        """
        Returns, per sample, the index of the largest AL model output along
        dimension 1.
        """
        return self.get_model_prediction(self.alModel, img, target, fnames)

    def get_model_prediction(self, model, img, target, fnames):
        """
        Calls `model(img)` without gradient tracking and returns the index of
        the largest output along dimension 1. `target` and `fnames` are unused.
        """
        if img.dim() == 3:
            # presumably a single image without batch dimension -- add one
            img = img.unsqueeze(0)
        with torch.no_grad():
            outputs = model(img)
            _, predicted = torch.max(outputs, 1)
        return predicted

    def loadModel(self, path, mod):
        """
        Loads an object from `path` and stores it as SSL or AL model.

        `mod` is "SSL" or "AL"; the historic misspelling "SLL" is still
        accepted and treated as "SSL".
        """
        # NOTE(review): torch.load unpickles the file -- only load trusted
        # files. Recent torch versions may additionally need
        # weights_only=False to restore a whole model object; confirm against
        # the torch version in use.
        model = torch.load(path)
        if mod in ("SSL", "SLL"):
            self.sslModel = model
        elif mod == "AL":
            self.alModel = model

    def init_model_predictions(self, train_dataloader, mod):
        """
        Precomputes the SSL or AL predictions for every batch of
        `train_dataloader`.

        Each batch is expected to unpack into (images, targets, filenames);
        results and filenames are appended to the matching prebuild_* lists.
        """
        if mod == "SSL":
            for images, target, filenames in train_dataloader:
                result = self.predictSLL(images.to(self.device), target, filenames)
                self.prebuild_predictions_ssl += result
                self.prebuild_filenames_ssl += filenames
        elif mod == "AL":
            for images, target, filenames in train_dataloader:
                result = self.predictAL(images.to(self.device), target, filenames)
                self.prebuild_predictions_al += result
                self.prebuild_filenames_al += filenames

    def predict_model_predefined(self, img, target, filenames, mod):
        """
        Returns the precomputed SSL or AL prediction for every filename.
        Raises ValueError if a filename was not seen by init_model_predictions.
        """
        if mod == "SSL":
            return [self.prebuild_predictions_ssl[self.prebuild_filenames_ssl.index(filename)] for filename in filenames]
        elif mod == "AL":
            return [self.prebuild_predictions_al[self.prebuild_filenames_al.index(filename)] for filename in filenames]

    def getModel(self, mod):
        """Returns the SSL or AL model selected by `mod`."""
        if mod == "SSL":
            return self.sslModel
        elif mod == "AL":
            return self.alModel

    def saveModel(self, path, name, mod):
        """Saves the selected model object to '<path>/<name>_<labelerId>'."""
        destination = path + "/" + name + "_" + str(self.labelerId)
        if mod == "SSL":
            torch.save(self.sslModel, destination)
        elif mod == "AL":
            torch.save(self.alModel, destination)

    def setModel(self, mod, model):
        """Stores `model` as the SSL or AL model selected by `mod`."""
        if mod == "SSL":
            self.sslModel = model
        elif mod == "AL":
            self.alModel = model

    # Old functions, kept for compatibility and wrapped by the new functions.

    def predictWithModel(self, img, target, filename):
        """
        Checks with the AL model if the expert would be correct
        If it predicts 1 then it returns the true label
        If it predicts anything else it returns the opposite label (1 - target)
        """
        predicted = self.get_model_prediction(self.alModel, img, target, filename)
        target = target.cpu().detach().numpy()
        return [
            target[i] if pred == 1 else 1 - target[i]
            for i, pred in enumerate(predicted)
        ]

In [None]:
class SSLModel:
    """Two-stage predictor: an embedding model followed by a linear model."""

    def __init__(self, embedded_model, linear_model):
        """
        embedded_model: object providing `get_embedding(batch=...)`
        linear_model: callable applied to the embedding
        """
        super().__init__()
        self.linear_model = linear_model
        self.embedded_model = embedded_model

    def predict(self, imgs):
        """Embeds `imgs` and returns the linear model's output for the embedding."""
        return self.linear_model(self.embedded_model.get_embedding(batch=imgs))