In [32]:
from sklearn.metrics import accuracy_score
import numpy as np

# Parameters

In [33]:
nWorkers = 5    # Number of workers (labeling channels) simulated for every sample
nSamples = 450   # Number of samples (data points) to generate
nClasses = 4    # Number of distinct class labels

# Generate Fake Data

In [34]:
from fake_data_generator import RandomConfusionMatrixChannel, DataGenerator

def generate_sample_data(nSamples, nWorkers, nClass, truePredictionRate=.8):
    """
    Simulate worker estimates for independently generated items.

    A fresh item is drawn from ``DataGenerator(nClass)`` for every sample and
    each worker produces one estimate for it.

    : nSamples: number of samples to generate (keys 0..nSamples-1 of the result)
    : nWorkers: number of ``RandomConfusionMatrixChannel`` workers to create
    : nClass: number of classes, forwarded to the workers and the data generator
    : truePredictionRate: forwarded to each worker's ``train`` call
    : return: tuple ``(samples, labels, confusion_matrices)`` where
              ``samples[n][w]`` is a one-element list holding worker ``w``'s
              estimate for sample ``n``, ``labels[n]`` is the generated item's
              ``label`` and ``confusion_matrices[w]`` is worker ``w``'s
              ``confusionMatrix``
    """
    channels = []
    confusion_matrices = []
    for _ in range(nWorkers):
        channel = RandomConfusionMatrixChannel()
        channel.train(nClass, truePredictionRate)
        channels.append(channel)
        # Read the matrix right after training, one worker at a time.
        confusion_matrices.append(channel.confusionMatrix)

    generator = DataGenerator(nClass)
    samples = {}
    true_labels = []
    for index in range(nSamples):
        item = generator.generate()
        # Every estimate is wrapped in a list, keyed by the worker's position.
        samples[index] = {
            worker_id: [channel.estimate(item)]
            for worker_id, channel in enumerate(channels)
        }
        true_labels.append(item.label)

    return (samples, true_labels, confusion_matrices)


def generate_temporal_dependent_data(nSamples, nWorkers, nClass, nActivityChunk, truePredictionRate=.8):
    """
    Simulate worker estimates where the underlying item stays fixed over
    contiguous runs ("chunks") of samples.

    One item is drawn per chunk and reused for ``nSamples // nActivityChunk``
    consecutive samples; every worker is asked for a new estimate of that same
    item at each sample. Samples left over after the integer division share one
    additional item appended at the end.

    : nSamples: total number of samples to generate (keys 0..nSamples-1)
    : nWorkers: number of ``RandomConfusionMatrixChannel`` workers to create
    : nClass: number of classes, forwarded to the workers and the data generator
    : nActivityChunk: number of equally sized chunks; must be positive
    : truePredictionRate: forwarded to each worker's ``train`` call
    : return: tuple ``(samples, labels, confusion_matrices)`` where
              ``samples[n][w]`` is a one-element list holding worker ``w``'s
              estimate for sample ``n``, ``labels[n]`` is the ``label`` of the
              item behind sample ``n`` and ``confusion_matrices[w]`` is worker
              ``w``'s ``confusionMatrix``
    : raises ValueError: if ``nActivityChunk`` is not positive
    """
    # A zero chunk count would otherwise surface as a bare ZeroDivisionError and
    # a negative one would silently put every sample into the remainder.
    if nActivityChunk <= 0:
        raise ValueError(
            "nActivityChunk must be a positive integer, got {!r}".format(nActivityChunk)
        )

    workers = []
    confusion_matrices = []
    for _ in range(nWorkers):
        worker = RandomConfusionMatrixChannel()
        worker.train(nClass, truePredictionRate)
        workers.append(worker)
        confusion_matrices.append(worker.confusionMatrix)

    samples = {}
    labels = []
    data_generator = DataGenerator(nClass)

    chunkSize = nSamples // nActivityChunk

    for n in range(nActivityChunk):
        # One item per chunk; it is re-estimated by every worker for each sample.
        data = data_generator.generate()
        for m in range(chunkSize):
            index = (n * chunkSize) + m
            samples[index] = {w: [workers[w].estimate(data)] for w in range(nWorkers)}
            labels.append(data.label)

    # Remainder of the integer division: fill up to nSamples with one more item.
    # Only draw that item when there is actually something left to fill.
    if len(labels) < nSamples:
        data = data_generator.generate()
        for n in range(len(labels), nSamples):
            samples[n] = {w: [workers[w].estimate(data)] for w in range(nWorkers)}
            labels.append(data.label)

    return (samples, labels, confusion_matrices)

# Test

In [35]:
# Simulate labels that stay constant over 10 consecutive chunks of samples.
# For independently drawn samples, call
# generate_sample_data(nSamples, nWorkers, nClasses, .8) instead.
samples, labels, confusion_matrices = generate_temporal_dependent_data(
    nSamples,
    nWorkers,
    nClasses,
    nActivityChunk=10,
    truePredictionRate=.8,
)

# Run EM algorithm

In [36]:
from dawid_skene import run as run_em

# run_em returns seven values; the first four are discarded and only the last three are kept.
# patient_classes is later reduced with argmax over axis 1 to get one estimated label per sample,
# and error_rates is compared against the true confusion matrices in the evaluation section.
_, _, _, _, class_marginals, error_rates, patient_classes = run_em(samples, verbose=False)

