In [1]:
import numpy as np

In [2]:
def starts_with(first, last, reverse=False):
    last += 1

    ls = range(first, last)
    if reverse:
        reversed(ls)

    return dict(zip(ls, [0] * len(list(ls))))

def sample_mean(trace: np.array):
    return trace.sum() / len(trace)

In [3]:
import math

def values(trace):
    return np.mean(trace), np.var(trace), len(trace)

def test_statistic(mean_0, var_0, car_0, mean_1, var_1, car_1):
    return (mean_0 - mean_1) / math.sqrt(var_0 / car_0 + var_1 / car_1)

def degree_of_freedom(var_0, car_0, var_1, car_1):
    nom = (var_0 / car_0 + var_1 / car_1)**2
    den = (var_0 / car_0)**2 / (car_0 - 1) + (var_1 / car_1)**2 / (car_1 - 1)

    return nom / den

In [4]:
# Student's t-probability distribution function
import scipy.stats as stats

def cdf(t, v):
    """
    The cumulative distribution function as described on page 3.

    The aim of a t-test is to provide a quantitative value as a probability that the mean μ
        of two sets are different.

    :param t: the test statistic of the two traces.
    :param v: the degree of freedom of the two traces.
    :return: the cumulative distribution function.
    """
    return 2 * stats.t(v).cdf(-abs(t))

In [5]:
import scipy.special as special

special.binom(7, 3) == 35

True

In [6]:
class TraceProcessor:
    def __init__(self, order=1, prob=.01):
        """
        TraceProcessor digests traces using incremental programming. This allows for t-testing
        anywhere in the digestion process.

        :param order: The order of t-test to be performed on the data. Defaults to 1 to accommodate
            for the ASCAD dataset, which features first-order leakages in AES traces.
        :param prob: The desired probability for the t-test to reject H0.
        """
        self.order = order
        self.prob = prob

        # Maximal statistical order to be maintained
        self.max_d = self.order + 1

        # Store binomial coefficients for use in calculating the central sums
        #   (Schneider & Moradi, 2016 - page 6 - formula 3).
        self.binom = {}
        for d in range(self.max_d):
            self.binom[d] = {}
            for k in range(self.max_d):
                self.binom[d][k] = special.binom(self.max_d, k)

        # Central Moments (CM) to be maintained.
        self.CM = starts_with(1, self.max_d)

        # Central Sums (CS) to be maintained, corresponding to the CMs.
        # CS is reversed to accommodate the update style of the CS (high to low).
        self.CS = starts_with(2, self.max_d, reverse=True)

        # Number of collected traces.
        self.n = 0
        # Cardinality of all observed traces combined.
        self.cardinality = 0

    def add_trace(self, trace: np.array):
        """
        Adds traces and computes intermediate central moments.

        :param trace: should be a numpy array containing the trace.
        """
        # Please note that n = cardinality Q' in all central moment calculations,
        #   where Q' is the trace set updated with the given trace.
        self.n += 1
        self.cardinality += len(trace)

        # Second half of Schneider & Moradi, 2016 - page 6 - formula 3 is incompatible with n == 1,
        #   due to the division by n - 1.
        if self.n <= 1:
            return

        # Assuming here that y is the sample mean of the new trace
        #   (Schneider & Moradi, 2016 - page 5 - bottom right)
        #   This is not specified in their paper, which merely states y as "trace".
        #   My assumption is based on the further calculations using delta as a number,
        #   where delta should be a vector of variable length without this assumption.
        delta = sample_mean(trace) - self.CM[1]

        for central_sum_order in self.CS:
            self.update_cs(central_sum_order, delta)

        for central_moment_order in self.CM:
            self.update_cm(central_moment_order, delta)

    def update_cm(self, d, delta):
        """
        Updates the Central Moment (CM) for a given order of central moment.

        :param d: Central Moment order.
        :param delta: the delta of the new trace (Schneider & Moradi, 2016 - page 5 - bottom left).
        """
        if d == 1:
            # Schneider & Moradi, 2016 - page 5 - bottom right.
            self.CM[1] += delta / self.n
        else:
            # Schneider & Moradi, 2016 - page 6 - top left.
            self.CM[d] = self.CS[d] / self.n

    def update_cs(self, d, delta):
        """
        Updates the Central Sum (CS) for a given order of central moment.

        :param d: Central Moment order.
        :param delta: the delta of the new trace (See page 5 bottom left).
        """
        sum_prev_cs = 0
        # Schneider & Moradi, 2016 - page 6 - formula 3.
        for k in range(2, d - 2):
            # As CS is reversed, this will access the lower-order central sums
            #   which are not yet updated with the new trace.
            sum_prev_cs += self.binom[d][k] * self.CS[d-k] * (-delta / self.n) ** k

        a = (((self.n - 1) / self.n) * delta) ** d
        b = 1 - (-1 / (self.n - 1)) ** (d - 1)

        self.CS[d] += sum_prev_cs + a * b

    def t_test(self, trace):
        """
        Returns whether to reject H0. H0 being "left and right are from different distributions".
        The traces should be normally distributed for the Student's t-test to work.

        :param trace: the trace under test.
        :return: whether H0 can be rejected.
        """
        # Note that the first-order Central Moment (CM[1]) corresponds to the mean and
        #   the second-order (CM[2]) to the variance.
        mean, var, car = values(trace)

        t = test_statistic(self.CM[1], self.CM[2], self.cardinality, mean, var, car)
        v = degree_of_freedom(self.CM[2], self.cardinality, var, car)
        if t > 4.5 and v > 1000:
            # Schneider & Moradi, 2016 - page 3 - bottom left.
            return True

        return cdf(t, v) < self.prob

