## 1. Import package

In [1]:
import numpy as np
from dataclasses import dataclass
from typing import List, Tuple
import random
import scipy.optimize as opt
import scipy.linalg as sla
from sklearn.model_selection import StratifiedKFold

## 2. Data loading and processing

In [2]:

@dataclass
class Dataset:
    """Container for the four arrays of one noisy-label dataset (see `load_npz`)."""
    Xtr: np.ndarray  # inputs of the noisy train+val split
    Str: np.ndarray  # noisy labels of the train+val split
    Xts: np.ndarray  # inputs of the clean test split
    Yts: np.ndarray  # clean labels of the test split

def load_npz(path):
    """Load a dataset archive and return its arrays wrapped in a `Dataset`.

    Parameters
    ----------
    path : str or path-like
        Location of an ``.npz`` archive holding the keys ``Xtr``, ``Str``,
        ``Xts`` and ``Yts``.

    Returns
    -------
    Dataset
        The four arrays, in the order ``Xtr, Str, Xts, Yts``.

    Raises
    ------
    KeyError
        If one of the four expected keys is missing from the archive.
    """
    # np.load on an .npz returns a lazily-reading NpzFile that keeps the file
    # handle open; read every array inside `with` so the handle is released.
    with np.load(path) as d:
        return Dataset(d['Xtr'], d['Str'], d['Xts'], d['Yts'])

# Other pipeline helper functions
def set_seed(seed: int):
    """Seed NumPy's global RNG and Python's `random` module with the same value."""
    # NumPy first, then the stdlib generator (same order as before).
    for seeder in (np.random.seed, random.seed):
        seeder(seed)

def one_hot(y: np.ndarray, C: int) -> np.ndarray:
    """Encode integer labels as one-hot rows.

    Parameters
    ----------
    y : np.ndarray
        Integer class indices, one per sample.
    C : int
        Number of classes (width of the encoding).

    Returns
    -------
    np.ndarray
        ``float32`` array of shape ``(len(y), C)`` with a single 1.0 per row.
    """
    n_samples = y.shape[0]
    encoded = np.zeros((n_samples, C), dtype=np.float32)
    row_ids = np.arange(n_samples)
    encoded[row_ids, y] = 1.0
    return encoded

def softmax(z: np.ndarray) -> np.ndarray:
    """Row-wise softmax of a 2-D array of logits.

    The per-row maximum is subtracted before exponentiating so large logits
    do not overflow, and a tiny constant is added to the denominator.
    """
    shifted = z - z.max(axis=1, keepdims=True)
    numer = np.exp(shifted)
    denom = numer.sum(axis=1, keepdims=True) + 1e-12
    return numer / denom

def accuracy(y, yhat):
    """Return the fraction of positions where `y` and `yhat` agree, as a float."""
    matches = (y == yhat)
    return float(matches.mean())


# Return the transition matrix T for a dataset
def get_T(name: str) -> np.ndarray:
    """Return the known 3x3 label-noise transition matrix for a dataset.

    The dataset is identified by the noise tag (``"0.3"`` or ``"0.6"``) in its
    name. The tag is looked up in the file name first, so a tag that merely
    appears in a parent directory (e.g. ``runs_v0.3/FashionMNIST0.6.npz``)
    cannot select the wrong matrix. If the file name carries no tag, the whole
    string is searched, as before.

    Parameters
    ----------
    name : str
        Dataset name or path (matching is case-insensitive).

    Returns
    -------
    np.ndarray
        ``float32`` matrix of shape ``(3, 3)``.

    Raises
    ------
    ValueError
        If neither tag is found in `name`.
    """
    # Insertion order matters: "0.3" is tested before "0.6", as in the
    # original if/elif chain.
    known = {
        "0.3": [[0.7, 0.3, 0.0],
                [0.0, 0.7, 0.3],
                [0.3, 0.0, 0.7]],
        "0.6": [[0.4, 0.3, 0.3],
                [0.3, 0.4, 0.3],
                [0.3, 0.3, 0.4]],
    }

    name = name.lower()
    # Last path component, accepting both '/' and '\\' separators.
    basename = name.replace("\\", "/").rsplit("/", 1)[-1]

    for candidate in (basename, name):
        for tag, rows in known.items():
            if tag in candidate:
                return np.array(rows, dtype=np.float32)

    raise ValueError("Unknown dataset: only 0.3 and 0.6 are supported here.")

