# Pytorch Engite Metrics [link](https://pytorch.org/ignite/metrics.html#attach-engine)

In [2]:
import torch
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print (device)

cuda


In [3]:
class Classifier(torch.nn.Module):
    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.linear = torch.nn.Linear(input_dim, num_classes)
    def forward(self, x):
        return self.linear(x)
    
input_dim = 5
num_classes = 2
data_dim = 10
data = torch.randn(data_dim, input_dim), torch.randint(0, num_classes, (data_dim,))

model = Classifier(input_dim, num_classes)
model.to(device)
data[0].to(device)
data[1].to(device)

tensor([0, 0, 1, 1, 0, 1, 1, 0, 0, 0], device='cuda:0')

In [4]:
from ignite.metrics import Precision

# Define the metric
precision = Precision(average=None, device=device)   # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

# Start accumulation:
# for x, y in data:
x = data[0].to(device)
y = data[1].to(device)
y_pred = model(x)
precision.update((y_pred, y))  # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

# Compute the result
print("Precision: ", precision.compute())# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

# Reset metric
precision.reset() 

# Start new accumulation:
# for x, y in data:
y_pred = model(x)
precision.update((y_pred, y)) 

# Compute new result
print("Precision: ", precision.compute())

Precision:  tensor([0.8333, 0.7500], device='cuda:0', dtype=torch.float64)
Precision:  tensor([0.8333, 0.7500], device='cuda:0', dtype=torch.float64)
Precision:  tensor([0.8333, 0.7500], device='cuda:0', dtype=torch.float64)
Precision:  tensor([0.8333, 0.7500], device='cuda:0', dtype=torch.float64)


In [10]:
from ignite.metrics import  Recall

def to_li

# Define the metric
recall = Recall(average=None, device = device)

# Start accumulation:
# for x, y in data:
x = data[0].to(device)
y = data[1].to(device)
y_pred = model(x)
recall.update((y_pred, y))

# Compute the result
print("recall: ", recall.compute())

# Reset metric
recall.reset()

# Start new accumulation:
# for x, y in data:
y_pred = model(x)
recall.update((y_pred, y))

# Compute new result
print("recall: ", recall.compute())

recall:  tensor([0.4000, 0.4000], device='cuda:0', dtype=torch.float64)
recall:  tensor([0.4000, 0.4000], device='cuda:0', dtype=torch.float64)


In [16]:
z = torch.sigmoid(model(x))
z[:,0].reshape([-1])

tensor([0.7209, 0.6030, 0.4973, 0.4262, 0.5408, 0.4756, 0.5571, 0.5731, 0.4637,
        0.7081], device='cuda:0', grad_fn=<ReshapeAliasBackward0>)

In [17]:
from ignite.contrib.metrics import ROC_AUC
# Define the metric
"""
ROC_AUC expects y to be comprised of 0’s and 1’s. y_pred must either
be probability estimates or confidence values. To apply an activation to y_pred, use output_transform as shown below:
"""


def sigmoid_output_transform(output):
    y_pred, y = output
    y_pred = torch.sigmoid(y_pred)[:,0].reshape([-1])
    y = y.reshape([-1])
    print(y.shape, y_pred.shape)
    return y_pred, y

roc_auc = ROC_AUC(output_transform=sigmoid_output_transform, device = device)

# Start accumulation:
# for x, y in data:
x = data[0].to(device)
y = data[1].to(device)
y_pred = model(x)
# z, y = sigmoid_output_transform((y_pred, y))
roc_auc.update((y_pred, y))

# Compute the result
print("roc_auc: ", roc_auc.compute())

# Reset metric
roc_auc.reset()



ValueError: y should be a 1d array, got an array of shape (10, 2) instead.

In [8]:

def sigmoid_output_transform(output):
    y_pred, y = output
    y_pred = torch.sigmoid(y_pred)[:,0].reshape([-1])
    print(y_pred.shape)
    return y_pred, y
x = (torch.tensor([[3.4, -2.1]]), torch.tensor([1]))
sigmoid_output_transform(x)

torch.Size([1])


(tensor([0.9677]), tensor([1]))

In [21]:
from ignite.metrics import Loss

# Define the metric
cm = ConfusionMatrix(num_classes=num_classes, device=device)   # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

# Start accumulation:
# for x, y in data:
x = data[0].to(device)
y = data[1].to(device)
y_pred = model(x)
cm.update((y_pred, y))  # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

# Compute the result
print("cm: ", cm.compute())

# Reset metric
cm.reset() # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

# Start new accumulation:
# for x, y in data:
y_pred = model(x)
cm.update((y_pred, y)) 

# Compute new result
print("cm: ", cm.compute())

cm:  tensor([[2, 3],
        [3, 2]], device='cuda:0')
cm:  tensor([[2, 3],
        [3, 2]], device='cuda:0')


In [7]:
from ignite.metrics import Loss

criterion = torch.nn.NLLLoss()
metric = Loss(criterion)
y_pred = torch.tensor([[0.1, 0.4, 0.5], [0.1, 0.7, 0.2]])
y_true = torch.tensor([2, 2]).long()

