In [1]:
import re
import glob
import math
from pathlib import Path
from typing import Tuple, Dict, List, Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats


In [2]:
def list_traces_sorted(traces_path: str) -> List[Path]:
    """
    Return the trace files of a directory, ordered by the numeric index in
    their 'trace_<idx>.txt' filename (so trace_2.txt comes before trace_10.txt).

    Parameters
    ----------
    traces_path : str
        Absolute or relative path of the directory holding the trace files.
        The directory name is taken literally, even when it contains glob
        metacharacters such as '[', ']', '*' or '?'.

    Returns
    -------
    List[Path]
        Paths of every 'trace_*.txt' entry directly inside the directory,
        sorted by ascending numeric index.

    Raises
    ------
    FileNotFoundError
        If no 'trace_*.txt' entry is found under traces_path.
    ValueError
        If a matching entry has a non-numeric index (e.g. 'trace_a.txt').
    """
    # Glob *inside* the directory: only the filename part is a pattern, so a
    # directory called e.g. "run[1]" is not misread as a character class.
    paths = list(Path(traces_path).glob("trace_*.txt"))
    if not paths:
        raise FileNotFoundError(f"No trace_*.txt files under: {traces_path}")

    # extract numeric index
    def idx(p: Path) -> int:
        m = re.match(r"trace_(\d+)\.txt$", p.name)
        if not m:
            raise ValueError(f"Bad trace filename: {p.name}")
        return int(m.group(1))

    # numeric (not lexicographic) ordering
    paths.sort(key=idx)
    return paths

In [3]:
def load_inputs_matrix(inputs_file: str, ncols: int = 7) -> np.ndarray:
    """
    Read a headerless, whitespace-separated table (e.g. 'inputs.txt') and
    return its values as an array of shape (N, ncols).

    Parameters
    ----------
    inputs_file : str
        Path of the text file to read.
    ncols : int
        Number of columns the file must contain (default 7: V1..V7).

    Raises
    ------
    ValueError
        If the parsed table does not have exactly ncols columns.
    """
    table = pd.read_csv(inputs_file, sep=r"\s+", header=None, engine="python")
    n_found = table.shape[1]
    if n_found == ncols:
        return table.values  # (N, ncols)
    raise ValueError(f"Expected {ncols} columns, got {n_found} in {inputs_file}")

In [4]:
def load_traces_matrix(traces_path: str) -> Tuple[np.ndarray, List[int]]:
    """
    Read every trace file under traces_path into one matrix of shape (N, S).

    Each file contributes exactly one row: its values are flattened in file
    order, whether they are written one per line or several per line.

    Returns
    -------
    (traces, idx_list)
        traces   : float array, one row per file, in ascending file-index order.
        idx_list : the numeric index parsed from each 'trace_<idx>.txt' name,
                   in the same order as the rows of traces.

    Raises
    ------
    FileNotFoundError, ValueError
        Propagated from list_traces_sorted (no files / bad filename).
    ValueError
        If the files do not all contain the same number of samples.
    """
    files = list_traces_sorted(traces_path)
    idx_list: List[int] = []
    rows: List[np.ndarray] = []
    for f in files:
        m = re.match(r"trace_(\d+)\.txt$", f.name)
        if m is None:
            # list_traces_sorted already validates names; kept as a guard so a
            # bad name fails with a clear message instead of an AttributeError.
            raise ValueError(f"Bad trace filename: {f.name}")
        idx_list.append(int(m.group(1)))
        data = np.loadtxt(f, dtype=float)
        # np.loadtxt yields a 0-d array for a single value and a 2-D array for
        # several values per line; flatten so that one file is always one row.
        rows.append(np.ravel(data))
    # sanity: all traces must have the same number of samples (no padding is done)
    lens = [r.size for r in rows]
    if len(set(lens)) != 1:
        raise ValueError(f"Traces have varying lengths: {set(lens)}")
    X = np.vstack(rows)  # (N, S)
    return X, idx_list