# split the training set
def split(X,y,ratio=0.2,seed=42):
    """Randomly split `X`/`y` into two parts.

    Re-seeds the global RNGs via `set_seed(seed)`, shuffles the sample
    indices, and sends the first ``int(n * (1 - ratio))`` shuffled samples to
    the first part and the remainder to the second.

    Returns
    -------
    tuple
        ``(X_first, y_first, X_second, y_second)``.
    """
    set_seed(seed)
    n_samples = X.shape[0]
    order = np.random.permutation(n_samples)
    cut = int(n_samples * (1 - ratio))
    head, tail = order[:cut], order[cut:]
    return X[head], y[head], X[tail], y[tail]

## 3. Model Building

### 3.1 Model Preprocessing

In [3]:
@dataclass
class Standardizer:
    """Feature scaler holding a mean and a standard deviation (see `fit_std`)."""
    mean: np.ndarray
    std: np.ndarray

    def transform(self, Xf):
        """Scale `Xf` with the stored statistics and append a constant bias column.

        Returns ``(Xf - mean) / std`` with one extra trailing column of ones
        (``float64``), so a linear model on the result needs no separate bias.
        """
        scaled = (Xf - self.mean) / self.std
        n_rows = scaled.shape[0]
        ones_col = np.ones((n_rows, 1), dtype=np.float64)
        return np.hstack([scaled, ones_col])

def flatten(X):
    """Collapse every axis after the first into one and cast to ``float64``."""
    n_samples = X.shape[0]
    flat = X.reshape(n_samples, -1)
    return flat.astype(np.float64)
def fit_std(Xtrf):
    """Build a `Standardizer` from the per-column statistics of `Xtrf`.

    The mean and standard deviation are taken along axis 0 (kept as 2-D row
    vectors). 1e-5 is added to the standard deviation so constant columns do
    not cause a division by zero.
    """
    col_mean = Xtrf.mean(0, keepdims=True)
    col_std = Xtrf.std(0, keepdims=True) + 1e-5
    return Standardizer(col_mean, col_std)

### 3.2 Forward part

**Forward Method:**  
Multiply the *clean prediction distribution* $p_{\text{clean}}$ by $T^\top$  
to obtain the predicted distribution over the noisy labels:  
$$
p_{\tilde{y}} = T^\top \, p_{\text{clean}}
$$
For a batch whose rows are the per-sample distributions (as in the code below), this is `p_clean @ T`.  
Then compute the cross-entropy loss with respect to the noisy labels.

In [4]:
def forward_loss(p_clean, y, T):
    """Mean forward-corrected cross-entropy.

    Parameters
    ----------
    p_clean : np.ndarray
        Predicted clean-class probabilities, one row per sample.
    y : np.ndarray
        Integer (noisy) labels, one per sample.
    T : np.ndarray
        Transition matrix applied as ``p_clean @ T``.

    Returns
    -------
    float
        Average negative log-probability assigned to `y` after the correction.
    """
    # Clip so the logarithm never sees zero.
    noisy_probs = np.clip(p_clean @ T, 1e-12, 1.0)
    rows = np.arange(y.shape[0])
    picked = noisy_probs[rows, y]
    return float(-np.log(picked).mean())

Return $ \frac{\partial L}{\partial z} $ (the gradient with respect to the logits),  
which can be combined with the linear layer to compute  
$ \frac{\partial L}{\partial W} = X^\top \frac{\partial L}{\partial z} $.

