# The Photometric LSST Astronomical Time-series Classification Challenge (PLAsTiCC): code that runs the performance metric

*Alex Malz (NYU)*, *Renee Hlozek (U. Toronto)*, *Tarek Alam (UCL)*, *Anita Bahmanyar (U. Toronto)*, *Rahul Biswas (U. Stockholm)*, *Rafael Martinez-Galarza (Harvard)*, *Gautham Narayan (STScI)*

In [1]:
import numpy as np
import pandas as pd

# This is the code available on GitHub for calculating metrics,
# as well as performing other diagnostics on probability tables.

In [2]:

def make_class_pairs(data_info_dict):
    """
    Pairs the paths to classifier output and truth tables for each classifier.

    Parameters
    ----------
    data_info_dict: dictionary
       
    Returns
    -------
    data_info_dict: dictionary
        updated keywords: class_pairs, dict - classifier: [path_to_class_output, path_to_truth_tables] 
    """
    
    for name in data_info_dict['names']:
        data_info_dict['class_pairs'][name] = [data_info_dict['classifications'][name], data_info_dict['truth_tables'][name]]
    
    return(data_info_dict['class_pairs'])
        
def make_file_locs(data_info_dict):
    """
    Set paths to data directory, classifier output and truth tables.

    Parameters
    ----------
    data_info_dict: dictionary                  
        
    Returns
    -------
    data_info_dict: dictionary
        updated keywords: dirname - data directory, str
                          classifications, dict - classifier: path to classifier output, str
                          truth_tables, dict - classifier: path to truth tables - str
    """
    
    # get the names of classifiers to be considered
    names = data_info_dict['names']
    
    # set data directory
    data_info_dict['dirname'] = topdir + data_info_dict['label'] + '/'

    for name in names:
        # get the path to classifier output
        data_info_dict['classifications'][name] = '%s/predicted_prob_%s.csv'%(name, name)
        
        # get the path to truth table
        data_info_dict['truth_tables'][name] = '%s/truth_table_%s.csv'%(name, name)
        
    return data_info_dict

def process_strings(dataset, cc):
    """
    Get info on directory name and classifier.

    Parameters
    ----------
    dataset: dictionary   
    cc: index of the classifier in dataset['names'], int
        
    Returns
    -------
    loc: data directory, str
    text: version label, str
    """
    
    loc = dataset['dirname']
    text = dataset['label'] + ' ' + dataset['names'][cc]
    
    return loc, text

def just_read_class_pairs(pair, dataset, cc):
    """
    Reads predicted probabilities and truth table.

    Parameters
    ----------
    pair: list of str - [path_to_classifier_output, path_to_truth_table]
    dataset: dictionary
    cc: index of the classifier in dataset['names'], int
        
    Returns
    -------
    prob_mat: probability matrix (output from classifier)
    tvec: truth vector
    """
    
    loc, text = process_strings(dataset, cc)
    clfile = pair[0]
    truthfile = pair[1]
    
    # read classifier output (whitespace-delimited; `delim_whitespace` is deprecated in recent pandas)
    prob_mat = pd.read_csv(loc + clfile, sep=r'\s+').values
    
    # read truth table (one-hot, same delimiter)
    truth_values = pd.read_csv(loc + truthfile, sep=r'\s+').values
    tvec = np.where(truth_values==1)[1]
    
    return prob_mat, tvec

In [3]:
# Build dictionary to store classification results
mystery = {}
mystery['label'] = 'Unknown'
mystery['names'] = ['RandomForest', 'KNeighbors', 'MLPNeuralNet']
mystery['classifications'] = {}
mystery['truth_tables'] = {}
mystery['class_pairs'] = {}
mystery['probs'] = {}
mystery['truth'] = {}

In [4]:
# Read classifier output and truth tables
topdir = '../examples/'
mystery = make_file_locs(mystery)
mystery['class_pairs'] = make_class_pairs(mystery)
for nm, name in enumerate(mystery['names']):
    probm, truthv = just_read_class_pairs(mystery['class_pairs'][name], mystery, nm)
    mystery['probs'][name] = probm
    mystery['truth'][name] = truthv
M_classes = np.shape(probm)[-1]

# we need the class labels in the dataset in a consistently sorted order 
# and will assume the weights of the weightvec correspond to this order
class_labels = sorted(np.unique(mystery['truth']['RandomForest']))

In [5]:
topdir = '../examples/'
mystery = make_file_locs(mystery)
mystery['class_pairs'] = make_class_pairs(mystery)
mystery['class_pairs']

{'RandomForest': ['RandomForest/predicted_prob_RandomForest.csv',
  'RandomForest/truth_table_RandomForest.csv'],
 'KNeighbors': ['KNeighbors/predicted_prob_KNeighbors.csv',
  'KNeighbors/truth_table_KNeighbors.csv'],
 'MLPNeuralNet': ['MLPNeuralNet/predicted_prob_MLPNeuralNet.csv',
  'MLPNeuralNet/truth_table_MLPNeuralNet.csv']}
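
The reader `just_read_class_pairs` turns a one-hot truth table into a vector of class indices with `np.where(truth_values == 1)[1]`. A minimal sketch of that step on a made-up table of three objects and four classes (the array is illustrative and not taken from the example files):

```python
import numpy as np

# Hypothetical one-hot truth table: one row per object, one column per class.
truth_values = np.array([[0, 0, 1, 0],
                         [1, 0, 0, 0],
                         [0, 0, 0, 1]])

# np.where returns (row_indices, column_indices) of the entries equal to 1;
# the column indices are the true class of each object, in row order.
tvec = np.where(truth_values == 1)[1]
print(tvec)
```

This relies on every row containing exactly one 1. A row with no 1 would be dropped silently, leaving `tvec` shorter than the probability matrix and misaligned with it.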

Method (Metric)
======

The log-loss is defined as
\begin{eqnarray*}
L &=& -\sum_{m=1}^{M}\frac{w_{m}}{N_{m}}\sum_{n=1}^{N_{m}}\ln[p_{n}(m | m)]
\end{eqnarray*}

We calculate the metric within each class $m$ by averaging $-\ln[p_{n}(m | m)]$ over the $N_{m}$ true members $n$ of the class, where $p_{n}(m | m)$ is the probability the classifier assigns to the true class $m$ of object $n$. We then weight the per-class metrics by arbitrary weights $w_{m}$, which the code normalizes to sum to 1, and take their weighted average to produce a global scalar metric $L$.
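
As a worked illustration of the definition, the sketch below evaluates $L$ by hand on toy inputs. The probabilities, truth vector, and weights are invented for the example, and it assumes every class has at least one true member.

```python
import numpy as np

# Toy inputs (illustrative only): N = 4 objects, M = 2 classes.
probs = np.array([[0.8, 0.2],
                  [0.6, 0.4],
                  [0.3, 0.7],
                  [0.5, 0.5]])
truth = np.array([0, 0, 1, 1])
weights = np.array([1.0, 2.0])
weights = weights / weights.sum()   # normalize so the weights sum to 1

# -ln p_n(m | m): negative log-probability assigned to each object's true class
per_object = -np.log(probs[np.arange(len(truth)), truth])

# average within each class, then combine with the class weights
M = probs.shape[1]
per_class = np.array([per_object[truth == m].mean() for m in range(M)])
logloss = np.dot(weights, per_class)
print(per_class, logloss)
```

Doubling the weight of class 1 makes its average penalty count twice as much as that of class 0, regardless of how many objects each class contains.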

In [6]:
"""
This is all from proclam but copied here so no one has to install it.
"""

import numpy as np
import sys
import collections

"""
Utility functions for PLAsTiCC metrics
"""

# from __future__ import absolute_import, division
# __all__ = ['sanitize_predictions',
#            'weight_sum', 'averager', 'check_weights',
#            'det_to_prob',
#            'prob_to_det',
#            'det_to_cm', 'prob_to_cm',
#            'cm_to_rate', 'det_to_rate', 'prob_to_rate']

RateMatrix = collections.namedtuple('rates', 'TPR FPR FNR TNR')

def sanitize_predictions(predictions, epsilon=sys.float_info.epsilon):
    """
    Replaces 0 and 1 with 0+epsilon, 1-epsilon

    Parameters
    ----------
    predictions: numpy.ndarray, float
        N*M matrix of probabilities per object, may have 0 or 1 values
    epsilon: float
        small placeholder number, defaults to floating point precision

    Returns
    -------
    predictions: numpy.ndarray, float
        N*M matrix of probabilities per object, no 0 or 1 values
    """
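    assert epsilon > 0. and epsilon < 0.0005
    # work on a float copy so the caller's array is not modified in place
    predictions = np.array(predictions, dtype=float)
    mask1 = (predictions < epsilon)
    mask2 = (predictions > 1.0 - epsilon)

    predictions[mask1] = epsilon
    predictions[mask2] = 1.0 - epsilon
    # renormalize each row so the probabilities still sum to 1
    predictions = predictions / np.sum(predictions, axis=1)[:, np.newaxis]
    return predictions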

def det_to_prob(dets, prediction=None):
    """
    Reformats vector of class assignments into matrix with 1 at true/assigned class and zero elsewhere

    Parameters
    ----------
    dets: numpy.ndarray, int
        vector of classes
    prediction: numpy.ndarray, float, optional
        predicted class probabilities

    Returns
    -------
    probs: numpy.ndarray, float
        matrix with 1 at input classes and 0 elsewhere

    Notes
    -----
    det_to_prob formerly truth_reformatter
    Does not yet handle number of classes in truth not matching number of classes in prediction, i.e. for having "other" class or secret classes not in training set.  The prediction keyword is a kludge to enable this but should be replaced.
    """
    N = len(dets)
    indices = range(N)

    if prediction is None:
        prediction_shape = (N, int(np.max(dets) + 1))
    else:
        prediction, dets = np.asarray(prediction), np.asarray(dets)
        prediction_shape = np.shape(prediction)

    probs = np.zeros(prediction_shape)
    probs[indices, dets] = 1.

    return probs

def prob_to_det(probs):
    """
    Converts probabilistic classifications to deterministic classifications by assigning the class with highest probability

    Parameters
    ----------
    probs: numpy.ndarray, float
        N * M matrix of class probabilities

    Returns
    -------
    dets: numpy.ndarray, int
        maximum probability classes
    """
    dets = np.argmax(probs, axis=1)

    return dets

def det_to_cm(dets, truth, per_class_norm=True, vb=False):
    """
    Converts deterministic classifications and truth into confusion matrix

    Parameters
    ----------
    dets: numpy.ndarray, int
        assigned classes
    truth: numpy.ndarray, int
        true classes
    per_class_norm: boolean, optional
        equal weight per class if True, equal weight per object if False
    vb: boolean, optional
        if True, print cm

    Returns
    -------
    cm: numpy.ndarray, int
        confusion matrix

    Notes
    -----
    I need to fix the norm keyword all around to enable more options, like normed output vs. not.
    """
    pred_classes, pred_counts = np.unique(dets, return_counts=True)
    true_classes, true_counts = np.unique(truth, return_counts=True)
    if vb: print((pred_classes, pred_counts), (true_classes, true_counts))

    M = max(max(pred_classes), max(true_classes)) + 1

    cm = np.zeros((M, M), dtype=float)
    # print((np.shape(dets), np.shape(truth)))
    coords = np.array(list(zip(dets, truth)))
    indices, index_counts = np.unique(coords, axis=0, return_counts=True)
    # if vb: print(indices, index_counts)
    indices = indices.T
    # if vb: print(np.shape(indices))
    cm[indices[0], indices[1]] = index_counts
    if vb: print(cm)

    if per_class_norm:
        # print(type(cm))
        # print(type(true_counts))
        # cm = cm / true_counts
        # cm /= true_counts[:, np.newaxis] #
        cm = cm / true_counts[np.newaxis, :]

    if vb: print(cm)

    return cm

def prob_to_cm(probs, truth, per_class_norm=True, vb=False):
    """
    Turns probabilistic classifications into confusion matrix by taking maximum probability as deterministic class

    Parameters
    ----------
    probs: numpy.ndarray, float
        N * M matrix of class probabilities
    truth: numpy.ndarray, int
        N-dimensional vector of true classes
    per_class_norm: boolean, optional
        equal weight per class if True, equal weight per object if False
    vb: boolean, optional
        if True, print cm

    Returns
    -------
    cm: numpy.ndarray, int
        confusion matrix
    """
    dets = prob_to_det(probs)

    cm = det_to_cm(dets, truth, per_class_norm=per_class_norm, vb=vb)

    return cm

def cm_to_rate(cm, vb=False):
    """
    Turns a confusion matrix into true/false positive/negative rates

    Parameters
    ----------
    cm: numpy.ndarray, int or float
        confusion matrix, first axis is predictions, second axis is truth
    vb: boolean, optional
        print progress to stdout?

    Returns
    -------
    rates: named tuple, float
        RateMatrix named tuple

    Notes
    -----
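    BROKEN! Do not rely on these values: TN is computed as the sum of all
    off-diagonal entries, and P and N count predicted (not true) positives
    and negatives, so e.g. TPR = TP / P is actually the precision.
    This can be done with a mask to weight the classes differently here.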
    """
    if vb: print(cm)
    diag = np.diag(cm)
    if vb: print(diag)

    TP = np.sum(diag)
    FN = np.sum(np.sum(cm, axis=0) - diag)
    FP = np.sum(np.sum(cm, axis=1) - diag)
    TN = np.sum(cm) - TP
    if vb: print((TP, FN, FP, TN))

    T = TP + TN
    F = FP + FN
    P = TP + FP
    N = TN + FN
    if vb: print((T, F, P, N))

    TPR = TP / P
    FPR = FP / N
    FNR = FN / P
    TNR = TN / N

    rates = RateMatrix(TPR=TPR, FPR=FPR, FNR=FNR, TNR=TNR)
    if vb: print(rates)

    return rates

def det_to_rate(dets, truth, per_class_norm=True, vb=False):
    cm = det_to_cm(dets, truth, per_class_norm=per_class_norm, vb=vb)
    rates = cm_to_rate(cm, vb=vb)
    return rates

def prob_to_rate(probs, truth, per_class_norm=True, vb=False):
    cm = prob_to_cm(probs, truth, per_class_norm=per_class_norm, vb=vb)
    rates = cm_to_rate(cm, vb=vb)
    return rates

def weight_sum(per_class_metrics, weight_vector, norm=True):
    """
    Calculates the weighted metric

    Parameters
    ----------
    per_class_metrics: numpy.ndarray, float
        the metric values, one per class
    weight_vector: numpy.ndarray, float
        The array of weights per class
    norm: boolean, optional
        currently unused

    Returns
    -------
    weight_sum: np.float
        The weighted metric
    """
    weight_sum = np.dot(weight_vector, per_class_metrics)

    return weight_sum

def check_weights(avg_info, M, truth=None):
    """
    Converts standard weighting schemes to weight vectors for weight_sum

    Parameters
    ----------
    avg_info: str or numpy.ndarray, float
        keyword about how to calculate weighted average metric
    M: int
        number of classes
    truth: numpy.ndarray, int, optional
        true class assignments

    Returns
    -------
    weights: numpy.ndarray, float
        relative weights per class
    """
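    if not isinstance(avg_info, str):
        # explicit weight vector: normalize it to sum to 1
        avg_info = np.asarray(avg_info)
        weights = avg_info / np.sum(avg_info)
        assert(np.isclose(sum(weights), 1.))
    elif avg_info == 'per_class':
        # every class counts equally
        weights = np.ones(M) / float(M)
    elif avg_info == 'per_item':
        # every object counts equally, so classes are weighted by their size
        classes, counts = np.unique(truth, return_counts=True)
        weights = np.zeros(M)
        weights[classes] = counts / float(len(truth))
        assert len(weights) == M
    else:
        # any other keyword is unsupported
        raise ValueError("unrecognized averaging keyword: %s" % avg_info)
    return weights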

def averager(per_object_metrics, truth, M):
    """
    Creates a list with the metrics per object, separated by class
    """
    group_metric = per_object_metrics
    class_metric = np.empty(M)
    for m in range(M):
        true_indices = np.where(truth == m)[0]
        how_many_in_class = len(true_indices)
        try:
            assert(how_many_in_class > 0)
            per_class_metric = group_metric[true_indices]
            # assert(~np.all(np.isnan(per_class_metric)))
            class_metric[m] = np.average(per_class_metric)
        except AssertionError:
            class_metric[m] = 0.
        # print((m, how_many_in_class, class_metric[m]))
    return class_metric

"""
A superclass for metrics
"""
class Metric(object):

    def __init__(self, scheme=None, **kwargs):
        """
        An object that evaluates a function of the true classes and class probabilities

        Parameters
        ----------
        scheme: string
            the name of the metric
        """
        self.scheme = scheme

    def evaluate(self, prediction, truth, weights=None, **kwds):
        """
        Evaluates a function of the truth and prediction

        Parameters
        ----------
        prediction: numpy.ndarray, float
            predicted class probabilities
        truth: numpy.ndarray, int
            true classes
        weights: numpy.ndarray, float
            per-class weights

        Returns
        -------
        metric: float
            value of the metric
        """
        print('No metric specified: returning None')

        return # metric

"""
A metric subclass for the log-loss
"""
class LogLoss(Metric):
    def __init__(self, scheme=None):
        """
        An object that evaluates the log-loss metric

        Parameters
        ----------
        scheme: string
            the name of the metric
        """
        super(LogLoss, self).__init__(scheme)
        self.scheme = scheme

    def evaluate(self, prediction, truth, averaging='per_class'):
        """
        Evaluates the log-loss

        Parameters
        ----------
        prediction: numpy.ndarray, float
            predicted class probabilities
        truth: numpy.ndarray, int
            true classes
        averaging: string or numpy.ndarray, float
            'per_class' weights classes equally, other keywords possible
            vector assumed to be class weights

        Returns
        -------
        logloss: float
            value of the metric

        Notes
        -----
        This uses the natural log.
        """
        prediction, truth = np.asarray(prediction), np.asarray(truth)
        prediction_shape = np.shape(prediction)
        (N, M) = prediction_shape

        weights = check_weights(averaging, M, truth=truth)
        truth_mask = det_to_prob(truth, prediction)

        prediction = sanitize_predictions(prediction)

        log_prob = np.log(prediction)
        logloss_each = -1. * np.sum(truth_mask * log_prob, axis=1)[:, np.newaxis]

        # use a better structure for checking keyword support
        class_logloss = averager(logloss_each, truth, M)

        logloss = weight_sum(class_logloss, weight_vector=weights)

        assert(~np.isnan(logloss))

        return logloss


In [7]:
# This is how you run the metric on a randomly chosen classifier,
# with a weight vector that doubles the weight of the special classes.

metric = 'LogLoss'
weightvec = np.ones(M_classes)

# dummy example for SNPhotCC demo data
special_classes = (1, 10, 11, 12)

# we should be using this for the PLAsTiCC data
# special_classes = (51, 99)

mask = np.array([classname in special_classes for classname in class_labels])
weightvec[mask] = 2
weightvec = weightvec / sum(weightvec)
name = np.random.choice(mystery['names'])
probm = mystery['probs'][name]
truthv = mystery['truth'][name]
LL = LogLoss()
val = LL.evaluate(prediction=probm, truth=truthv, averaging=weightvec)
print(name+' with weights '+str(weightvec)+' has '+metric+' = '+str(val))

KNeighbors with weights [0.05882353 0.11764706 0.05882353 0.05882353 0.05882353 0.05882353
 0.05882353 0.05882353 0.05882353 0.05882353 0.11764706 0.11764706
 0.11764706] has LogLoss = 20.749255306361132
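
To put a value like this in context, two reference points follow directly from the definition of the metric. The sketch below assumes M = 13 classes, as in the weight vector printed above, and the default `epsilon` of `sanitize_predictions`. It does not use the example data.

```python
import sys
import numpy as np

M = 13  # number of classes in the demo output above

# A classifier that assigns probability 1/M to every class scores ln(M),
# whatever the class weights are (as long as they sum to 1).
uniform_logloss = np.log(M)

# An object whose true class was given probability 0 is clipped to epsilon
# by sanitize_predictions, so it contributes roughly -ln(epsilon).
worst_per_object = -np.log(sys.float_info.epsilon)

print(uniform_logloss, worst_per_object)
```

These come out near 2.56 and 36.0 respectively. A score well above ln(M) therefore suggests that many objects had their true class assigned zero, or nearly zero, probability.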


Acknowledgments
===============

The DESC acknowledges ongoing support from the Institut National de Physique Nucleaire et de Physique des Particules in France; the Science & Technology Facilities Council in the United Kingdom; and the Department of Energy, the National Science Foundation, and the LSST Corporation in the United States.

DESC uses resources of the IN2P3 Computing Center (CC-IN2P3--Lyon/Villeurbanne - France) funded by the Centre National de la Recherche Scientifique; the National Energy Research Scientific Computing Center, a DOE Office of Science User Facility supported by the Office of Science of the U.S. Department of Energy under Contract No. DE-AC02-05CH11231; STFC DiRAC HPC Facilities, funded by UK BIS National E-infrastructure capital grants; and the UK particle physics grid, supported by the GridPP Collaboration.

This work was performed in part under DOE Contract DE-AC02-76SF00515.

Contributions
=======

Alex Malz: conceptualization, data curation, formal analysis, investigation, methodology, project administration, software, supervision, validation, visualization, writing - original draft

Renee Hlozek: data curation, formal analysis, funding acquisition, investigation, project administration, software, supervision, validation, visualization, writing - original draft

Tarek Alam: investigation, software, validation

Anita Bahmanyar: formal analysis, investigation, methodology, software, writing - original draft

Rahul Biswas: conceptualization, methodology, software

Rafael Martinez-Galarza: data curation, software, visualization

Gautham Narayan: data curation, formal analysis