In [None]:
%%writefile eig1.py

"""
eig1.py

Port of MATLAB function eig1.m to Python (NumPy).

Usage:
    eigvec, eigval, eigval_full = eig1(A, c=None, isMax=True, isSym=True)

Inputs:
    A       : (n, n) array-like square matrix
    c       : int, number of eigenpairs to return (default: n)
    isMax   : bool, if True return largest eigenvalues, else smallest (default: True)
    isSym   : bool, if True treat A as (elementwise) symmetric by doing np.maximum(A, A.T).
              If A is truly symmetric, set True to use the faster symmetric solver.

Returns:
    eigvec      : (n, c) array with eigenvectors corresponding to `eigval` (columns)
    eigval      : (c,) array of selected eigenvalues
    eigval_full : (n,) array of all eigenvalues (unsorted)

Notes:
    - For symmetric matrices we use numpy.linalg.eigh (real symmetric).
      For general matrices we use numpy.linalg.eig.
    - Behavior mirrors original MATLAB code:
        * if c is None -> set to n
        * if c > n -> set to n
        * if isSym True -> A := np.maximum(A, A.T) (elementwise max), matching MATLAB's max(A,A')
        * if isMax True -> sort eigenvalues in descending order (largest first)
          else -> ascending (smallest first)
    - For very large/sparse matrices consider using scipy.sparse.linalg.eigs / eigsh.
"""

import numpy as np
from typing import Tuple, Optional