In [5]:
def dLdz_forward(p_clean, y, T):
    """Gradient of the mean forward-corrected cross-entropy w.r.t. the logits.

    Applies the chain rule through the log-loss on ``p_clean @ T``, the
    multiplication by `T`, and the softmax that produced `p_clean`.

    Returns
    -------
    np.ndarray
        Array with the same shape as `p_clean`.
    """
    n_samples, n_classes = p_clean.shape
    targets = one_hot(y, n_classes)

    noisy_probs = np.clip(p_clean @ T, 1e-12, 1.0)

    # d(loss)/d(noisy probs); the 1/N comes from the mean over samples
    grad_noisy = -(targets / noisy_probs) / n_samples
    # back through p_tilde = p_clean @ T
    grad_clean = grad_noisy @ T.T
    # softmax Jacobian-vector product: p * (g - <g, p>)
    weighted = (grad_clean * p_clean).sum(axis=1, keepdims=True)
    return p_clean * (grad_clean - weighted)

### 3.3 Multiclass Logistic Regression (Softmax Regression)

This part applies a **softmax** function on the linear outputs (logits) to produce  
a probability distribution over all classes:

$p(y = c \mid x) = \dfrac{\exp(z_c)}{\sum_{k=1}^{C} \exp(z_k)}$,  
where $z = XW$ are the logits (the bias is absorbed into $W$ through the constant column appended by `Standardizer.transform`).

The model is trained by minimizing the **cross-entropy loss** between the predicted  
distribution $p(y \mid x)$ and the true labels.

In [6]:
@dataclass
class FwdCfg:
    """Hyper-parameters consumed by `SoftmaxFwd`."""
    wd: float = 1e-4      # L2 weight-decay coefficient
    max_iter: int = 300   # iteration cap handed to L-BFGS-B
    seed: int = 42        # seed used before weight initialisation
class SoftmaxFwd:
    """Linear softmax classifier trained with the forward-corrected loss.

    The weights have shape ``(D, C)`` and are fitted with L-BFGS-B on the
    forward loss plus an L2 penalty ``0.5 * wd * ||W||_F^2``.
    """

    def __init__(self, D, C, T, cfg:FwdCfg):
        """Store the dimensions, transition matrix and config; draw the initial weights.

        Calls `set_seed(cfg.seed)`, so constructing a model re-seeds the
        global RNGs.
        """
        self.D = D
        self.C = C
        self.T = T
        self.cfg = cfg
        set_seed(cfg.seed)
        initial = 0.01 * np.random.randn(D, C)
        self.W = initial.astype(np.float64)

    def _fun(self, w, X, y):
        """Objective for the optimiser: returns ``(loss, flat_gradient)`` at the flat weights `w`."""
        W = w.reshape(self.D, self.C)
        probs = softmax(X @ W)
        data_term = forward_loss(probs, y, self.T)
        penalty = 0.5 * self.cfg.wd * (sla.norm(W, 'fro') ** 2)
        grad_logits = dLdz_forward(probs, y, self.T)
        grad_W = X.T @ grad_logits + self.cfg.wd * W
        return data_term + penalty, grad_W.reshape(-1)

    def fit(self, X, y):
        """Minimise the objective from the current weights; store the result in `self.W`.

        Returns the optimiser's result object.
        """
        result = opt.minimize(
            self._fun,
            self.W.reshape(-1),
            args=(X, y),
            method="L-BFGS-B",
            jac=True,
            options={"maxiter": self.cfg.max_iter},
        )
        self.W = result.x.reshape(self.D, self.C)
        return result

    def predict_proba(self, X):
        """Clean-class probabilities for each row of `X`."""
        return softmax(X @ self.W)

    def predict(self, X):
        """Index of the most probable clean class for each row of `X`."""
        return self.predict_proba(X).argmax(1)


## 4. Hyper-parameter Tuning: Cross-Validation

In [7]:
@dataclass
class Result:
    """Cross-validation summary for one configuration: mean and std over folds of each metric."""
    cfg: "FwdCfg"            # configuration that was evaluated
    train_loss_mean: float   # training loss, mean over folds
    train_loss_std: float    # training loss, std over folds
    val_loss_mean: float     # validation loss, mean over folds (used for model selection)
    val_loss_std: float      # validation loss, std over folds
    train_acc_mean: float    # training accuracy, mean over folds
    train_acc_std: float     # training accuracy, std over folds
    val_acc_mean: float      # validation accuracy, mean over folds
    val_acc_std: float       # validation accuracy, std over folds