In [5]:
def align_inputs_to_traces(inputs: np.ndarray, idx_list: List[int]) -> np.ndarray:
    """
    Pick, for every trace, the row of inputs whose 0-based position equals the
    numeric index taken from the trace filename.

    Parameters
    ----------
    inputs : np.ndarray
        2-D array with one row per recorded input.
    idx_list : List[int]
        0-based row indices, one per trace, in trace order.

    Returns
    -------
    np.ndarray
        Array of shape (len(idx_list), inputs.shape[1]); row k is
        inputs[idx_list[k]].

    Raises
    ------
    ValueError
        If idx_list is empty.
    IndexError
        If an index is negative or not smaller than the number of input rows.
    """
    if len(idx_list) == 0:
        raise ValueError("idx_list is empty; there are no traces to align.")
    idx = np.asarray(idx_list)
    min_idx = int(idx.min())
    max_idx = int(idx.max())
    # A negative index would not fail in NumPy: it would silently select a row
    # counted from the end, pairing a trace with the wrong input.
    if min_idx < 0:
        raise IndexError(f"negative file index {min_idx} is not a valid 0-based row index.")
    if max_idx >= inputs.shape[0]:
        raise IndexError(
            f"inputs has only {inputs.shape[0]} rows, but file index {max_idx} appears."
        )
    aligned = inputs[idx, :]  # pick rows by 0-based indices
    return aligned


In [6]:
def window_traces(X: np.ndarray, qs: int, qe: Optional[int]) -> np.ndarray:
    """
    Cut a sample window out of the trace matrix.

    qs and qe are 1-based, inclusive column positions (the convention of the
    original R code). qe=None means "up to the last column". The request is
    clamped to the available columns: qs below 1 starts at the first column,
    qe beyond the width stops at the last column.

    Returns a copy of the selected columns, shape (N, window_width).

    Raises
    ------
    ValueError
        If the clamped window contains no column.
    """
    n_samples = X.shape[1]
    qe = n_samples if qe is None else qe
    # 1-based inclusive [qs, qe]  ->  0-based half-open [start, stop)
    start = max(0, qs - 1)
    stop = min(n_samples, qe)
    if stop <= start:
        raise ValueError(f"Bad window [{qs}, {qe}] for S={n_samples}")
    window = X[:, start:stop]
    return window.copy()