metric.reset()
metric.update((y_pred, y_true))
print('metric loss', metric.compute())
print('normal loss', criterion(y_pred, y_true))

metric loss -0.3499999940395355
normal loss tensor(-0.3500)


# Custom Roc_auc

In [25]:
from typing import Any, Callable, cast, Tuple, Union

import torch

from ignite import distributed as idist
from ignite.exceptions import NotComputableError
from ignite.metrics import EpochMetric


def roc_auc_compute_fn(y_preds: torch.Tensor, y_targets: torch.Tensor) -> float:
    from sklearn.metrics import roc_auc_score

    y_true = y_targets.cpu().numpy()
    y_pred = y_preds.cpu().numpy()
    return roc_auc_score(y_true, y_pred)



class Custom_ROC_AUC(EpochMetric):
    """Computes Area Under the Receiver Operating Characteristic Curve (ROC AUC)
    accumulating predictions and the ground-truth during an epoch and applying
    `sklearn.metrics.roc_auc_score <https://scikit-learn.org/stable/modules/generated/
    sklearn.metrics.roc_auc_score.html#sklearn.metrics.roc_auc_score>`_ .

    Args:
        output_transform: a callable that is used to transform the
            :class:`~ignite.engine.engine.Engine`'s ``process_function``'s output into the
            form expected by the metric. This can be useful if, for example, you have a multi-output model and
            you want to compute the metric with respect to one of the outputs.
        check_compute_fn: Default False. If True, `roc_curve
            <https://scikit-learn.org/stable/modules/generated/sklearn.metrics.roc_auc_score.html#
            sklearn.metrics.roc_auc_score>`_ is run on the first batch of data to ensure there are
            no issues. User will be warned in case there are any issues computing the function.
        device: optional device specification for internal storage.

    Note:

        ROC_AUC expects y to be comprised of 0's and 1's. y_pred must either be probability estimates or confidence
        values. To apply an activation to y_pred, use output_transform as shown below:

        .. code-block:: python

            def sigmoid_output_transform(output):
                y_pred, y = output
                y_pred = torch.sigmoid(y_pred)
                return y_pred, y
            avg_precision = ROC_AUC(sigmoid_output_transform)

    Examples:

        .. include:: defaults.rst
            :start-after: :orphan:

        .. testcode::

            roc_auc = ROC_AUC()
            #The ``output_transform`` arg of the metric can be used to perform a sigmoid on the ``y_pred``.
            roc_auc.attach(default_evaluator, 'roc_auc')
            y_pred = torch.tensor([[0.0474], [0.5987], [0.7109], [0.9997]])
            y_true = torch.tensor([[0], [0], [1], [0]])
            state = default_evaluator.run([[y_pred, y_true]])
            print(state.metrics['roc_auc'])

        .. testoutput::

            0.6666...
    """

    def __init__(
        self,
        output_transform: Callable = lambda x: x,
        check_compute_fn: bool = False,
        device: Union[str, torch.device] = torch.device("cpu"),
    ):

        try:
            from sklearn.metrics import roc_auc_score  # noqa: F401
        except ImportError:
            raise ModuleNotFoundError("This contrib module requires scikit-learn to be installed.")

        super(Custom_ROC_AUC, self).__init__(
            roc_auc_compute_fn, output_transform=output_transform, check_compute_fn=check_compute_fn, device=device
        )
        
    def compute(self) -> float:
        if len(self._predictions) < 1 or len(self._targets) < 1:
            raise NotComputableError("EpochMetric must have at least one example before it can be computed.")

        if self._result is None:
            _prediction_tensor = torch.cat(self._predictions, dim=0)
            _target_tensor = torch.cat(self._targets, dim=0)

            ws = idist.get_world_size()
            if ws > 1:
                # All gather across all processes
                _prediction_tensor = cast(torch.Tensor, idist.all_gather(_prediction_tensor))
                _target_tensor = cast(torch.Tensor, idist.all_gather(_target_tensor))

            self._result = 0.0
            if idist.get_rank() == 0:
                # Run compute_fn on zero rank only
                _out = self._output_transform((_prediction_tensor, _target_tensor))
                self._result = self.compute_fn(_out[0], _out[1])

            if ws > 1:
                # broadcast result to all processes
                self._result = cast(float, idist.broadcast(self._result, src=0))

        return self._result

In [26]:
def sigmoid_output_transform(output):
    y_pred, y = output
    y_pred = torch.sigmoid(y_pred)[:,0].reshape([-1])
    y = y.reshape([-1])
    print(y.shape, y_pred.shape)
    return y_pred, y

roc_auc = Custom_ROC_AUC(output_transform=sigmoid_output_transform, device = device)

# Start accumulation:
# for x, y in data:
x = data[0].to(device)
y = data[1].to(device)
y_pred = model(x)
# z, y = sigmoid_output_transform((y_pred, y))
roc_auc.update((y_pred, y))

# Compute the result
print("roc_auc: ", roc_auc.compute())

# Reset metric
roc_auc.reset()



torch.Size([10]) torch.Size([10])
roc_auc:  0.5