#10 fold cross-validation for model selection
def search_with_direction_cv(Xtr, Str, Xts, Yts, T, seed=42, n_splits=10, grid=None) -> Tuple[Result, List[Result]]:
    """Select the weight decay for `SoftmaxFwd` by stratified k-fold cross-validation.

    For every configuration in the grid, a model is trained on each training
    fold (standardised with statistics fitted on that fold only) and scored
    on the held-out fold. Losses are the forward-corrected loss plus the L2
    penalty; accuracies are measured against the noisy labels `Str`. The
    configuration with the smallest mean validation loss wins.

    Parameters
    ----------
    Xtr, Str : np.ndarray
        Training inputs and their noisy labels.
    Xts, Yts : np.ndarray
        Accepted for interface compatibility; not used by the search.
    T : np.ndarray
        Transition matrix; its first dimension gives the number of classes.
    seed : int
        Seed for the fold shuffling and for the default grid's configs.
    n_splits : int
        Number of cross-validation folds.
    grid : iterable of FwdCfg, optional
        Configurations to evaluate. Defaults to six weight-decay values from
        1e-4 to 5e-1 with ``max_iter=500``.

    Returns
    -------
    (Result, list of Result)
        The best result and the results for every configuration, in grid order.

    Raises
    ------
    ValueError
        If `grid` is given but empty.
    """
    Xtr_f = flatten(Xtr)
    C = T.shape[0]

    if grid is None:
        grid = [
            FwdCfg(wd=wd, max_iter=500, seed=seed)
            for wd in (1e-4, 1e-3, 1e-2, 5e-2, 1e-1, 5e-1)
        ]
    else:
        grid = list(grid)
    if not grid:
        raise ValueError("grid must contain at least one configuration.")

    # An integer random_state makes every skf.split(...) call yield the same
    # folds, so all configurations are compared on identical splits.
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed)

    results: List[Result] = []
    for cfg in grid:
        fold_train_losses = []
        fold_val_losses = []
        fold_train_accs = []
        fold_val_accs = []

        for tr_idx, val_idx in skf.split(Xtr_f, Str):
            X_tr_raw, y_tr = Xtr_f[tr_idx], Str[tr_idx]
            X_val_raw, y_val = Xtr_f[val_idx], Str[val_idx]

            # Standardise with training-fold statistics only (no leakage).
            std = fit_std(X_tr_raw)
            X_tr = std.transform(X_tr_raw)
            X_val = std.transform(X_val_raw)

            model = SoftmaxFwd(X_tr.shape[1], C, T, cfg)
            model.fit(X_tr, y_tr)

            # The penalty depends only on the fitted weights; compute it once.
            # NOTE(review): it is also added to the validation loss that
            # drives model selection - confirm this is intended.
            reg = 0.5 * cfg.wd * (sla.norm(model.W, 'fro') ** 2)

            p_tr = model.predict_proba(X_tr)
            fold_train_losses.append(forward_loss(p_tr, y_tr, T) + reg)
            fold_train_accs.append(accuracy(y_tr, np.argmax(p_tr, axis=1)))

            p_val = model.predict_proba(X_val)
            fold_val_losses.append(forward_loss(p_val, y_val, T) + reg)
            fold_val_accs.append(accuracy(y_val, np.argmax(p_val, axis=1)))

        # mean and (population) std over folds
        results.append(
            Result(
                cfg=cfg,
                train_loss_mean=float(np.mean(fold_train_losses)),
                train_loss_std=float(np.std(fold_train_losses)),
                val_loss_mean=float(np.mean(fold_val_losses)),
                val_loss_std=float(np.std(fold_val_losses)),
                train_acc_mean=float(np.mean(fold_train_accs)),
                train_acc_std=float(np.std(fold_train_accs)),
                val_acc_mean=float(np.mean(fold_val_accs)),
                val_acc_std=float(np.std(fold_val_accs)),
            )
        )

    # Pick the configuration with the lowest mean validation loss (first wins on ties).
    winner = min(results, key=lambda r: r.val_loss_mean)
    # Return an independent copy so `best` is not the same object as an entry of `results`.
    best = Result(**vars(winner))

    return best, results