In [7]:
import math

def calc_t(H):
    mean = [0, 0]
    var = [0, 0]
    n = [0, 0]

    for ix_cat in range(2):
        for ix_bin in range(len(H[ix_cat])):
            # mean[ix_cat] += H[ix_cat][ix_bin] * range[ix_bin]
            mean[ix_cat] += H[ix_cat][ix_bin]
            n[ix_cat] += H[ix_cat][ix_bin]

        mean[ix_cat] /= n[ix_cat]

        for ix_bin in range(len(H[ix_cat])):
            # tmp = (range[ix_bin] - mean[ix_cat])
            tmp = mean[ix_cat]
            var[ix_cat] += tmp ** 2 + H[ix_cat][ix_bin]

        var[ix_cat] /= n[ix_cat]

    # t-value
    mean_diff = mean[0] - mean[1]
    var_sum = (var[0] / n[0]) + (var[1] / n[1])
    t_ret = mean_diff / math.sqrt(var_sum)

    # degree of freedom
    denom = ((var[0] / n[0]) * (var[0] / n[0])) / (n[0] - 1) + ((var[1] / n[1]) * (var[1] / n[1])) / (n[1] - 1)
    t_dof_ret = var_sum ** 2 / denom

    # cdf
    t_p_ret = cdf(t_ret, t_dof_ret)

    return t_ret, t_dof_ret, t_p_ret

In [8]:
def calc_chi(H):
    num_cats = len(H)
    num_bins = len(H[0])

    # Degrees of freedom
    chi_dof_ret = (num_bins - 1) * (num_cats - 1)

    # chi**2 value
    sum_rows = [0] * num_cats
    sum_cols = [0] * num_bins
    N = 0.0

    for ix_bin in range(num_bins):
        for ix_cat in range(num_cats):
            sum_rows[ix_cat] += H[ix_cat][ix_bin]
            sum_cols[ix_bin] += H[ix_cat][ix_bin]
            N += H[ix_cat][ix_bin]

    chi_tmp = 0.0
    for ix_bin in range(num_bins):
        for ix_cat in range(num_cats):
            E = (sum_rows[ix_cat] * sum_cols[ix_bin]) / N
            tmp = (H[ix_cat][ix_bin] - E)

            chi_tmp += tmp ** 2 / E

    chi_ret = chi_tmp
    chi_p_ret = stats.chi2(chi_tmp).cdf(chi_dof_ret)

    # Small p values give evidence to reject the null hypothesis and conclude that for these
    #   scenarios the occurrences of the observations are not independent
    return chi_ret, chi_dof_ret, chi_p_ret

In [9]:
inpt = [np.random.normal(3, 0, 100), np.random.normal(3, 0, 100)]
inpt1 = [np.random.normal(1000, 1000, 100), np.random.normal(3, 0, 100)]

print(calc_t(inpt), calc_chi(inpt))
print(calc_t(inpt1), calc_chi(inpt1))

(0.0, 598.0, 1.0) (0.0, 99, nan)
(0.0, 300.3350804540584, 1.0) (459.2465064314272, 99, 5.347906529608671e-77)


In [10]:
TRACE_TRAIN = [np.random.normal(3, 1, 1000) for _ in range(100)]

TRACE_TEST_PASS = np.random.normal(3, 1, 1000)
TRACE_TEST_REJECT = np.random.normal(5, .01, 1000)

TP = TraceProcessor()
for t in TRACE_TRAIN:
    TP.add_trace(t)

print(TP.t_test(TRACE_TEST_PASS), TP.t_test(TRACE_TEST_REJECT))

False True


In [11]:
def get_fpr(tp: TraceProcessor, trace_set):
    """
    Gets the False Positive Rate (FPR) over a set of traces.
    Equal to the rate at which H0 is falsely accepted.

    :param tp: The TraceProcessor, trained over a set of traces Q.
    :param trace_set: Set of traces, which are drawn from a different distribution than Q.
    :return: Returns the FPR over this set of traces.
    """
    fp = 0
    for trace in trace_set:
        fp += 1 - tp.t_test(trace)

    return fp / len(trace_set)

get_fpr(TP, [np.random.normal(3.1, 1, 1000) for _ in range(100)])

0.07

In [12]:
def get_fnr(tp: TraceProcessor, trace_set):
    """
    Gets the False Negative Rate (FNR) over a given set of traces.
    Equal to the rate at which H0 is falsely rejected.

    :param tp: The TraceProcessor, trained over a set of traces Q.
    :param trace_set: Set of traces, which are drawn from the same distribution as Q.
    :return: Returns the FNR over this set of traces.
    """
    fn = 0
    for trace in trace_set:
        fn += tp.t_test(trace)

    return fn / len(trace_set)

get_fnr(TP, [np.random.normal(3, 1, 1000) for _ in range(100)])

0.04