In [7]:
def split_fixed_random(
    inputs_aligned: np.ndarray,
    v: float,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Partition the traces into the "fixed" and "random" classes.

    A row belongs to the fixed class when its first column (V1) equals v
    exactly, and to the random class otherwise.

    Returns
    -------
    (fixed_rows, random_rows)
        Two arrays of row positions into inputs_aligned.

    Raises
    ------
    ValueError
        If either class ends up with no rows.
    """
    first_col = inputs_aligned[:, 0]
    fixed_rows = np.nonzero(first_col == v)[0]
    random_rows = np.nonzero(first_col != v)[0]
    if min(fixed_rows.size, random_rows.size) == 0:
        raise ValueError("One of the groups is empty (fixed_rows or random_rows).")
    return fixed_rows, random_rows

In [8]:

def extract_groups(
    X: np.ndarray, fixed_rows: np.ndarray, random_rows: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Gather the rows of X belonging to each class.

    Returns (fixed, random): the rows of X at positions fixed_rows and
    random_rows respectively, all columns kept.
    """
    fixed_group, random_group = (X[rows, :] for rows in (fixed_rows, random_rows))
    return fixed_group, random_group


In [9]:
def tvla_welch_tcurve(fixed: np.ndarray, random: np.ndarray) -> np.ndarray:
    """
    Compute Welch's t-statistic (unequal variances) between the fixed and the
    random group, independently for every sample column.

    Parameters
    ----------
    fixed, random : np.ndarray
        2-D arrays (traces x samples) with the same number of columns.
        NaN entries are omitted from the test (nan_policy="omit").

    Returns
    -------
    np.ndarray
        Float array of shape (S,) holding one t-value per column.

    Raises
    ------
    ValueError
        If the two groups have a different number of columns.
    """
    if fixed.shape[1] != random.shape[1]:
        raise ValueError("fixed and random must have same number of columns")
    S = fixed.shape[1]
    if S == 0:
        # nothing to test; keep the (S,) contract without calling SciPy
        return np.empty(0, dtype=float)
    # One vectorised call along the trace axis instead of one SciPy call per
    # column: same statistic, without S rounds of Python/SciPy call overhead.
    res = stats.ttest_ind(fixed, random, axis=0, equal_var=False, nan_policy="omit")
    # Some SciPy versions appear to return a masked array when NaNs were
    # omitted; normalise to a plain float ndarray with NaN in masked slots.
    return np.asarray(np.ma.filled(res.statistic, np.nan), dtype=float)

In [10]:

def ksla_curve(fixed: np.ndarray, random: np.ndarray) -> np.ndarray:
    """
    Compute the two-sample Kolmogorov-Smirnov statistic D between the fixed
    and the random group, independently for every sample column.

    Returns a float array of shape (S,) with one D value (in [0, 1]) per column.

    Raises
    ------
    ValueError
        If the two groups have a different number of columns.
    """
    n_cols = fixed.shape[1]
    if n_cols != random.shape[1]:
        raise ValueError("fixed and random must have same number of columns")
    per_column = [
        stats.ks_2samp(
            fixed[:, col], random[:, col], alternative="two-sided", method="auto"
        ).statistic
        for col in range(n_cols)
    ]
    return np.array(per_column, dtype=float)

In [11]:
def tvla_power_curve(
    fixed: np.ndarray,
    random: np.ndarray,
    min_fixed: int = 10,
    max_steps: int = 100,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Track how the strongest TVLA response grows with the number of traces.

    For a set of group sizes m between min_fixed and the size of the smaller
    group, the Welch t-curve is computed from the first m traces of each group
    and its largest absolute value (NaNs ignored) is recorded.

    Parameters
    ----------
    fixed, random : np.ndarray
        Group matrices (traces x samples).
    min_fixed : int
        Smallest group size to evaluate.
    max_steps : int
        Upper bound on how many group sizes are evaluated.

    Returns
    -------
    (m_values, max_abs_t_per_m)
        Integer array of the evaluated group sizes and the matching max |t|.

    Raises
    ------
    ValueError
        If the smaller group has fewer than min_fixed traces.
    """
    n_per_group = min(fixed.shape[0], random.shape[0])
    if n_per_group < min_fixed:
        raise ValueError("Not enough traces per group for power curve.")
    # evenly spaced group sizes in [min_fixed, n_per_group], at most max_steps;
    # rounding can create duplicates, which np.unique removes
    n_steps = min(max_steps, n_per_group - min_fixed + 1)
    grid = np.linspace(min_fixed, n_per_group, num=n_steps)
    ms = np.unique(np.round(grid)).astype(int)
    peaks = [
        np.nanmax(np.abs(tvla_welch_tcurve(fixed[:m, :], random[:m, :])))
        for m in ms
    ]
    return ms, np.array(peaks, dtype=float)


In [12]:
def plot_tvla_curve(
    tvals: np.ndarray,
    qs: int,
    qe: int,
    threshold: float = 4.5,
    title: str = "TVLA (Welch)",
    figsize: Tuple[float, float] = (10, 4),
    out_pdf: Optional[str] = None,
) -> None:
    """
    Draw the t-value curve with dashed red lines at +threshold and -threshold.

    The x axis runs from qs over tvals.size consecutive sample indices; qs and
    qe are also shown in the x-axis label. When out_pdf is given the figure is
    written to that path and closed, otherwise it is displayed.
    """
    sample_axis = np.arange(qs, qs + tvals.size)
    fig, ax = plt.subplots(figsize=figsize)
    ax.plot(sample_axis, tvals, lw=1.3)
    for level in (+threshold, -threshold):
        ax.axhline(level, color="r", ls="--")
    ax.set_title(title)
    ax.set_xlabel(f"Sample index ({qs}-{qe})")
    ax.set_ylabel("t-value")
    ax.grid(True, alpha=0.3)
    if not out_pdf:
        plt.show()
        return
    fig.savefig(out_pdf, bbox_inches="tight")
    plt.close(fig)

In [13]:
def plot_ksla_curve(
    dvals: np.ndarray,
    qs: int,
    qe: int,
    threshold: Optional[float] = None,
    title: str = "KSLA (KS statistic)",
    figsize: Tuple[float, float] = (10, 4),
    out_pdf: Optional[str] = None,
) -> None:
    """
    Draw the KS-statistic curve on a y axis fixed to [0, 1].

    The x axis runs from qs over dvals.size consecutive sample indices; qs and
    qe are also shown in the x-axis label. A dashed red line is drawn at
    threshold unless it is None. When out_pdf is given the figure is written
    to that path and closed, otherwise it is displayed.
    """
    sample_axis = np.arange(qs, qs + dvals.size)
    fig, ax = plt.subplots(figsize=figsize)
    ax.plot(sample_axis, dvals, lw=1.3, color="darkgreen")
    if threshold is not None:
        ax.axhline(threshold, color="r", ls="--")
    ax.set_title(title)
    ax.set_xlabel(f"Sample index ({qs}-{qe})")
    ax.set_ylabel("KS D")
    ax.set_ylim(0, 1)
    ax.grid(True, alpha=0.3)
    if not out_pdf:
        plt.show()
        return
    fig.savefig(out_pdf, bbox_inches="tight")
    plt.close(fig)

In [14]:

def plot_power_curve(
    m_values: np.ndarray,
    max_abs_t: np.ndarray,
    title: str = "TVLA max-|t| vs traces-per-group",
    figsize: Tuple[float, float] = (10, 4),
    out_pdf: Optional[str] = None,
) -> None:
    """
    Draw max |t| as a bar chart over the number of traces per group.

    Bar width is one unit, widened by one for every 40 plotted group sizes.
    When out_pdf is given the figure is written to that path and closed,
    otherwise it is displayed.
    """
    bar_width = max(1, len(m_values) // 40)
    fig, ax = plt.subplots(figsize=figsize)
    ax.bar(m_values, max_abs_t, width=bar_width)
    ax.set_title(title)
    ax.set_xlabel("Traces per group (m)")
    ax.set_ylabel("Max |t| across window")
    ax.grid(axis="y", alpha=0.3)
    if not out_pdf:
        plt.show()
        return
    fig.savefig(out_pdf, bbox_inches="tight")
    plt.close(fig)

In [20]:
def run_tvla_pipeline(
    name: str,
    traces_path: str,
    inputs_file: str,
    v: float = 0.5,
    qs: int = 1,
    qe: Optional[int] = None,
    tvla_threshold: float = 4.5,
    min_fixed: int = 10,
    out_dir: str = "./out_tvla",
    save_plots: bool = True,
    compute_power: bool = False,
) -> Dict[str, object]:
    """
    End-to-end TVLA (mirrors the original R function):
    1) load traces and inputs, align them by the numeric trace index
    2) select the sample window [qs:qe] (1-based, inclusive)
    3) split into fixed vs random by V1 == v
    4) compute the Welch t-curve
    5) save the t-values as CSV and, if save_plots, the curve as PDF
    6) optionally (compute_power=True) compute the power curve and save it

    Parameters
    ----------
    name : str
        Label used in output filenames and plot titles.
    traces_path, inputs_file : str
        Directory with 'trace_<idx>.txt' files and path of the inputs table.
    v : float
        Value of V1 that marks the fixed class.
    qs, qe : int, Optional[int]
        Requested window, 1-based inclusive; qe=None means up to the last sample.
    tvla_threshold : float
        |t| above this value is reported as a leakage point.
    min_fixed : int
        Smallest group size of the power curve (only used when compute_power).
    out_dir : str
        Output directory; created if missing.
    save_plots : bool
        Whether PDFs are written.
    compute_power : bool
        Also compute the power curve (off by default: it recomputes the
        t-curve for up to 100 group sizes).

    Returns
    -------
    dict with keys name, t_values, leakage_points (1-based sample indices),
    fixed_count, random_count, window, csv_tvalues, pdf_tvalues, csv_power,
    pdf_power. The *_power entries, and pdf_* when no plot was saved, are None.
    """
    out_path = Path(out_dir)
    out_path.mkdir(parents=True, exist_ok=True)
    # load / align
    X, idx_list = load_traces_matrix(traces_path)
    inputs = load_inputs_matrix(inputs_file, ncols=7)
    inputs_aligned = align_inputs_to_traces(inputs, idx_list)
    # window
    Xw = window_traces(X, qs, qe)
    S = Xw.shape[1]
    # window_traces clamps the request to the available samples, so label
    # everything with the window that was actually analysed, not the request.
    first = max(1, qs)
    last = first + S - 1
    # split
    fixed_rows, random_rows = split_fixed_random(inputs_aligned, v)
    fixed, random = extract_groups(Xw, fixed_rows, random_rows)
    # t-curve
    tvals = tvla_welch_tcurve(fixed, random)
    # save t-curve CSV
    t_csv = str(out_path / f"tvalues_{name}.csv")
    pd.DataFrame({"sample": np.arange(first, first + S), "t_value": tvals}).to_csv(t_csv, index=False)
    # plot t-curve
    t_pdf = str(out_path / f"tvla_{name}.pdf")
    if save_plots:
        plot_tvla_curve(tvals, first, last, threshold=tvla_threshold, title=f"TVLA — {name}", out_pdf=t_pdf)
    # leakage points (0-based positions inside the window)
    leaks = np.where(np.abs(tvals) > tvla_threshold)[0]
    # power curve (opt-in)
    p_csv = None
    p_pdf = None
    if compute_power:
        m_vals, max_abs_t = tvla_power_curve(fixed, random, min_fixed=min_fixed, max_steps=100)
        p_csv = str(out_path / f"tvla_power_curve_{name}.csv")
        pd.DataFrame({"traces_per_group": m_vals, "max_abs_t": max_abs_t}).to_csv(p_csv, index=False)
        if save_plots:
            p_pdf = str(out_path / f"tvla_power_curve_{name}.pdf")
            plot_power_curve(m_vals, max_abs_t, title=f"TVLA power curve — {name}", out_pdf=p_pdf)

    return dict(
        name=name,
        t_values=tvals,
        leakage_points=(leaks + first),  # back to 1-based labeling of sample index
        fixed_count=fixed.shape[0],
        random_count=random.shape[0],
        window=(first, last),
        csv_tvalues=t_csv,
        pdf_tvalues=t_pdf if save_plots else None,
        csv_power=p_csv,
        pdf_power=p_pdf,
    )


In [16]:
def run_ksla_pipeline(
    name: str,
    traces_path: str,
    inputs_file: str,
    v: float = 0.5,
    qs: int = 1,
    qe: Optional[int] = None,
    ks_threshold: Optional[float] = None,   # if None, no horizontal line
    out_dir: str = "./out_tvla",
    save_plots: bool = True,
) -> Dict[str, object]:
    """
    End-to-end KSLA: same driver as the TVLA pipeline, but computing the
    per-sample two-sample KS statistic instead of the Welch t-statistic.

    Steps: load traces and inputs and align them by trace index, cut the
    window [qs:qe] (1-based, inclusive; qe=None means up to the last sample),
    split into fixed (V1 == v) and random, compute the KS curve, write it as
    CSV and, if save_plots, as PDF into out_dir (created if missing).

    Returns
    -------
    dict with keys name, ks_values, leakage_points, fixed_count, random_count,
    window, csv_ks, pdf_ks. leakage_points holds the 1-based sample indices
    with D > ks_threshold, or None when no threshold was given; pdf_ks is
    None when no plot was saved.
    """
    out_path = Path(out_dir)
    out_path.mkdir(parents=True, exist_ok=True)
    # load / align / window
    X, idx_list = load_traces_matrix(traces_path)
    inputs = load_inputs_matrix(inputs_file, ncols=7)
    inputs_aligned = align_inputs_to_traces(inputs, idx_list)
    Xw = window_traces(X, qs, qe)
    S = Xw.shape[1]
    # window_traces clamps the request to the available samples, so label
    # everything with the window that was actually analysed, not the request.
    first = max(1, qs)
    last = first + S - 1
    # split groups
    fixed_rows, random_rows = split_fixed_random(inputs_aligned, v)
    fixed, random = extract_groups(Xw, fixed_rows, random_rows)
    # ks curve
    dvals = ksla_curve(fixed, random)
    # save CSV
    ks_csv = str(out_path / f"ksla_values_{name}.csv")
    pd.DataFrame({"sample": np.arange(first, first + S), "ks": dvals}).to_csv(ks_csv, index=False)
    # plot
    ks_pdf = str(out_path / f"ksla_{name}.pdf")
    if save_plots:
        plot_ksla_curve(dvals, first, last, threshold=ks_threshold, title=f"KSLA — {name}", out_pdf=ks_pdf)

    # leakage points are only defined relative to a threshold
    leaks = None
    if ks_threshold is not None:
        leaks = (np.where(dvals > ks_threshold)[0] + first)

    return dict(
        name=name,
        ks_values=dvals,
        leakage_points=leaks,
        fixed_count=fixed.shape[0],
        random_count=random.shape[0],
        window=(first, last),
        csv_ks=ks_csv,
        pdf_ks=ks_pdf if save_plots else None,
    )

In [None]:

# paths (NOTE: machine-specific absolute locations; adjust before running elsewhere)
traces_path = "/Users/andrew/Desktop/thesis/only-traces/capture_traces/unprotected"
inputs_file = "/Users/andrew/Desktop/thesis/only-traces/capture_traces/unprotected/inputs.txt"

# settings shared by both analyses: same data set, same fixed-class value,
# same sample window (1..24430) and same output directory
shared_settings = dict(
    name="unprotected_new_nn",
    traces_path=traces_path,
    inputs_file=inputs_file,
    v=0.5,
    qs=1,
    qe=24430,
    out_dir="./out_tvla",
)

# TVLA over the window, flagging |t| > 4.5
res_tvla = run_tvla_pipeline(tvla_threshold=4.5, **shared_settings)

# optional KSLA on the same split (the threshold only adds a line and leakage
# points; it could also be derived from a quantile computed externally)
res_ksla = run_ksla_pipeline(ks_threshold=0.2, **shared_settings)