## 5. Model-selection

### 5.1  Dataset with known flip rates (FashionMNIST)

In [9]:
def main():
    """Run the cross-validated weight-decay search on both FashionMNIST datasets and print a report."""
    set_seed(42)

    def fmt(r: Result) -> str:
        # One-line summary of a cross-validation result.
        head = f"[p@T] wd={r.cfg.wd}, it={r.cfg.max_iter} | "
        losses = (
            f"train_loss={r.train_loss_mean:.4f} (±{r.train_loss_std:.4f}), "
            f"val_loss={r.val_loss_mean:.4f} (±{r.val_loss_std:.4f}), "
        )
        accs = (
            f"train_acc={r.train_acc_mean*100:.2f}% (±{r.train_acc_std*100:.2f}%), "
            f"val_acc={r.val_acc_mean*100:.2f}% (±{r.val_acc_std*100:.2f}%)"
        )
        return head + losses + accs

    datasets = [
        "datasets/FashionMNIST0.3.npz",
        "datasets/FashionMNIST0.6.npz",
    ]

    # Colab alternative:
    # from google.colab import drive
    # drive.mount('/content/drive')
    # datasets = [
    #     "/content/drive/MyDrive/datasets/FashionMNIST0.3.npz",
    #     "/content/drive/MyDrive/datasets/FashionMNIST0.6.npz",
    # ]
    for path in datasets:
        print(f"\n==== Dataset: {path} ====")
        transition = get_T(path)
        data = load_npz(path)

        best, results = search_with_direction_cv(
            data.Xtr, data.Str, data.Xts, data.Yts, transition, seed=42
        )

        print("Tuned configs with 10-fold CV (forward loss):")
        for entry in results:
            print(" ", fmt(entry))
        print("** Best:", fmt(best))


if __name__ == "__main__":
    main()

Mounted at /content/drive

==== Dataset: /content/drive/MyDrive/datasets/FashionMNIST0.3.npz ====
Tuned configs with 10-fold CV (forward loss):
  [p@T] wd=0.0001, it=500 | train_loss=0.6045 (±0.0008), val_loss=0.7443 (±0.0183), train_acc=72.40% (±0.19%), val_acc=67.64% (±0.54%)
  [p@T] wd=0.001, it=500 | train_loss=0.6237 (±0.0006), val_loss=0.6781 (±0.0084), train_acc=70.89% (±0.09%), val_acc=68.24% (±0.69%)
  [p@T] wd=0.01, it=500 | train_loss=0.6472 (±0.0007), val_loss=0.6646 (±0.0061), train_acc=69.34% (±0.08%), val_acc=68.72% (±0.70%)
  [p@T] wd=0.05, it=500 | train_loss=0.6690 (±0.0007), val_loss=0.6761 (±0.0060), train_acc=68.83% (±0.08%), val_acc=68.54% (±0.67%)
  [p@T] wd=0.1, it=500 | train_loss=0.6818 (±0.0007), val_loss=0.6865 (±0.0063), train_acc=68.61% (±0.09%), val_acc=68.45% (±0.69%)
  [p@T] wd=0.5, it=500 | train_loss=0.7284 (±0.0007), val_loss=0.7301 (±0.0060), train_acc=68.11% (±0.08%), val_acc=68.06% (±0.61%)