# Run HMM algorithm

In [37]:
from hmmlearn import hmm
# Number of possible observation symbols. This equals the number of states for a
# single labeler (it would be nClasses*nWorkers if several labelers were encoded jointly).
labelers_list = [0] # list of labelers
n_observations = nClasses
# Number of hidden states
n_states = nClasses
# One hard label per sample: the index of the largest entry in each row of patient_classes.
estimated_labels = np.argmax(patient_classes, axis=1)
# NOTE(review): the HMM is fitted on the true labels, not on the EM estimates —
# confirm this is intended, since true labels are not always available.
observations = np.array(labels)

# Create a Categorical Hidden Markov Model.
# n_features fixes the symbol alphabet to n_observations instead of letting it be
# inferred from the training sequence. Otherwise a class that never occurs in
# `labels` would shrink the emission matrix and break predict() on that symbol.
model = hmm.CategoricalHMM(n_components=n_states, n_features=n_observations, n_iter=100)

# Fit the model to the observations using the Baum-Welch algorithm
# (hmmlearn expects a column vector of integer symbols).
model.fit(observations.reshape(-1,1))


CategoricalHMM(n_components=4, n_features=4, n_iter=100,
               random_state=RandomState(MT19937) at 0x186C7882940)

In [38]:
# Emission probabilities learned by the fitted HMM
# (presumably one row per hidden state and one column per observed symbol — confirm against the hmmlearn docs).
print(model.emissionprob_)

[[6.00000000e-01 4.00000000e-01 8.08058396e-11 2.87858616e-19]
 [8.02651808e-18 2.62935017e-17 9.99999996e-01 3.76521034e-09]
 [2.91917130e-17 3.43910384e-36 1.67753398e-19 1.00000000e+00]
 [4.69324432e-15 2.11980408e-18 9.99999991e-01 8.66235288e-09]]


In [43]:
# State transition probabilities learned by the fitted HMM
# (presumably entry [i, j] is the probability of moving from state i to state j — confirm against the hmmlearn docs).
print(model.transmat_)

[[9.91071429e-01 1.67314735e-03 4.46428568e-03 2.79113839e-03]
 [5.73501664e-29 2.34499952e-01 1.86083785e-02 7.46891669e-01]
 [1.11111111e-02 3.94326946e-27 9.88888889e-01 2.95304931e-30]
 [8.05915172e-31 9.72923194e-01 2.69055609e-02 1.71244779e-04]]


In [42]:
# Decode the most likely hidden-state sequence for the EM label estimates.
z=model.predict(estimated_labels.reshape(-1,1))

# Hidden-state indices are arbitrary: the HMM is fitted without supervision, so
# state k need not correspond to class k. Map every state to the symbol it is
# most likely to emit before comparing against class labels.
state_to_label = np.argmax(model.emissionprob_, axis=1)
hmm_labels = state_to_label[z]

# Percentage of samples whose mapped HMM label equals the reference label,
# truncated to an integer. Both arguments are passed as flat 1-D arrays.
accuracy_hmm = int(100 * accuracy_score(observations.ravel(), hmm_labels))

print("HMM Accuracy: %{}".format(accuracy_hmm))

# Evaluation

In [None]:
def frobenius_norm(matrix1, matrix2):
    """
    Frobenius norm of the element-wise difference between two matrices.
    : matrix1: 2-D array-like (nested lists or numpy array)
    : matrix2: 2-D array-like that can be subtracted from matrix1
    : return: Frobenius norm of (matrix1 - matrix2)
    """
    # Array-likes are converted so that nested lists can be subtracted.
    delta = np.asarray(matrix1) - np.asarray(matrix2)
    return np.linalg.norm(delta, ord='fro')

def one_hot_encode(x, n_classes):
    """
    One-hot encode integer class labels by selecting rows of an identity matrix.
    : x: Integer label or list/array of integer labels used as row indices
    : n_classes: Number of classes, i.e. the length of every one-hot vector
    : return: Numpy float array with one one-hot row per label in x
    """
    identity = np.eye(n_classes)
    # Row i of the identity matrix is the one-hot vector of class i.
    return identity[x]

### Accuracy
Run this cell only if true labels are available

In [None]:

# Hard decision per sample: index of the largest entry in each row of patient_classes.
estimated_labels = np.argmax(patient_classes, axis=1)
true_labels = labels

# Share of samples whose estimate matches the true label, as a truncated integer percentage.
fraction_correct = accuracy_score(true_labels, estimated_labels)
accuracy = int(100 * fraction_correct)

print("Accuracy: %{}".format(accuracy))

### Average Parameter Estimation Error
Run this cell only if true confusion matrices are available

In [None]:
# Frobenius distance between each true confusion matrix and the EM estimate paired with it by position.
per_worker_errors = [
    frobenius_norm(true_matrix, estimated_matrix)
    for true_matrix, estimated_matrix in zip(confusion_matrices, error_rates)
]
# Average the per-worker distances into a single score.
parameter_error_rate = np.mean(per_worker_errors)

print("Average Parameter Estimation Error: {:.2f}".format(parameter_error_rate))