In [None]:
!pip install MOSEK

Collecting MOSEK
  Downloading mosek-11.0.16-cp39-abi3-manylinux2014_x86_64.whl.metadata (698 bytes)
Downloading mosek-11.0.16-cp39-abi3-manylinux2014_x86_64.whl (14.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.7/14.7 MB[0m [31m84.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: MOSEK
Successfully installed MOSEK-11.0.16


Base

In [None]:
%%writefile base.py
from typing import Tuple, List, Union
import numpy as np
import numpy.linalg as npl

vector = Union[np.ndarray]
entry_matrix = Tuple[int, int, float]

class Dataset:

    def __init__(self,
                 obs: List[entry_matrix],
                 shape: Tuple[int, int]):

        self.obs = obs
        self.shape = shape
        # self.num_observations: int = len(obs)

        self.count_mat: vector = np.zeros(shape)
        self.obs_mat: vector = np.zeros(shape)
        self.num_observed_entries: int = 0

        self.update_observations()

    def update_observations(self):
        for ob in self.obs:
            row_idx, col_idx, value = ob
            self.obs_mat[row_idx, col_idx] += value
            self.count_mat[row_idx, col_idx] += 1
        self.num_observed_entries = np.count_nonzero(self.count_mat)

    def loss(self,
             n_estimator: vector,
             alphas: Union[float, list[float], np.ndarray],
             weight_quad_loss: vector,
             weight_nuc_norm: vector) -> np.ndarray:

        if isinstance(alphas, float):
            alphas = [alphas]

        alphas = np.array(alphas)

        nuc_norm = npl.norm(n_estimator, 'nuc')

        rss = self.get_rss(n_estimator, weight_quad_loss, weight_nuc_norm)
        reg = nuc_norm

        return rss + alphas * reg

    def get_rss(self,
                n_estimator: vector,
                weight_quad_loss: vector,
                weight_nuc_norm: vector) -> float:

        obs_loc = np.nonzero(self.count_mat)

        product_1 = np.multiply(np.sqrt(self.count_mat[obs_loc]), 1./np.sqrt(weight_nuc_norm[obs_loc]))

        product_2 = np.multiply(product_1, n_estimator[obs_loc])

        difference = product_2 - np.multiply(self.obs_mat[obs_loc], 1./np.sqrt(self.count_mat[obs_loc]))
        product_3 = np.multiply(np.sqrt(weight_quad_loss[obs_loc]), difference)

        return 1 / (2 * self.num_observed_entries) * npl.norm(product_3) ** 2

    def get_rss_grad(self,
                     n_estimator: vector,
                     weight_quad_loss: vector,
                     weight_nuc_norm: vector) -> vector:

        operand_1 = np.multiply(self.count_mat, np.multiply(1. / weight_nuc_norm, n_estimator))
        operand_2 = np.multiply(1. / np.sqrt(weight_nuc_norm), self.obs_mat)
        difference = operand_1 - operand_2
        return (1 / self.num_observed_entries) * np.multiply(weight_quad_loss, difference)

    def get_init_step_size(self,
                           weight_quad_loss: vector,
                           weight_nuc_norm: vector) -> float:
        product = np.multiply(1. / np.sqrt(weight_nuc_norm), np.sqrt(self.count_mat))
        maximum_element = np.max(np.multiply(product, np.sqrt(weight_quad_loss)))
        init_step_size = self.num_observed_entries / (maximum_element ** 2)

        return init_step_size

    def get_marginal_probabilities(self):
        row_marginal = self.count_mat.sum(axis=1)/self.count_mat.sum()
        col_marginal = self.count_mat.sum(axis=0)/self.count_mat.sum()

        return np.outer(row_marginal, col_marginal)

Writing base.py


WMC

In [None]:
%%writefile wmc.py
# wmc.py  —  Weight‑Matrix Completion / NU‑Recommend solver
# --------------------------------------------------------------------
from typing import Optional, Tuple, List, Union, Any
from collections import namedtuple, defaultdict

import numpy as np
import numpy.linalg as npl
import cvxpy as cvx

from base import Dataset

vector = Union[np.ndarray]
entry_matrix = Tuple[int, int, float]

SolverMetrics = namedtuple('SolverMetrics',
                           ['loss', 'difference_norm', 'old_norm'])

DEFAULT_XTOL = 1e-5


class WmcImpute:
    """
    Main solver class implementing
      • margin‑weighted trace‑norm minimisation
      • NU‑Recommend (weight derived via Eq.(6))
      • free/uniform baseline
    """

    # ----------------------------------------------------------------
    def __init__(
        self,
        shape: Tuple[int, int],
        weight_quad_loss: Optional[vector] = None,
        weight_nuc_norm: Optional[vector] = None,
        metrics: Optional['Metric'] = None
    ):
        if metrics is None:
            metrics = Metric()

        self.metrics = metrics
        self.shape   = shape

        # quadratic‑loss weights (IPW)
        if weight_quad_loss is None:
            self.weight_quad_loss = np.ones(shape)
        else:
            self.weight_quad_loss = weight_quad_loss

        # nuclear‑norm weights  (default: uniform scaling 1/(d·d))
        if weight_nuc_norm is None:
            self.weight_nuc_norm = np.ones(shape) / (shape[0] * shape[1])
        else:
            self.weight_nuc_norm = weight_nuc_norm

        # step‑size / continuation params
        self.init_step_size: float = 0.0
        self.line_search_shrinkage_amount: float = 1 / 1.2   # ← β

        # iterate storage
        self.n_old: Optional[vector] = None
        self.n_new: Optional[vector] = None
        self._init_n()

        self.xtol: float = DEFAULT_XTOL

    # ----------------------------------------------------------------
    #           -----   weight construction for NU‑Recommend  -----
    # ----------------------------------------------------------------
    def set_weight_nuc_norm_for_nu_rec(self,
                                       b_hat,
                                       p_hat,
                                       ell,
                                       gamma):
        """Compute W = Q^2 via Eq.(6) and store as weight_nuc_norm."""
        self.weight_nuc_norm = self.compute_weight(b_hat, p_hat,
                                                   ell, gamma)

    @staticmethod
    def compute_weight(b_hat,
                       p_hat,
                       ell: float,
                       gamma: float):
        """
        Solve
            minimise    ‖ B_hat ⊙ Q ‖_*
            subject to  sqrt(P_hat)/ℓ ≤ Q ≤ ℓ·sqrt(P_hat)
                        ‖ B_hat ⊙ Q ‖_∞ ≤ γ
        Return W = Q ⊙ Q.

        We call MOSEK with tight tolerances so results match
        MATLAB / CVX (≈ 1e‑8 rel‑gap).
        """
        import numpy as np

        # decision variable
        n1, n2 = b_hat.shape
        q = cvx.Variable((n1, n2))

        sqrt_p = np.sqrt(p_hat)
        BQ     = cvx.multiply(b_hat, q)

        constraints = [
            q >= cvx.multiply(sqrt_p, 1 / ell),
            q <= cvx.multiply(sqrt_p, ell),
            cvx.norm(BQ, "inf") <= gamma
        ]

        prob = cvx.Problem(cvx.Minimize(cvx.normNuc(BQ)), constraints)

        mosek_tol = {
            'MSK_DPAR_INTPNT_CO_TOL_REL_GAP': 1e-8,
            'MSK_DPAR_INTPNT_CO_TOL_PFEAS'  : 1e-9,
            'MSK_DPAR_INTPNT_CO_TOL_DFEAS'  : 1e-9
        }

        prob.solve(solver=cvx.MOSEK,
                   mosek_params=mosek_tol,
                   verbose=False)

        if q.value is None:
            raise RuntimeError("compute_weight: MOSEK failed — no Q.")

        return q.value ** 2   # element‑wise square → W matrix

    # ----------------------------------------------------------------
    #           ---------   proximal‑gradient solver   ---------
    # ----------------------------------------------------------------
    def _init_n(self):
        self.n_old = None
        self.n_new = self.zero()

    def zero(self):
        return np.zeros(self.shape)

    # --------------  public API -----------------
    def impute(self, ds: Dataset, **kwargs):
        assert 'alpha' in kwargs
        alpha_min = kwargs['alpha']

        num_alpha = kwargs.get('num_alpha', 30)
        alphas = self.get_alpha_seq(ds, alpha_min, num_alpha)
        return self.fit(ds, alphas, **kwargs)[-1]

    # --------------  helper routines  --------------
    def get_alpha_seq(self, ds: Dataset,
                      alpha_min: float,
                      num_alpha: int) -> List[float]:
        alpha = self.alpha_max(ds)
        while alpha_min >= alpha:
            alpha_min *= 0.1
        return list(np.logspace(np.log10(alpha),
                                np.log10(alpha_min), num_alpha))

    def alpha_max(self, ds: Dataset) -> float:
        grad = ds.get_rss_grad(self.zero(),
                               self.weight_quad_loss,
                               self.weight_nuc_norm)
        return npl.norm(grad, 2)

    # --------------  main fit loop  ----------------
    def fit(self,
            ds: Dataset,
            alphas: List[float],
            max_iterations: int = 5000,
            **kwargs) -> List[np.ndarray]:
        self._prefit(ds, **kwargs)
        self._init_n()

        bs: List[np.ndarray] = []
        for alpha in alphas:
            for _ in range(max_iterations):
                metrics = self.update_once(ds, alpha)
                if self.should_stop(metrics):
                    break
            bs.append(np.multiply(self.n_new,
                                  1 / np.sqrt(self.weight_nuc_norm)))
        return bs

    def _prefit(self, ds: Dataset, **kwargs):
        self.init_step_size = kwargs.get(
            'init_step_size',
            ds.get_init_step_size(self.weight_quad_loss,
                                  self.weight_nuc_norm))
        self.xtol = kwargs.get('xtol', DEFAULT_XTOL)

    # --------------  one PGD iteration  ------------
    def update_once(self, ds: Dataset, alpha: float) -> SolverMetrics:
        n_old   = self.n_new
        step_sz = self.get_step_size(ds, alpha)
        gen_grad = self.get_generalized_grad(ds, alpha, step_sz)
        self.n_new = n_old - step_sz * gen_grad

        return SolverMetrics(
            loss=ds.loss(self.n_new, alpha,
                         self.weight_quad_loss, self.weight_nuc_norm),
            difference_norm=npl.norm(n_old - self.n_new),
            old_norm=npl.norm(n_old)
        )


    def get_step_size(self, ds: Dataset, alpha: float):
        step_size = self.init_step_size
        beta = self.line_search_shrinkage_amount

        MAX_STEP_SIZE = 1e307

        if self.should_backtrack(ds, alpha, step_size):
            while self.should_backtrack(ds, alpha, step_size):
                step_size *= beta
                if step_size > MAX_STEP_SIZE:
                    step_size = MAX_STEP_SIZE
                    break
        else:
            while not self.should_backtrack(ds, alpha, step_size):
                step_size /= beta
                if step_size > MAX_STEP_SIZE:
                    step_size = MAX_STEP_SIZE
                    break
            step_size *= beta
            if step_size > MAX_STEP_SIZE:
                step_size = MAX_STEP_SIZE

        return step_size

    def should_backtrack(self, ds: Dataset, alpha: float, step_size: float) -> bool:
        rss_grad = ds.get_rss_grad(self.n_new, self.weight_quad_loss, self.weight_nuc_norm)
        generalized_grad = self.get_generalized_grad(ds, alpha, step_size)

        n_new = self.n_new
        n_new_next = n_new - step_size * generalized_grad

        rss = ds.get_rss(n_new, self.weight_quad_loss, self.weight_nuc_norm)
        rss_next = ds.get_rss(n_new_next, self.weight_quad_loss, self.weight_nuc_norm)

        term_2 = step_size * np.sum(np.multiply(rss_grad, generalized_grad))
        term_3 = step_size/2 * (npl.norm(generalized_grad) ** 2)
        threshold = rss - term_2 + term_3
        return rss_next > threshold

    def get_generalized_grad(self, ds: Dataset, alpha: float, step_size: float) -> np.ndarray:
        rss_grad = ds.get_rss_grad(self.n_new, self.weight_quad_loss, self.weight_nuc_norm)
        grad_descent = self.n_new - step_size * rss_grad
        u, s, vh = np.linalg.svd(grad_descent, full_matrices=False)
        s_threshold = np.maximum(s - step_size * alpha, 0)
        numerator = self.n_new - np.dot(u * s_threshold, vh)
        return numerator / step_size

    def should_stop(self, metrics: Any) -> bool:
        difference_norm = metrics.difference_norm
        old_norm = metrics.old_norm
        if difference_norm < self.xtol * max(1, old_norm):
            return True
        return False

    def update_metrics(self, alphas, bs, b, p):
        for i, alpha in enumerate(alphas):
            b_hat = bs[i]
            l2_error = npl.norm(np.multiply(np.sqrt(p), b_hat - b)) / npl.norm(np.multiply(np.sqrt(p), b))
            frobenius_error = npl.norm(b_hat - b) / npl.norm(b)
            self.metrics.add_errors(alpha, l2_error, frobenius_error)


class Metric:
    def __init__(self):
        self.alphas = []
        self.l2_errors = []
        self.frobenius_errors = []
        self.best_b_hat = None

    def add_errors(self, alpha, l2_error, frobenius_error):
        self.alphas.append(alpha)
        self.l2_errors.append(l2_error)
        self.frobenius_errors.append(frobenius_error)

    def get_best_b_hat(self, bs):
        return bs[np.argmin(self.l2_errors)]

    def __len__(self):
        return len(self.alphas)

    def __getitem__(self, i):
        return defaultdict(alpha=self.alphas[i], l2_error=self.l2_errors[i], frobenius_error=self.frobenius_errors[i])

    def __iter__(self):
        def _iter():
            for i in range(len(self)):
                yield self[i]
        return _iter()


Writing wmc.py


Utils

In [None]:
%%writefile utils.py
import numpy as np
import numpy.random as npr
from typing import Tuple, List
import random

def check_random_state(random_state):
    if random_state is None:
        return npr.get_state()
    elif isinstance(random_state, int):
        return npr.RandomState(random_state)
    else:
        return random_state


def random_state_generator(seed):

    def _random_state_gen(_seed):
        while True:
            yield npr.RandomState(_seed)
            _seed += 1

    gen = _random_state_gen(seed)

    def _next_random_state():
        return next(gen)

    return _next_random_state


class MetricAggregator:

    def __init__(self):
        self.m0 = 0
        self.m1 = 0
        self.m2 = 0

    def get_confidence_band(self):

        m0 = self.m0
        m1 = self.m1
        m2 = self.m2
        m0 = np.maximum(m0, 1)

        mean = m1 / m0
        var = (m2 - m1 ** 2 / m0) / (m0 - 1)
        sd = var ** 0.5
        se = (var / m0) ** 0.5

        return mean, sd, se

    def aggregate(self, x):
        self.m0 += 1
        self.m1 += x
        self.m2 += x ** 2

Writing utils.py


Experiments

In [None]:
%%writefile experiments.py
from typing import Dict
from argparse import ArgumentParser
import numpy as np
import random
import os
import matplotlib.pyplot as plt
from base import Dataset
from wmc import Metric
from utils import random_state_generator, MetricAggregator

def create_free(shape):
    from wmc import WmcImpute
    solver = WmcImpute(shape)

    return solver


def create_margin(shape, margin_weight):
    from wmc import WmcImpute
    solver = WmcImpute(shape, weight_nuc_norm=margin_weight)
    return solver


def create_nu_recommend(shape):
    from wmc import WmcImpute
    solver = WmcImpute(shape)

    return solver


def run_once(run_seed, run, shape, r, sd, num_samples, next_random_state, ell, gamma) -> Dict[str, Metric]:

    output_dir = f'out/dr{shape[0]}dc{shape[1]}r{r}sd{sd}'
    os.makedirs(output_dir, exist_ok=True)

    state = next_random_state()
    row_dimension, col_dimension = shape
    bl = state.rand(row_dimension, r)
    br = state.rand(col_dimension, r)
    b = bl @ br.T

    state = next_random_state()
    pl = state.rand(row_dimension, r)
    pr = state.rand(col_dimension, r)
    p = pl @ pr.T
    p = p/p.sum()

    obs = []

    state = next_random_state()
    counts = state.multinomial(num_samples, p.flatten())

    for index, count in enumerate(counts):
        # if count == 0:
        #     print('count is 0')
        for _ in range(count):
            row_ind = int(index // col_dimension)
            col_ind = int(index % col_dimension)
            obs.append((row_ind, col_ind, b[row_ind][col_ind] + state.normal(scale=sd)))

    # row_idx = list(npr.randint(row_dimension, size=num_samples))
    # col_idx = list(npr.randint(col_dimension, size=num_samples))
    # for i in range(num_samples):
    #     obs.append((row_idx[i], col_idx[i], b[row_idx[i]][col_idx[i]] + npr.normal(scale=sd)))

    ds = Dataset(obs, shape=shape)

    margin_weight = ds.get_marginal_probabilities()

    algorithms = {
        # 'free': create_free(shape),
        'margin': create_margin(shape, margin_weight),
        'nu-recommend': create_nu_recommend(shape),
    }

    # run the algorithms
    for name, alg in algorithms.items():
        print('current alg is', name)
        alphas = alg.get_alpha_seq(ds, alpha_min=0.01, num_alpha=30)

        if 'nu-recommend' in name:
            b_hat_margin = np.loadtxt(f'{output_dir}/marginseed{run_seed}run{run}obs{num_samples}.txt')
            alg.set_weight_nuc_norm_for_nu_rec(b_hat_margin, margin_weight, ell, gamma)

        bs = alg.fit(ds, alphas)
        alg.update_metrics(alphas, bs, b, p)
        np.savetxt(f'{output_dir}/{name}seed{run_seed}run{run}obs{num_samples}.txt', alg.metrics.get_best_b_hat(bs))

    return {name: alg.metrics for name, alg in algorithms.items()}


def run_all(seed, n_run, shape, r, sd, num_samples, ell, gamma, output=None, pattern=None):
    aggs = {}

    for run in range(n_run):

        print('current run is', run)
        run_seed = 1000 * (seed + run)
        next_random_state = random_state_generator(run_seed)

        metrics = run_once(run_seed, run, shape, r, sd, num_samples, next_random_state, ell, gamma)

        for name, metric in metrics.items():
            if name not in aggs:
                aggs[name] = tuple(MetricAggregator() for _ in range(2))

            aggs[name][0].aggregate(np.min(metric.l2_errors))
            aggs[name][1].aggregate(np.min(metric.frobenius_errors))

            if output is not None:
                for t in metric:
                    print(pattern.format(run=run, run_seed=run_seed, num_samples=num_samples, **t), file=output)

    x = 1
    fig = plt.figure()
    for name, agg in aggs.items():
        y, _, yerr = agg[0].get_confidence_band()
        plt.errorbar(2*x, y, yerr=yerr, label=f'{name}', marker='s')
        x += 1

    plt.title('l2 error')
    plt.legend()
    plt.show()

    fig = plt.figure()
    for name, agg in aggs.items():
        y, _, yerr = agg[0].get_confidence_band()
        plt.errorbar(2 * x, y, yerr=yerr, label=f'{name}', marker='s')
        x += 1

    plt.title('frobenius error')
    plt.legend()
    plt.show()


def __main__():
    parser = ArgumentParser()
    parser.add_argument('--shape', nargs='+', type=int, default=(100, 100))
    parser.add_argument('--r', type=int, default=20)
    parser.add_argument('--sd', type=float, default=1)
    parser.add_argument('--obs', type=int, default=2000)
    parser.add_argument('--run', type=int, default=2)
    parser.add_argument('--seed', type=int, default=1)
    parser.add_argument('--ell', type=float, default=3)
    parser.add_argument('--gamma', type=float, default=3)
    # parser.add_argument('--name', type=str, default='output')
    parser.add_argument('--name', type=str, default=None)

    args = parser.parse_args()

    if args.name is not None:
        columns = ['run', 'run_seed', 'num_samples', 'alpha', 'l2_error', 'frobenius_error']

        header = ','.join(columns)
        pattern = ','.join([f'{{{col}}}' for col in columns])

        os.makedirs('out', exist_ok=True)

        with open(f'out/{args.name}.csv', 'w') as output:
            print(header, file=output)
            run_all(args.seed, args.run, args.shape, args.r, args.sd, args.obs, args.ell, args.gamma, output, pattern)
    else:
        run_all(args.seed, args.run, args.shape, args.r, args.sd, args.obs, args.ell, args.gamma)


if __name__ == '__main__':
    __main__()

Writing experiments.py


Lab Test Data Process

In [None]:
# ===============================================================
#  Real-lab data ▸  Margin  vs  NU-Recommend   (ℓ = 2, γ = 6)
#  Outputs per seed
#      • test-RMSE  (log-space)
#      • test rel-Frobenius error
#      • best α_margin , best α_NU
# ===============================================================

import pandas as pd, numpy as np, cvxpy as cp, time
from base import Dataset
from wmc  import WmcImpute

RAW_CSV  = '/content/anonymized2_1Year.csv'
SEEDS    = [0]                     # change to range(10) etc.
VAL_FRAC = 0.10;  TEST_FRAC = 0.10
ELL, GAMMA = 2.0, 6.0
NUM_ALPHA  = 30

# ---------- load, filter, pivot ---------------------------------
data  = pd.read_csv(RAW_CSV)
pat_ct= data.groupby('myPatID').size()
lab_ct= data.groupby('myCID'  ).size()
mask  = (pat_ct[data['myPatID']] >110)&(pat_ct[data['myPatID']]<4000)&\
        (lab_ct[data['myCID'  ]] >1000)
mat_raw = ( np.log1p(
              data.loc[mask]
                  .pivot_table(index='myCID', columns='myPatID',
                               values='LV', aggfunc='mean')
           ).to_numpy(dtype='float64') )

rows_all, cols_all = np.where(~np.isnan(mat_raw))
vals_all           = mat_raw[rows_all, cols_all]
M_fro_sq           = np.nansum(mat_raw**2)

# ===============================================================
def run_one_seed(seed):
    rng = np.random.default_rng(seed)
    m   = len(vals_all)

    perm      = rng.permutation(m)
    n_test    = int(TEST_FRAC * m)
    n_val     = int(VAL_FRAC  * m)

    idx_test  = perm[:n_test]
    idx_val   = perm[n_test : n_test+n_val]
    idx_train = perm[n_test+n_val:]

    # ----- μ,σ on TRAIN slice only -----
    col_mu = np.nanmean(mat_raw[:, cols_all[idx_train]], axis=1)
    col_sd = np.nanstd (mat_raw[:, cols_all[idx_train]], axis=1)
    col_sd[col_sd==0] = 1
    mat = (mat_raw - col_mu[:,None]) / col_sd[:,None]

    def ds_from(idx):
        return Dataset(list(zip(rows_all[idx], cols_all[idx],
                                mat[rows_all[idx], cols_all[idx]])),
                       mat.shape)

    ds_train = ds_from(idx_train)
    rv_r, rv_c = rows_all[idx_val],  cols_all[idx_val]
    rv_v      = mat[rv_r, rv_c]
    rt_r, rt_c= rows_all[idx_test], cols_all[idx_test]
    rt_v_raw  = mat_raw[rt_r, rt_c]

    # ---------- Margin baseline ----------
    w_mar = ds_train.get_marginal_probabilities()
    mar   = WmcImpute(mat.shape, weight_nuc_norm=w_mar)
    alpha_seq = mar.get_alpha_seq(ds_train, 0.01, NUM_ALPHA)
    B_mar_path = mar.fit(ds_train, alpha_seq)
    rmse_val = [np.sqrt(np.mean((B[rv_r,rv_c]-rv_v)**2)) for B in B_mar_path]
    best_i   = int(np.argmin(rmse_val))
    B_mar    = B_mar_path[best_i]
    alpha_mar_best = alpha_seq[best_i]

    # ---------- NU-Recommend ----------
    nu = WmcImpute(mat.shape)
    nu.set_weight_nuc_norm_for_nu_rec(B_mar, w_mar, ell=ELL, gamma=GAMMA)
    B_nu_path = nu.fit(ds_train, alpha_seq)
    rmse_val_nu = [np.sqrt(np.mean((B[rv_r,rv_c]-rv_v)**2)) for B in B_nu_path]
    best_j   = int(np.argmin(rmse_val_nu))
    B_nu     = B_nu_path[best_j]
    alpha_nu_best = alpha_seq[best_j]

    # ---------- re-scale and score ----------
    def score(B_z):
        B_log = B_z*col_sd[:,None] + col_mu[:,None]
        rmse  = np.sqrt(np.mean((B_log[rt_r,rt_c]-rt_v_raw)**2))
        relF  = np.sqrt(np.sum((B_log[rt_r,rt_c]-rt_v_raw)**2)) / np.sqrt(M_fro_sq)
        return rmse, relF

    return score(B_mar), score(B_nu), alpha_mar_best, alpha_nu_best

# ===============================================================
print("seed | RMSE_m  RMSE_nu | relF_m relF_nu | α_m  α_nu")
for s in SEEDS:
    (rm_m,rf_m),(rm_nu,rf_nu),a_m,a_nu = run_one_seed(s)
    print(f"{s:4} | {rm_m:6.4f}  {rm_nu:6.4f} | {rf_m:.4f} {rf_nu:.4f} | {a_m:.3g} {a_nu:.3g}")