** Best: [p@T] wd=0.01, it=500 | train_loss=0.6472 (±0.00

### 5.2. Dataset with unknown flip rates (CIFAR)

In [None]:
# ==== CIFAR (ForwardCorrection, softmax) - 10-fold CV on wd (fixed dims) ====

# Transition matrix obtained from the transition-matrix estimator
T_CIFAR = np.array(
    [
        [0.3753, 0.3311, 0.2935],
        [0.3028, 0.3493, 0.3479],
        [0.3277, 0.2998, 0.3725],
    ],
    dtype=np.float64,
)
DATASET_PATH = "datasets/CIFAR.npz"
# Colab alternative:
# from google.colab import drive
# drive.mount('/content/drive')

# DATASET_PATH ="/content/drive/MyDrive/datasets/CIFAR.npz"

# Load the four arrays from the archive
d = np.load(DATASET_PATH)
Xtr, Str, Xts, Yts = (d[key] for key in ("Xtr", "Str", "Xts", "Yts"))

# Flatten the inputs to 2-D float64 matrices
Xtr_f, Xts_f = flatten(Xtr), flatten(Xts)

# NOTE(review): this re-declares FwdCfg and shadows the definition from the
# model cell; the defaults here (wd=0.05, max_iter=500) differ from that
# earlier definition (wd=1e-4, max_iter=300). Every config built below passes
# wd and max_iter explicitly - confirm nothing else relies on these defaults.
@dataclass
class FwdCfg:
    wd: float = 0.05      # L2 weight-decay coefficient
    max_iter: int = 500   # iteration cap handed to the optimiser
    seed: int = 42        # seed used before weight initialisation

C = T_CIFAR.shape[0]
seed = 42

# Grid Search
# Each weight-decay value is listed once: the grid previously contained both
# `5e-1` and `0.5`, which are the same configuration and were fitted twice.
grid = [
    FwdCfg(wd=1e-4, max_iter=500, seed=seed),
    FwdCfg(wd=1e-3, max_iter=500, seed=seed),
    FwdCfg(wd=5e-2, max_iter=500, seed=seed),
    FwdCfg(wd=1e-2, max_iter=500, seed=seed),
    FwdCfg(wd=5e-1, max_iter=500, seed=seed),
    FwdCfg(wd=1e-1, max_iter=500, seed=seed),
    FwdCfg(wd=0.2,   max_iter=500, seed=seed),
]


def _mean_std(values):
    """Return ``(mean, sample std with ddof=1)`` as floats; the std is 0.0 for fewer than two values."""
    mean = float(np.mean(values))
    std = float(np.std(values, ddof=1)) if len(values) > 1 else 0.0
    return mean, std


print(f"\n==== Dataset: {DATASET_PATH} ====")
print("Tuned configs with 10-fold CV (forward loss):")

# 10 fold Cross-Validation
# An integer random_state makes every skf.split(...) call yield the same folds,
# so all configurations are compared on identical splits.
skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=seed)

# Model selection
cv_summaries = []
# [(cfg, mean_tr_loss, std_tr_loss, mean_vloss, std_vloss,
#    mean_tr_acc, std_tr_acc, mean_vacc, std_vacc)]

for cfg in grid:
    fold_train_losses = []
    fold_val_losses   = []
    fold_train_accs   = []
    fold_val_accs     = []

    for tr_idx, val_idx in skf.split(Xtr_f, Str):

        X_tr_raw, y_tr   = Xtr_f[tr_idx], Str[tr_idx]
        X_val_raw, y_val = Xtr_f[val_idx], Str[val_idx]

        # Standardise with training-fold statistics only (no leakage).
        std   = fit_std(X_tr_raw)
        X_tr  = std.transform(X_tr_raw)
        X_val = std.transform(X_val_raw)

        D_fold = X_tr.shape[1]

        # training model
        model = SoftmaxFwd(D_fold, C, T_CIFAR, cfg)
        res = model.fit(X_tr, y_tr)

        # training metrics (loss = forward loss + L2 penalty)
        p_tr = model.predict_proba(X_tr)
        tr_loss = forward_loss(p_tr, y_tr, T_CIFAR) + 0.5 * cfg.wd * (sla.norm(model.W, 'fro')**2)
        y_pred_tr = model.predict(X_tr)
        tr_acc = (y_pred_tr == y_tr).mean()

        # validation metrics
        p_val = model.predict_proba(X_val)
        vloss = forward_loss(p_val, y_val, T_CIFAR) + 0.5 * cfg.wd * (sla.norm(model.W, 'fro')**2)
        y_pred_val = model.predict(X_val)
        val_acc = (y_pred_val == y_val).mean()

        fold_train_losses.append(float(tr_loss))
        fold_val_losses.append(float(vloss))
        fold_train_accs.append(float(tr_acc))
        fold_val_accs.append(float(val_acc))

    # mean and sample std across folds
    mean_tr_loss, std_tr_loss = _mean_std(fold_train_losses)
    mean_vloss,   std_vloss   = _mean_std(fold_val_losses)
    mean_tr_acc,  std_tr_acc  = _mean_std(fold_train_accs)
    mean_vacc,    std_vacc    = _mean_std(fold_val_accs)

    cv_summaries.append(
        (cfg,
         mean_tr_loss, std_tr_loss,
         mean_vloss,   std_vloss,
         mean_tr_acc,  std_tr_acc,
         mean_vacc,    std_vacc)
    )

    print(
        f"  [p@T] wd={cfg.wd:g}, it={cfg.max_iter} | "
        f"train_loss={mean_tr_loss:.4f} (±{std_tr_loss:.4f}), "
        f"val_loss={mean_vloss:.4f} (±{std_vloss:.4f}), "
        f"train_acc={mean_tr_acc*100:.2f}% (±{std_tr_acc*100:.2f}%), "
        f"val_acc={mean_vacc*100:.2f}% (±{std_vacc*100:.2f}%)"
    )