def eig1(A, c: Optional[int] = None, isMax: bool = True, isSym: bool = True
        ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    A = np.asarray(A)
    if A.ndim != 2 or A.shape[0] != A.shape[1]:
        raise ValueError("Input A must be a square matrix (n x n).")

    n = A.shape[0]

    # default c
    if c is None:
        c = n
    if c > n:
        c = n
    # ensure booleans
    isMax = bool(isMax)
    isSym = bool(isSym)

    # If requested, symmetrize similar to MATLAB's max(A, A')
    if isSym:
        # MATLAB used elementwise max(A, A'), replicating that behavior:
        A = np.maximum(A, A.T)

    # Choose solver
    if isSym:
        # Use eigh for symmetric/hermitian matrices (real eigenvalues)
        eigvals_full, eigvecs = np.linalg.eigh(A)
        # numpy.linalg.eigh returns eigenvalues in ascending order
    else:
        eigvals_complex, eigvecs_complex = np.linalg.eig(A)
        # handle possibly complex eigenvalues: keep as complex if present
        eigvals_full = eigvals_complex
        eigvecs = eigvecs_complex

    # If used symmetric branch, variable names differ:
    if isSym:
        eigvals_full = eigvals_full  # already set above
        eigvecs = eigvecs

    # Convert to 1-D array
    eigvals_full = np.asarray(eigvals_full).ravel()

    # Sorting indices
    if isMax:
        # descending
        idx_sorted = np.argsort(eigvals_full)[::-1]
    else:
        # ascending
        idx_sorted = np.argsort(eigvals_full)

    # full eigenvalues in Matlab code: d = diag(d); eigval_full = d(idx)
    # MATLAB returns d in the original (unsorted) order as eigval_full = d;
    # To mirror that we will return eigval_full as eigvals_full (unsorted).
    eigval_full = eigvals_full.copy()

    # pick top c indices according to sort
    idx1 = idx_sorted[:c]

    eigval = eigval_full[idx1]

    # eigenvectors columns correspond to eigvals in eigvals_full; pick same columns
    # Note: if eigenvectors array columns correspond to eigvals in the same order,
    # selecting columns by idx1 is correct.
    eigvec = eigvecs[:, idx1]

    # If eigenvalues/eigenvectors are (numerically) real but complex dtype, cast to real
    # if imaginary parts are very small.
    if np.iscomplexobj(eigval) or np.iscomplexobj(eigvec):
        # check small imaginary parts
        if np.allclose(np.imag(eigval), 0, atol=1e-12) and np.allclose(np.imag(eigvec), 0, atol=1e-12):
            eigval = np.real(eigval)
            eigvec = np.real(eigvec)
            eigval_full = np.real(eigval_full)

    return eigvec, eigval, eigval_full





Overwriting eig1.py


In [None]:
%%writefile UpdateS.py



"""
update_s.py

Port of MATLAB UpdateS.m (from "Twin Learning for Similarity and Clustering...") to Python.

Function:
    Z = update_S(K, F, alpha, beta)

For each column j (1..n), we solve the QP:
    min_x 0.5 * x^T H x + ff^T x
    s.t.  sum(x) = 1
          0 <= x_i <= 1  for all i

where:
    H = 2*alpha*I + 2*K   (made symmetric)
    ff = (beta/2) * all_distances - 2 * K[:, j]
    all_distances[i] = || F[j,:] - F[i,:] ||^2

Notes:
 - This implementation uses scipy.optimize.minimize (SLSQP) as a generic QP solver.
 - For better performance on large n, prefer a specialized QP solver (cvxopt, quadprog, OSQP).
 - Returns Z as an (n, n) numpy array where column j is the solution for column j.
"""

import numpy as np
from typing import Optional
from scipy.optimize import minimize

def UpdateS(K: np.ndarray, F: np.ndarray, alpha: float, beta: float,
             tol: float = 1e-8, verbose: bool = False) -> np.ndarray:
    """
    Compute matrix Z by solving n QPs, one for each column.

    Parameters
    ----------
    K : (n, n) array-like
        Kernel / similarity matrix used in the quadratic cost.
    F : (n, d) array-like
        Feature (embedding) matrix. Each row F[i,:] is the feature vector for sample i.
    alpha : float
        Parameter as in the paper.
    beta : float
        Parameter as in the paper.
    tol : float
        Tolerance for solver termination (passed to minimize).
    verbose : bool
        If True, prints progress information.

    Returns
    -------
    Z : (n, n) np.ndarray
        Each column j is the solution vector for the j-th QP.
    """

    K = np.asarray(K, dtype=float)
    F = np.asarray(F, dtype=float)
    if K.ndim != 2 or K.shape[0] != K.shape[1]:
        raise ValueError("K must be a square matrix (n x n).")
    n = K.shape[0]
    if F.shape[0] != n:
        raise ValueError("F must have the same number of rows as K (n).")

    # Precompute H = 2*alpha*I + 2*K (ensure symmetric)
    H = 2.0 * alpha * np.eye(n) + 2.0 * K
    H = 0.5 * (H + H.T)   # symmetrize to reduce numerical issues

    # Precompute pairwise squared norms of F rows maybe helpful for speed
    # But we only need ||F[j] - F[i]||^2 for each j; compute per j below vectorized.

    Z = np.zeros((n, n), dtype=float)

    # constraints: equality sum(x) = 1 and bounds 0<=x<=1
    cons = ({'type': 'eq', 'fun': lambda x: np.sum(x) - 1.0,
             'jac': lambda x: np.ones_like(x)})
    bounds = [(0.0, 1.0) for _ in range(n)]

    # Pre-factor: for faster gradient calc we can reuse H
    # Objective function and gradient for a given ff:
    def _objective(x, H_mat, ff_vec):
        # 0.5 x^T H x + ff^T x
        # grad = H x + ff
        x = np.asarray(x)
        obj = 0.5 * x.dot(H_mat.dot(x)) + ff_vec.dot(x)
        grad = H_mat.dot(x) + ff_vec
        return obj, grad

    # Use uniform initial guess
    x0 = np.full(n, 1.0 / n)

    for j in range(n):
        # compute all squared distances between F[j,:] and every row F[i,:]
        # vectorized: (F - F[j])**2 sum over columns
        diff = F - F[j, :]  # shape (n, d)
        all_sq = np.sum(diff * diff, axis=1)  # shape (n,)

        # ff = beta/2 * all' - 2*K(:,j)
        ff = (beta / 2.0) * all_sq - 2.0 * K[:, j]

        # objective wrapper for scipy.minimize (SLSQP accepts jac)
        def obj_for_min(x):
            val, grad = _objective(x, H, ff)
            return val, grad

        # SciPy minimize expects separate functions for fun and jac OR return both with method='SLSQP'
        def fun(x):
            val, _ = _objective(x, H, ff)
            return val

        def jac(x):
            _, grad = _objective(x, H, ff)
            return grad

        # run SLSQP
        res = minimize(fun, x0, method='SLSQP', jac=jac,
                       bounds=bounds, constraints=cons,
                       options={'ftol': tol, 'maxiter': 1000, 'disp': False})

        if not res.success:
            # If SLSQP failed to converge, we can try a fallback: more iterations or different init
            # Try a different initial point (e.g., projection of -ff onto simplex)
            if verbose:
                print(f"Warning: solver failed at column {j}, message: {res.message}. Trying fallback init.")
            # fallback init: project uniform+small random noise
            x0_fallback = np.clip(x0 + 1e-3 * np.random.randn(n), 0, 1)
            x0_fallback = x0_fallback / np.sum(x0_fallback)
            res2 = minimize(fun, x0_fallback, method='SLSQP', jac=jac,
                            bounds=bounds, constraints=cons,
                            options={'ftol': tol, 'maxiter': 2000, 'disp': False})
            if res2.success:
                sol = res2.x
            else:
                # last resort: project the unconstrained quadratic minimizer on simplex then clip
                # unconstrained solution x* = -H^{-1} ff
                try:
                    sol_unc = -np.linalg.solve(H, ff)
                except np.linalg.LinAlgError:
                    sol_unc = -np.linalg.pinv(H).dot(ff)
                # project onto simplex
                sol_proj = _project_to_simplex(sol_unc)
                # clip to [0,1]
                sol = np.clip(sol_proj, 0.0, 1.0)
                sol = sol / np.sum(sol)  # re-normalize to sum to 1
        else:
            sol = res.x

        Z[:, j] = sol
        # next initial guess = current solution (warm start)
        x0 = sol

    return Z

# helper: projection to simplex (useful fallback)
def _project_to_simplex(v: np.ndarray, z: float = 1.0) -> np.ndarray:
    """
    Euclidean projection of v onto the probability simplex { x : x >= 0, sum(x) = z }.
    Reference: Efficient Projections onto the L1-Ball for Learning in High Dimensions,
               John Duchi et al. (2008).
    """
    v = np.asarray(v, dtype=float).copy()
    n = v.size
    if np.sum(v) == z and np.all(v >= 0):
        return v
    u = np.sort(v)[::-1]
    cssv = np.cumsum(u)
    rho = np.where(u * np.arange(1, n+1) > (cssv - z))[0]
    if rho.size == 0:
        theta = 0.0
    else:
        rho = rho[-1]
        theta = (cssv[rho] - z) / float(rho + 1)
    w = np.maximum(v - theta, 0.0)
    return w

# Optional: fast solver hint using cvxopt (commented)
# If you have cvxopt installed and prefer to use it, you can replace the SLSQP call by cvxopt.solvers.qp.
# Example (not active in this code):
#
# from cvxopt import matrix, solvers
# P = matrix(H)
# q = matrix(ff)
# # equality: A x = b  -> sum x = 1
# A = matrix(np.ones((1,n)))
# b = matrix(1.0)
# # bounds: 0 <= x <= 1  -> Gx <= h
# G = matrix(np.vstack((-np.eye(n), np.eye(n))))
# h = matrix(np.hstack((np.zeros(n), np.ones(n))))
# sol = solvers.qp(P, q, G, h, A, b)
# if sol['status'] == 'optimal':
#     sol_x = np.array(sol['x']).ravel()
# else:
#     handle_failure...
#
# cvxopt is usually faster and more stable for dense QPs than SLSQP for large n.



Overwriting UpdateS.py


In [None]:
%%writefile UpdateW.py



"""
update_P.py

Port of MATLAB UpdateP.m / UpdateW.m to Python (NumPy).

Function:
    D_mat = update_P(Coef, Data, D_mat_init, tol=1e-6, max_iter=100, rho_init=1.0, rate_rho=1.2)

Inputs:
    Coef         : numpy array (d, N)  -- corresponds to 'Coef' (H in your comment)
    Data         : numpy array (m, N)  -- corresponds to 'Data' (M in your comment)
    D_mat_init   : numpy array (m, d)  -- initial W (previous D_Mat)
    tol          : float, stopping threshold on mean squared change (default 1e-6)
    max_iter     : int, maximum iterations (default 100)
    rho_init     : float, initial rho (default 1.0)
    rate_rho     : float, multiplier to increase rho each iteration (default 1.2)

Returns:
    D_mat : numpy array (m, d) -- updated mapping W (TempD)

Notes:
    - This implements the same iteration as the MATLAB code:
        TempD   = (rho*(TempS-TempT) + TempData*TempCoef') * inv(rho*I + TempCoef*TempCoef')
        TempS   = normcol_lessequal(TempD + TempT)
        TempT   = TempT + TempD - TempS
    - normcol_lessequal scales each column to have L2 norm <= 1 (i.e., unit ball projection).
    - Uses np.linalg.solve (via transposed system) instead of explicit inverse for numerical stability.
"""

import numpy as np

def _normcol_lessequal(X: np.ndarray) -> np.ndarray:
    """
    Project each column of X to have l2-norm <= 1.
    If a column's norm > 1, scale it to have norm 1; otherwise keep it unchanged.
    """
    X = np.asarray(X, dtype=float)
    norms = np.linalg.norm(X, axis=0)
    # avoid division by zero
    scale = np.ones_like(norms)
    mask = norms > 1.0
    if np.any(mask):
        scale[mask] = 1.0 / norms[mask]
    # multiply each column by corresponding scalar
    return X * scale

def UpdateW(Coef: np.ndarray, Data: np.ndarray, D_mat_init: np.ndarray,
             tol: float = 1e-6, max_iter: int = 100,
             rho_init: float = 1.0, rate_rho: float = 1.2) -> np.ndarray:
    """
    ADMM-style update for D (W) as in UpdateP/UpdateW.m
    """
    Coef = np.asarray(Coef, dtype=float)   # shape (d, N)
    Data = np.asarray(Data, dtype=float)   # shape (m, N)
    TempS = np.asarray(D_mat_init, dtype=float)  # shape (m, d)

    d = Coef.shape[0]  # number of rows of Coef (dimension d)
    # Validate shapes:
    if Coef.ndim != 2:
        raise ValueError("Coef must be 2D (d x N).")
    if Data.ndim != 2:
        raise ValueError("Data must be 2D (m x N).")
    if TempS.ndim != 2:
        raise ValueError("D_mat_init must be 2D (m x d).")
    if TempS.shape[1] != d:
        raise ValueError("D_mat_init must have number of columns equal to number of rows of Coef (d).")
    if Data.shape[1] != Coef.shape[1]:
        raise ValueError("Data and Coef must have the same number of columns (N).")

    m = Data.shape[0]  # output dimension
    # initialize variables
    TempT = np.zeros_like(TempS)   # Lagrange multiplier-like variable
    previousD = TempS.copy()
    rho = float(rho_init)
    Imat = np.eye(d)

    Iter = 1
    ERROR = 1.0

    # Precompute TempCoef * TempCoef' structure: note it changes not in loop
    # But TempCoef is constant (Coef). So denom = rho*I + Coef @ Coef.T (d x d)
    # However rho changes each iteration, so precompute Coef @ Coef.T once and reuse.
    CoefCoefT = Coef @ Coef.T  # shape (d, d)

    while ERROR > tol and Iter < max_iter:
        # numerator: rho*(TempS - TempT) + Data * Coef'
        numerator = rho * (TempS - TempT) + (Data @ Coef.T)  # shape (m, d)

        # denom: rho*I + Coef*Coef'
        denom = rho * Imat + CoefCoefT  # shape (d, d)
        # Solve for TempD: TempD = numerator * inv(denom)
        # To avoid explicit inverse, we solve: denom^T * X^T = numerator^T  => X = (solve(denom.T, numerator.T)).T
        # This yields TempD of shape (m, d)
        try:
            TempD = np.linalg.solve(denom.T, numerator.T).T
        except np.linalg.LinAlgError:
            # fallback to pseudo-inverse if denom is singular
            denom_inv = np.linalg.pinv(denom)
            TempD = numerator @ denom_inv

        # Update TempS by projecting columns of (TempD + TempT) to have norm <= 1
        TempS = _normcol_lessequal(TempD + TempT)

        # Update TempT
        TempT = TempT + TempD - TempS

        # update rho
        rho = rate_rho * rho

        # compute stopping criterion (mean squared change)
        ERROR = np.mean((previousD - TempD) ** 2)
        previousD = TempD.copy()
        Iter += 1

    D_mat = TempD
    return D_mat



Overwriting UpdateW.py


In [None]:
%%writefile SMR_mtv.py

"""
smr_mtv.py

Port of MATLAB function SMR_mtv (SMR.m) to Python.

Solves the Sylvester equation used in the paper:
    A_syl * H + H * B_syl + C_syl = 0

MATLAB call:
    H = lyap(A_syl, B_syl, C_syl)

We use scipy.linalg.solve_sylvester which solves:
    A * X + X * B = C

So to match MATLAB's lyap we set C_scipy = -C_syl.

Function:
    H = SMR_mtv(M, P, S, alpha)

Inputs:
    M     : (m, N) numpy array
    P     : (N, c) numpy array  (cluster indicator matrix)
    S     : (N, N) numpy array  (similarity matrix)
    alpha : scalar

Output:
    H     : (c, N) numpy array  (solution of the Sylvester equation)
"""

import numpy as np
from scipy.linalg import solve_sylvester
from typing import Any

def SMR_mtv(M: np.ndarray, P: np.ndarray, S: np.ndarray, alpha: float) -> np.ndarray:
    """
    Solve for H such that A_syl * H + H * B_syl + C_syl = 0 where:
        A_syl = P.T @ P        (c x c)
        B_syl = alpha*(I - S) @ (I - S).T   (N x N)
        C_syl = - P.T @ M      (c x N)

    Returns H (c x N).

    Parameters
    ----------
    M : array_like, shape (m, N)
    P : array_like, shape (N, c)
    S : array_like, shape (N, N)
    alpha : float

    Returns
    -------
    H : ndarray, shape (c, N)
    """
    M = np.asarray(M, dtype=float)
    P = np.asarray(P, dtype=float)
    S = np.asarray(S, dtype=float)
    alpha = float(alpha)

    # Basic validation
    if S.ndim != 2 or S.shape[0] != S.shape[1]:
        raise ValueError("S must be a square matrix (N x N).")
    N = S.shape[0]

    if P.ndim != 2 or P.shape[0] != N:
        raise ValueError("P must have shape (N, c) with the same N as S.")
    if M.ndim != 2 or M.shape[1] != N:
        raise ValueError("M must have shape (m, N) with same N as S and P.")

    # Build matrices for Sylvester equation
    A_syl = P.T @ P  # shape (c, c)
    I = np.eye(N)
    diff = I - S
    B_syl = alpha * (diff @ diff.T)  # shape (N, N)
    # In MATLAB: C_syl = -P'*M ; lyap solves A*X + X*B + C = 0
    # scipy.linalg.solve_sylvester solves A*X + X*B = C.
    # Thus we set C_scipy = -C_syl = P' * M
    C_scipy = P.T @ M  # shape (c, N)

    # Solve A_syl * H + H * B_syl = C_scipy
    H = solve_sylvester(A_syl, B_syl, C_scipy)

    return H


Overwriting SMR_mtv.py


In [None]:
%%writefile normcol_lessequal.py


"""
normcol_lessequal.py

Port of MATLAB function normcol_lessequal.m to Python (NumPy).

Behavior:
    matout = argmin ||matout - matin||_F^2  subject to each column norm(matout[:, i]) <= 1

Usage:
    matout = normcol_lessequal(matin)

Inputs:
    matin : (m, n) array-like

Returns:
    matout : (m, n) ndarray with each column scaled to have l2-norm <= 1
"""

import numpy as np

def normcol_lessequal(matin: np.ndarray) -> np.ndarray:
    matin = np.asarray(matin, dtype=float)
    # compute l2 norms of columns
    # sum over rows (axis=0), keepdims for broadcasting
    eps = np.finfo(float).eps
    col_norms = np.sqrt(np.sum(matin * matin, axis=0) + eps)  # shape (n,)
    # denom = max(1, col_norm) per column
    denom = np.maximum(1.0, col_norms)
    # scale columns: matin / denom (broadcast denom over rows)
    matout = matin / denom[np.newaxis, :]
    return matout



Overwriting normcol_lessequal.py


In [None]:
%%writefile MCLES.py



from UpdateS import UpdateS
from UpdateW import UpdateW
from eig1 import eig1


"""
MCLES.py

Python port of MCLES.m (Multi-View Clustering in Latent Embedding Space).

Dependencies:
    numpy, scipy, scikit-learn

Assumed helper functions (place in same folder or importable):
    - update_P(Coef, Data, D_mat_init, ...)      # corresponds to UpdateP.m (UpdateW)
    - update_S(K, F, alpha, beta, ...)           # corresponds to UpdateS.m
    - eig1(A, c=None, isMax=True, isSym=True)    # corresponds to eig1.m

This file implements the top-level MCLES() function and clustering metrics.
"""

import numpy as np
from scipy.linalg import solve_sylvester  # for Sylvester equation
from sklearn.cluster import KMeans
from sklearn.metrics import normalized_mutual_info_score
from scipy.optimize import linear_sum_assignment
from typing import List, Tuple

# If you placed your converted helper functions in other files, import them:
# from update_p import update_P
# from update_s import update_S
# from eig1 import eig1

# If you haven't yet converted them to python files, you can place them as functions
# in this module or update the import paths above.

def clustering_measure(true_labels: np.ndarray, pred_labels: np.ndarray
                       ) -> Tuple[float, float, float]:
    """
    Compute ACC (accuracy via optimal assignment), NMI, PUR (purity).
    true_labels : array-like shape (N,)
    pred_labels : array-like shape (N,)
    Returns (ACC, NMI, PUR)
    """
    true = np.asarray(true_labels).astype(int)
    pred = np.asarray(pred_labels).astype(int)
    N = true.size

    # NMI
    nmi = normalized_mutual_info_score(true, pred)

    # ACC via Hungarian on the contingency matrix
    # Build contingency matrix
    labels_true = np.unique(true)
    labels_pred = np.unique(pred)
    # Map labels to [0..k-1]
    true_map = {lab: i for i, lab in enumerate(labels_true)}
    pred_map = {lab: i for i, lab in enumerate(labels_pred)}
    K_true = labels_true.size
    K_pred = labels_pred.size
    cont = np.zeros((K_true, K_pred), dtype=np.int64)
    for t, p in zip(true, pred):
        cont[true_map[t], pred_map[p]] += 1
    # Hungarian to maximize matching -> minimize -cont
    row_ind, col_ind = linear_sum_assignment(-cont)
    acc = cont[row_ind, col_ind].sum() / N

    # Purity: sum of max intersection per predicted cluster divided by N
    # For each predicted cluster, find the max true label count
    pur = 0
    for j, pj in enumerate(labels_pred):
        idx = (pred == pj)
        if idx.sum() == 0:
            continue
        # get true labels of these points
        counts = np.bincount(true[idx])
        pur += counts.max()
    purity = pur / N

    return acc, nmi, purity


def _solve_sylvester_for_H(M: np.ndarray, P_mat: np.ndarray, S: np.ndarray, alpha: float) -> np.ndarray:
    """
    Solve Sylvester equation A*H + H*B + C = 0 for H, where:
        A = P_mat.T @ P_mat
        B = alpha * (I - S) @ (I - S).T
        C = - P_mat.T @ M

    scipy.linalg.solve_sylvester solves A*X + X*B = C_scipy,
    so we set C_scipy = -C = P_mat.T @ M.

    Note: P_mat must have same number of rows as M (both are m x ...),
    so that P_mat.T @ M has shape (d, N), yielding H shape (d, N).
    """
    # input validation
    M = np.asarray(M, dtype=float)
    P_mat = np.asarray(P_mat, dtype=float)
    S = np.asarray(S, dtype=float)
    N = S.shape[0]
    if M.ndim != 2 or M.shape[1] != N:
        raise ValueError("M must have shape (m, N) where N = number of samples (S.shape[0])")
    if P_mat.shape[0] != M.shape[0]:
        raise ValueError("P (W) must have the same number of rows as M (both are 'm' rows).")

    A_syl = P_mat.T @ P_mat  # (d x d)
    I = np.eye(N)
    diff = I - S
    B_syl = alpha * (diff @ diff.T)  # (N x N)
    C_scipy = P_mat.T @ M  # (d x N)

    H = solve_sylvester(A_syl, B_syl, C_scipy)  # solves A H + H B = C_scipy
    # The MATLAB lyap(A,B,C) used C = -P'*M and solved A*H + H*B + C = 0
    # which is equivalent to A H + H B = P' M  (exactly what we solved).
    return H


def MCLES(X: List[np.ndarray], alpha: float, beta: float, d: int, gamma: float,
          maxIters: int, gt: np.ndarray, verbose: bool = False
         ) -> np.ndarray:
    """
    Main MCLES function translated from MCLES.m

    Inputs:
      X        : list of V views, each view is a numpy array of shape (d_v, N)
      alpha,beta,gamma: parameters (scalars)
      d        : embedding dimension (int)
      maxIters : maximum outer iterations (int)
      gt       : ground-truth labels, array shape (N,)

    Returns:
      result : numpy array [ACC, NMI, PUR]
    """
    # number of classes
    gt = np.asarray(gt)
    C = np.unique(gt).size
    V = len(X)
    N = X[0].shape[1]

    # normalize each view (column-wise)
    X_normed = []
    for v in range(V):
        Xi = np.asarray(X[v], dtype=float)
        # column norms
        col_norms = np.sqrt(np.sum(Xi ** 2, axis=0) + np.finfo(float).eps)
        Xi = Xi / (col_norms[np.newaxis, :])
        X_normed.append(Xi)

    # dimensions and construct M (vertical concatenation)
    Ds = [Xi.shape[0] for Xi in X_normed]
    SD = sum(Ds)
    M = np.vstack(X_normed)   # shape (SD, N)

    # initialize variables
    W = np.zeros((SD, d), dtype=float)      # W (m x d)
    H = np.random.randn(d, N)               # H (d x N)
    S = np.zeros((N, N), dtype=float)       # similarity matrix
    F = np.random.randn(N, C)               # F used in UpdateS distances (N x C)

    # parameters for KMeans
    MAXiter = 1000
    REPlic = 20

    Obj = []
    for it in range(maxIters):
        if verbose:
            print(f"[MCLES] Iter {it+1}/{maxIters}")

        # ------ update W (UpdateW / UpdateP) ------
        # update_P expects: Coef (d x N), Data (m x N), D_mat_init (m x d)
        # In MATLAB: W = UpdateW(H,M,W)
        # So call update_P(H, M, W)
        # NOTE: ensure you have converted UpdateP to update_P and it's importable.
        try:
            # user-provided conversion for UpdateP.m
            W = UpdateW(H, M, W)
        except Exception as e:
            # If update_p is not available, raise clear error
            raise ImportError("UpdateW (converted from UpdateP.m) not found or failed. "
                              "Please place UpdateW in module UpdateW.py. "
                              f"Original error: {e}")

        # ------ update H (SMR_mtv) ------
        # In MATLAB: H = SMR_mtv(M,W,S,alpha);
        # We provide a local Sylvester solver to compute H:
        H = _solve_sylvester_for_H(M, W, S, alpha)  # returns (d, N)

        # ------ update S ------
        # In MATLAB: S = UpdateS(H'*H,F,beta/alpha,gamma/alpha);
        K = H.T @ H   # shape (N, N)
        try:
             # user-provided conversion for UpdateS.m
            S = UpdateS(K, F, beta / alpha, gamma / alpha)
        except Exception as e:
            raise ImportError("UpdateS (converted from UpdateS.m) not found or failed. "
                              "Please place UpdateS in module UpdateS.py. "
                              f"Original error: {e}")

        # ------ update F (spectral embedding from Laplacian) ------
        Z = S
        Z = 0.5 * (Z + Z.T)
        D_diag = np.sum(Z, axis=1)
        D_mat = np.diag(D_diag)
        L = D_mat - Z
        # eig1: eigvec, eigval, eigval_full = eig1(A, c=None, isMax=True, isSym=True)
        try:

            F_new, _, _ = eig1(L, C, isMax=False, isSym=True)
        except Exception as e:
            raise ImportError("eig1 (converted from eig1.m) not found or failed. "
                              "Please place eig1 in module eig1.py. "
                              f"Original error: {e}")

        # eig1 returns eigenvectors as columns (N x C)
        F = np.asarray(F_new, dtype=float)

        # ------ compute objective ------
        term1 = np.linalg.norm(M - W @ H, ord='fro') ** 2
        term2 = alpha * (np.linalg.norm(H - H @ S, ord='fro') ** 2)
        term3 = beta * (np.linalg.norm(S, ord='fro') ** 2)
        term4 = gamma * np.trace(F.T @ L @ F)
        obj_val = term1 + term2 + term3 + term4
        Obj.append(obj_val)

        if verbose:
            print(f"  Obj = {obj_val:.6e} (terms: {term1:.3e}, {term2:.3e}, {term3:.3e}, {term4:.3e})")

        # convergence check (relative change < 1e-2)
        if it > 0:
            rel_change = abs(Obj[-1] - Obj[-2]) / (abs(Obj[-2]) + 1e-12)
            if rel_change < 1e-2:
                if verbose:
                    print(f"[MCLES] Converged at iteration {it+1}, rel_change={rel_change:.4e}")
                break

    # final kmeans on F (rows are samples -> sklearn expects shape (n_samples, n_features))
    kmeans = KMeans(n_clusters=C, n_init=REPlic, max_iter=MAXiter, random_state=0)
    labels = kmeans.fit_predict(F)  # shape (N,)

    ACC, NMI, PUR = clustering_measure(gt, labels)
    result = np.array([ACC, NMI, PUR], dtype=float)
    return result




Overwriting MCLES.py


In [None]:
%%writefile run.py




from MCLES import MCLES

"""
Python port of run.m for MSRC-v1 experiment.

This script:
- loads MSRC-v1.mat (expects variables X and Y or similar),
- converts views to shape (d_v, N) as expected by MCLES,
- sets MCLES parameters,
- runs MCLES and prints the resulting [ACC, NMI, PUR].

Usage:
    python run_mccles.py

Make sure:
- MSRC-v1.mat is in the same directory or give full path to it.
- mccles.py and helper modules (update_p.py, update_s.py, eig1.py, etc.) are importable.
"""

import os
import numpy as np
from scipy.io import loadmat

# import your converted MCLES function
# from mccles import MCLES

def load_mat_views(mat_path: str):
    """
    Load a .mat file and extract X (list of views) and Y (ground-truth labels).
    This function tries a few common layouts produced by MATLAB .mat files.

    Returns:
        X_views: list of numpy arrays, each array shape (d_v, N)  -- matches expected MCLES input.
        Y: 1D numpy array of length N (integers)
    """
    data = loadmat(mat_path)
    # Try common keys
    if 'X' in data:
        X_mat = data['X']
        # MATLAB cell arrays are loaded as numpy object arrays by scipy.io.loadmat
        if isinstance(X_mat, np.ndarray) and X_mat.dtype == np.object_:
            # Flatten object array and extract arrays
            X_list = [np.asarray(v) for v in X_mat.flatten()]
        else:
            # If X is a numeric array, try to interpret: maybe shape (V, ...) or (d, N, V)
            # Try to detect 3D arrays with last dimension as views
            if X_mat.ndim == 3:
                # ph: (d, N, V) -> extract along third axis
                V = X_mat.shape[2]
                X_list = [X_mat[:, :, i] for i in range(V)]
            else:
                # fallback: wrap single view
                X_list = [np.asarray(X_mat)]
    else:
        # fallback keys some datasets use (try to detect feature sets)
        # e.g., 'gist', 'hog', 'lbp', 'sift' etc.
        candidates = ['gist', 'hog', 'lbp', 'SIFT', 'X']  # extend as needed
        X_list = []
        for k in candidates:
            if k in data:
                X_list.append(np.asarray(data[k]))
        if len(X_list) == 0:
            raise KeyError("Cannot find 'X' or known feature keys in the .mat file.")

    # Get ground truth labels
    if 'Y' in data:
        Y = np.asarray(data['Y']).squeeze()
    elif 'y' in data:
        Y = np.asarray(data['gt']).squeeze()
    elif 'labels' in data:
        Y = np.asarray(data['labels']).squeeze()
    else:
        # last resort: try to find a 1D integer array among variables
        possible = []
        for k, v in data.items():
            if k.startswith('__'):
                continue
            arr = np.asarray(v)
            if arr.ndim == 2 and 1 in arr.shape and arr.size > 0:
                possible.append((k, arr.squeeze()))
        if len(possible) >= 1:
            # pick the first candidate (this is a last-resort heuristic)
            print("Warning: ground truth not found with key 'Y'/'gt'. Using variable:", possible[0][0])
            Y = possible[0][1]
        else:
            raise KeyError("Ground-truth labels not found in the .mat file (keys tried: Y, gt, labels).")

    # MATLAB often stores each view as (dv x N) or (N x dv). In your run.m you transposed each view:
    #    temp = X{i}; X{i} = temp';
    # That suggests loaded X{i} was (dv, N?) and they wanted (N features?) However earlier code expects X{i} to be (dv, N) .
    # To match your earlier conversions we will follow the same step: transpose each view if its second dimension mismatches N.
    # Determine N by checking the length of Y.
    Y = np.asarray(Y)
    N = Y.size

    X_views = []
    for v_arr in X_list:
        arr = np.asarray(v_arr)
        # If arr shape is (N, d_v), transpose to (d_v, N)
        if arr.ndim == 2 and arr.shape[0] == N and arr.shape[1] != N:
            arr = arr.T
        # If arr shape is (d_v, N) already, keep
        # If ambiguous (square), leave as-is
        X_views.append(arr)

    # Verify consistent N across views
    for i, arr in enumerate(X_views):
        if arr.ndim != 2:
            raise ValueError(f"View {i} is not 2D array.")
        if arr.shape[1] != N:
            raise ValueError(f"View {i} has inconsistent number of samples: expected {N}, got {arr.shape[1]}")

    return X_views, Y

def main():
    # Path to the .mat file (adjust if needed)
    mat_file = 'MSRC-v1.mat'
    if not os.path.exists(mat_file):
        raise FileNotFoundError(f"{mat_file} not found in current directory ({os.getcwd()}). Place the file here or update the path.")

    # load views and labels
    X_views, gt = load_mat_views(mat_file)
    print(f"Loaded {len(X_views)} views, N = {gt.size}")

    # MCLES parameters (same as your run.m)
    maxIters = 30
    alpha = 0.8
    beta = 0.4
    d = 70
    gamma = 0.004

    # Run MCLES
    result = MCLES(X_views, alpha, beta, d, gamma, maxIters, gt, verbose=True)
    print("Result [ACC, NMI, PUR] =", result)

if __name__ == "__main__":
    main()


Overwriting run.py


In [None]:
!python run.py

Loaded 4 views, N = 210
[MCLES] Iter 1/30
  Obj = 1.351639e+02 (terms: 1.330e+02, 6.885e-01, 1.420e+00, 1.081e-02)
[MCLES] Iter 2/30
  Obj = 3.174836e+01 (terms: 2.740e+01, 2.680e+00, 1.658e+00, 1.197e-02)
[MCLES] Iter 3/30
  Obj = 2.290428e+01 (terms: 1.684e+01, 3.914e+00, 2.136e+00, 1.123e-02)
[MCLES] Iter 4/30
  Obj = 2.057246e+01 (terms: 1.349e+01, 4.640e+00, 2.432e+00, 1.076e-02)
[MCLES] Iter 5/30
  Obj = 1.950549e+01 (terms: 1.187e+01, 5.029e+00, 2.598e+00, 1.043e-02)
[MCLES] Iter 6/30
  Obj = 1.891264e+01 (terms: 1.087e+01, 5.302e+00, 2.732e+00, 1.012e-02)
[MCLES] Iter 7/30
  Obj = 1.847269e+01 (terms: 1.013e+01, 5.498e+00, 2.838e+00, 9.845e-03)
[MCLES] Iter 8/30
  Obj = 1.817083e+01 (terms: 9.585e+00, 5.648e+00, 2.928e+00, 9.605e-03)
[MCLES] Iter 9/30
  Obj = 1.795104e+01 (terms: 9.172e+00, 5.764e+00, 3.005e+00, 9.398e-03)
[MCLES] Iter 10/30
  Obj = 1.778925e+01 (terms: 8.850e+00, 5.859e+00, 3.071e+00, 9.218e-03)
[MCLES] Converged at iteration 10, rel_change=9.0129e-03
Result [