# Choose the configuration with the lowest mean validation loss
# (index 3 of each summary tuple; the first one wins on ties).

(best_cfg,
 best_mean_trloss, best_std_trloss,
 best_mean_vloss,  best_std_vloss,
 best_mean_tracc,  best_std_tracc,
 best_mean_vacc,   best_std_vacc) = min(cv_summaries, key=lambda x: x[3])

# Results Summary
results = []
for (cfg,
     mean_trloss, std_trloss,
     mean_vloss,  std_vloss,
     mean_tracc,  std_tracc,
     mean_vacc,   std_vacc) in cv_summaries:

    results.append({
        "wd": cfg.wd,
        "iters": "N/A",
        "train_loss": mean_trloss,
        "train_loss_std": std_trloss,
        "val_loss": mean_vloss,
        "val_loss_std": std_vloss,
        "train_acc": mean_tracc,
        "train_acc_std": std_tracc,
        "val_acc": mean_vacc,
        "val_acc_std": std_vacc,
    })

print(
    f"** Best (by 10-fold mean val loss): [p@T] wd={best_cfg.wd:g}, it={best_cfg.max_iter} | "
    f"train_loss={best_mean_trloss:.4f} (±{best_std_trloss:.4f}), "
    f"val_loss={best_mean_vloss:.4f} (±{best_std_vloss:.4f}), "
    f"train_acc={best_mean_tracc*100:.2f}% (±{best_std_tracc*100:.2f}%), "
    f"val_acc={best_mean_vacc*100:.2f}% (±{best_std_vacc*100:.2f}%)"
)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).

==== Dataset: /content/drive/MyDrive/datasets/CIFAR.npz ====
Tuned configs with 10-fold CV (forward loss):
  [p@T] wd=0.0001, it=500 | train_loss=1.0777 (±0.0007), val_loss=1.0999 (±0.0020), train_acc=49.89% (±0.55%), val_acc=35.70% (±1.36%)
  [p@T] wd=0.001, it=500 | train_loss=1.0838 (±0.0001), val_loss=1.1010 (±0.0016), train_acc=47.86% (±0.28%), val_acc=35.31% (±1.03%)
  [p@T] wd=0.05, it=500 | train_loss=1.0938 (±0.0001), val_loss=1.0960 (±0.0012), train_acc=38.07% (±0.18%), val_acc=36.41% (±1.56%)
  [p@T] wd=0.01, it=500 | train_loss=1.0912 (±0.0001), val_loss=1.0965 (±0.0012), train_acc=40.43% (±0.13%), val_acc=36.23% (±1.18%)
  [p@T] wd=0.5, it=500 | train_loss=1.0966 (±0.0001), val_loss=1.0971 (±0.0008), train_acc=36.70% (±0.20%), val_acc=36.25% (±1.31%)
  [p@T] wd=0.1, it=500 | train_loss=1.0948 (±0.0001), val_loss=1.0962 (±0.0011), train_acc=37.69

## 6. Main Execution Function

In [None]:
def train_config_forward_once(
    dataset_path: str,
    cfg: FwdCfg,
    tr_idx: np.ndarray = None,
    va_idx: np.ndarray = None,
    seed: int = 42,
):
    """Train one `SoftmaxFwd` model on a dataset and score it on validation and test data.

    Parameters
    ----------
    dataset_path : str
        Path to an ``.npz`` archive with keys ``Xtr``, ``Str``, ``Xts``,
        ``Yts``. The path must contain ``"FashionMNIST0.3"``,
        ``"FashionMNIST0.6"`` or ``"CIFAR"`` (case-sensitive) so the
        transition matrix can be chosen.
    cfg : FwdCfg
        Hyper-parameters for the model.
    tr_idx, va_idx : np.ndarray, optional
        Row indices of the training and validation samples. If either is
        ``None``, both are replaced by a stratified 90/10 split seeded with
        `seed`.
    seed : int
        Seed for the global RNGs and for the fallback split.

    Returns
    -------
    tuple
        ``(val_loss, val_acc, test_acc, model, std)``. `val_loss` is the
        forward-corrected loss on the noisy validation labels plus the L2
        penalty, `val_acc` is accuracy against the noisy validation labels,
        `test_acc` is accuracy against ``Yts``, and `std` is the fitted
        `Standardizer`.

    Raises
    ------
    ValueError
        If `dataset_path` matches none of the known dataset names.
    """
    set_seed(seed)

    # np.load on an .npz returns a lazily-reading NpzFile that keeps the file
    # handle open; pull the arrays out inside `with` so it is closed again.
    with np.load(dataset_path) as d:
        Xtr, Str, Xts, Yts = d["Xtr"], d["Str"], d["Xts"], d["Yts"]

    # Transition matrix for the dataset named in the path
    if "FashionMNIST0.3" in dataset_path:
        T = np.array([
            [0.7, 0.3, 0.0],
            [0.0, 0.7, 0.3],
            [0.3, 0.0, 0.7]
        ], dtype=np.float32)
    elif "FashionMNIST0.6" in dataset_path:
        T = np.array([
            [0.4, 0.3, 0.3],
            [0.3, 0.4, 0.3],
            [0.3, 0.3, 0.4]
        ], dtype=np.float32)
    elif "CIFAR" in dataset_path:
        T = np.array([
            [0.3753, 0.3311, 0.2935],
            [0.3028, 0.3493, 0.3479],
            [0.3277, 0.2998, 0.3725]
        ], dtype=np.float64)
    else:
        raise ValueError(f"Unrecognized dataset_path: {dataset_path}")

    C = T.shape[0]

    # Fall back to a stratified 90/10 split unless both index arrays are given.
    if tr_idx is None or va_idx is None:
        from sklearn.model_selection import train_test_split
        tr_idx, va_idx = train_test_split(
            np.arange(len(Str)), test_size=0.1, stratify=Str, random_state=seed
        )

    X_train, y_train = Xtr[tr_idx], Str[tr_idx]
    X_val, y_val = Xtr[va_idx], Str[va_idx]

    # ==== flatten & standardization ====
    X_train_f = flatten(X_train)
    X_val_f   = flatten(X_val)
    X_test_f  = flatten(Xts)

    # Statistics come from the training rows only and are reused for val/test.
    std = fit_std(X_train_f)
    X_train_std = std.transform(X_train_f)
    X_val_std   = std.transform(X_val_f)
    X_test_std  = std.transform(X_test_f)

    D = X_train_std.shape[1]

    # Training
    model = SoftmaxFwd(D, C, T, cfg)
    model.fit(X_train_std, y_train)

    # Validation (noisy labels): forward loss plus the L2 penalty
    p_val = model.predict_proba(X_val_std)
    base_loss = forward_loss(p_val, y_val, T)
    reg_loss  = 0.5 * cfg.wd * (sla.norm(model.W, 'fro') ** 2)
    val_loss  = base_loss + reg_loss
    val_acc   = accuracy(y_val, model.predict(X_val_std))

    # Testing against Yts
    test_acc  = accuracy(Yts, model.predict(X_test_std))

    return val_loss, val_acc, test_acc, model, std

