## COMP5328 - Advanced Machine Learning
## Assignment 1: Non-negative Matrix Factorization
----------------------------------------------------------------------------------------

**(Semester 2, 2025)**

In this ipython notebook, we provide some example code for assignment1.
+ Load Data.
    - ORL dataset. 
    - Extended YaleB dataset. 
    - AR dataset (**optional**).
+ Perform Evaluation. 
   - Relative Reconstruction Errors.
   - Accuracy, NMI (**optional**).

Lecturer: Tongliang Liu.

**Note: All datasets can be used only for this assignment and you are not allowed to distribute these datasets. If you want to use AR dataset, you need to apply it by yourself (we do not provide AR dataset due to the problem of license, please find more details in http://www2.ece.ohio-state.edu/~aleix/ARdatabase.html).**

## 1. Load Dataset

### 1.0 Data Folder

In [6]:
from typing import List, Tuple
import os
from PIL import Image
import numpy as np
import pandas as pd
from typing import Optional
from collections import Counter
from sklearn.cluster import KMeans
from sklearn.metrics import accuracy_score
from sklearn.metrics import normalized_mutual_info_score


In [3]:
# The structure of data folder.
!ls -l data

'ls' �����ڲ����ⲿ���Ҳ���ǿ����еĳ���
���������ļ���


### 1.1 Load ORL Dataset and Extended YaleB Dataset.
+ ORL dataset contains ten different images of each of 40 distinct subjects. For some subjects, the images were taken at different times, varying the lighting, facial expressions (open / closed eyes, smiling / not smiling) and facial details (glasses / no glasses). All the images were taken against a dark homogeneous background with the subjects in an upright, frontal position (with tolerance for some side movement). The size of each image is 92x112 pixels, with 256 grey levels per pixel. To further reduce the computation complexity, you can resize all images to 30x37 pixels.

+ Extended YaleB dataset contains 2414 images of 38 human subjects under 9 poses and 64 illumination conditions. All images are manually aligned, cropped, and then resized to 168x192 pixels. To further reduce the computation complexity, you can resize all images to 42x48 pixels.

In [1]:
def load_data(root='data/CroppedYaleB', reduce=4):
    """ 
    Load ORL (or Extended YaleB) dataset to numpy array.
    
    Args:
        root: path to dataset.
        reduce: scale factor for zooming out images.
        
    """ 
    images, labels = [], []

    for i, person in enumerate(sorted(os.listdir(root))):
        
        if not os.path.isdir(os.path.join(root, person)):
            continue
        
        for fname in os.listdir(os.path.join(root, person)):    
            
            # Remove background images in Extended YaleB dataset.
            if fname.endswith('Ambient.pgm'):
                continue
            
            if not fname.endswith('.pgm'):
                continue
                
            # load image.
            img = Image.open(os.path.join(root, person, fname))
            img = img.convert('L') # grey image.

            # reduce computation complexity.
            img = img.resize([s//reduce for s in img.size])

            # TODO: preprocessing.

            # convert image to numpy array.
            img = np.asarray(img).reshape((-1,1))

            # collect data and label.
            images.append(img)
            labels.append(i)

    # concate all images and labels.
    images = np.concatenate(images, axis=1)
    labels = np.array(labels)

    return images, labels

In [2]:
# Load ORL dataset.
X, Y = load_data(root='data/ORL', reduce=2)
print('ORL dataset: X.shape = {}, Y.shape = {}'.format(X.shape, Y.shape))

# Load Extended YaleB dataset.
X, Y = load_data(root='data/CroppedYaleB', reduce=4)
print('Extended YalB dataset: X.shape = {}, Y.shape = {}'.format(X.shape, Y.shape))

ORL dataset: X.shape = (2576, 400), Y.shape = (400,)
Extended YalB dataset: X.shape = (2016, 2414), Y.shape = (2414,)


### 1.2 Load AR Dataset (Optional) 
AR dataset contains 2600 images of 100 individuals (50 male and 50 female). All images have been cropped and resized to 120x165 pixels. To further reduce the computation complexity, you can resize all images to 40x55 pixels.

In [4]:
def load_data_AR(root='data/CroppedAR', reduce=3):
    """ 
    Load AR dataset to numpy array.
    
    Args:
        root: path to AR dataset.
        reduce: scale factor for zooming out images.
        
    """ 
    images, labels = [], []
    
    for fname in os.listdir(root):
        
        if not fname.endswith('.bmp'):
            continue
        
        # get label.
        label = int(fname[2:5])
        if fname[0] == 'W': # start from 50
            label += 50
        
        # load image.
        img = Image.open(os.path.join(root, fname))
        img = img.convert('L') # grey
        
        # reduce computation complexity.
        img = img.resize([s//reduce for s in img.size])
   
        # TODO: preprocessing.
        
        # convert image to numpy array.
        img = np.asarray(img).reshape((-1,1))
        
        # collect data and label.
        images.append(img)
        labels.append(label)
        
    # concate all images and labels.
    images = np.concatenate(images, axis=1)
    labels = np.array(labels)
    
    return images, labels

In [None]:
# ============ 2) Helpers ============

def normalize_columns_01(X: np.ndarray) -> np.ndarray:
    """按列缩放到[0,1]；如需自定义归一化策略，这里实现。"""
    pass

def guess_hw_from_d(d: int) -> tuple[int, int]:
    """根据向量维度 d 估计图像 (h, w)。若已知尺寸可直接跳过。"""
    pass


In [5]:
# X, Y = load_data_AR(root='data/CroppedAR', reduce=3)
# print('AR dataset: X.shape = {}, Y.shape = {}'.format(X.shape, Y.shape))

In [None]:
def salt_pepper(V: np.ndarray, p: float = 0.2, r: float = 0.5, seed: Optional[int] = None
               ) -> Tuple[np.ndarray, np.ndarray]:
    """
    对列为样本的矩阵 V(d×n) 施加椒盐噪声。
    p: 被替换像素比例（0~1）
    r: 被替换为“盐”(最大值)的比例（其余为“椒”(最小值)）
    返回: (V_noisy, noise_mask)，noise_mask=1 表示该像素被替换
    """
    V = np.asarray(V, dtype=float)
    d, n = V.shape
    rng = np.random.default_rng(seed)
    vmin = float(np.min(V))
    vmax = float(np.max(V))

    U = rng.random((d, n))
    noise_idx = U < p
    salt_mask = rng.random((d, n)) < r


    V_noisy = V.copy()
    V_noisy[noise_idx & salt_mask]  = vmax
    V_noisy[noise_idx & ~salt_mask] = vmin
    return V_noisy, noise_idx.astype(float)

---------------------------


## 2. Evaluation Metrics


### 2.1 Relative Reconstruction Errors (RRE)

To compare the robustness of different NMF algorithms, you can use the ```relative reconstruction errors```. Let $V$ denote the contaminated dataset (by adding noise), and $\hat{V}$
 denote the clean dataset. Let $W$ and $H$ denote the factorization results on $V$, the ``relative reconstruction errors`` then can be defined as follows:
 \begin{equation}
    RRE = \frac{ \| \hat{V} - WH \|_F }{ \| \hat{V} \|_F}.
\end{equation}


In [6]:
# Load dataset.
print('==> Load ORL dataset ...')
V_hat, Y_hat = load_data('data/ORL', reduce=3)
print('V_hat.shape={}, Y_hat.shape={}'.format(V_hat.shape, Y_hat.shape))

# Add Noise.
V_noise = np.random.rand(*V_hat.shape) * 40
V = V_hat + V_noise

# Plot result.
import matplotlib.pyplot as plt
img_size = [i//3 for i in (92, 112)] # ORL
ind = 2 # index of demo image.
plt.figure(figsize=(10,3))
plt.subplot(131)
plt.imshow(V_hat[:,ind].reshape(img_size[1],img_size[0]), cmap=plt.cm.gray)
plt.title('Image(Original)')
plt.subplot(132)
plt.imshow(V_noise[:,ind].reshape(img_size[1],img_size[0]), cmap=plt.cm.gray)
plt.title('Noise')
plt.subplot(133)
plt.imshow(V[:,ind].reshape(img_size[1],img_size[0]), cmap=plt.cm.gray)
plt.title('Image(Noise)')
plt.show()

# TODO: you should implement NMF algorithms by yourself.
print('==> Apply NMF ...')
from sklearn.decomposition import NMF
model = NMF(n_components=len(set(Y_hat))) # set n_components to num_classes.
W = model.fit_transform(V)
H = model.components_
print('W.shape={}, H.shape={}'.format(W.shape, H.shape))

# Evaluate relative reconstruction errors.
print('==> Evaluate RRE ...')
RRE = np.linalg.norm(V_hat - W.dot(H)) / np.linalg.norm(V_hat)
print('RRE = {}'.format(RRE))

==> Load ORL dataset ...
V_hat.shape=(1110, 400), Y_hat.shape=(400,)


<Figure size 1000x300 with 3 Axes>

==> Apply NMF ...
W.shape=(1110, 40), H.shape=(40, 400)
==> Evaluate RRE ...
RRE = 0.22688415691467015


### 2.2 Evaluate Clustering Performance

1. Accuracy.
    
    $$ Acc(Y, Y_{pred}) = \frac{1}{n}\sum\limits_{i=1}^n 1\{Y_{pred}(i) == Y(i)\}$$
        
2. Normalized Mutual Information (NMI).

    $$ NMI(Y, Y_{pred}) = \frac{2 * I(Y, Y_{pred})}{H(Y) + H(Y_{pred})} $$
    
   where $ I(\cdot,\cdot) $ is mutual information and $ H(\cdot) $ is entropy.

In [7]:
def assign_cluster_label(X, Y):
    kmeans = KMeans(n_clusters=len(set(Y))).fit(X)
    Y_pred = np.zeros(Y.shape)
    for i in set(kmeans.labels_):
        ind = kmeans.labels_ == i
        Y_pred[ind] = Counter(Y[ind]).most_common(1)[0][0] # assign label.
    return Y_pred

print('==> Evaluate Acc and NMI ...')

# Assign cluster labels.
Y_pred = assign_cluster_label(H.T, Y_hat)

acc = accuracy_score(Y_hat, Y_pred)
nmi = normalized_mutual_info_score(Y_hat, Y_pred)
print('Acc(NMI) = {:.4f} ({:.4f})'.format(acc, nmi))


==> Evaluate Acc and NMI ...
Acc(NMI) = 0.5875 (0.7431)


# 3. NMF core algorithm


## 3.1 NMF L2 core algorithm
Here is the process of L2 core algorithm

In [None]:
def NMFL2(X: np.ndarray, k: int, max_iter: int = 300, tol: float = 1e-5, seed: int = 0) -> Tuple[np.ndarray, np.ndarray, List[float]]:
    """
    L2-NMF（你的原始乘法更新）:
      返回 (W, H, error_curve)
    【内容保留与之前相同】
    """
    V = np.mat(np.maximum(np.asarray(X, dtype=float), 0.0))  # ensure nonnegative, matrix type
    m, n = V.shape
    r = int(k)
    rng = np.random.default_rng(seed)
    W = np.mat(rng.random((m, r)))
    H = np.mat(rng.random((r, n)))
    errs: List[float] = []
    for _ in range(int(max_iter)):
        E = V - W * H
        err = float(np.sum(np.multiply(E, E)))
        errs.append(err)
        if err < tol:
            break
        a = W.T * V
        b = W.T * W * H
        for i in range(r):
            for j in range(n):
                if b[i, j] != 0:
                    H[i, j] = H[i, j] * a[i, j] / b[i, j]
        c = V * H.T
        d = W * H * H.T
        for i in range(m):
            for j in range(r):
                if d[i, j] != 0:
                    W[i, j] = W[i, j] * c[i, j] / d[i, j]
    return np.asarray(W), np.asarray(H), errs
# https://www.cnblogs.com/zhibei/p/9373120.html

## 3.2 NMF L1 Algorithm
Here is the realization of L1 Algorithm

In [None]:
# Some code
def NMFL1_IRLS(X: np.ndarray, k: int, max_iter: int = 100, inner_iters: int = 3, eps: float = 1e-8, tol: float = 1e-4, seed: int = 0):
    """
    L1-NMF（IRLS 框架）占位：
      需要时把 IRLS 的加权乘法更新写进来，返回 (W, H, mae_curve)。
    """
    pass

# 4. Main
Here is citation of above functions

In [None]:
# ===== Config =====
DATASET_ROOT = "data/ORL"           # 或 "data/CroppedYaleB"
DATASET_TYPE = "ORL"                # "ORL" 或 "YaleB"（用于推 h,w）
REDUCE       = 3 if DATASET_TYPE=="ORL" else 4
USE_NOISE    = True                 # 是否加椒盐噪声
P_NOISE      = 0.20                 # 被替换像素比例 p
R_SALT       = 0.50                 # 盐(最大值)比例 r
K            = None                 # 分解维度；None 则设为类别数
MAX_ITER_L2  = 300
TOL_L2       = 1e-5
RUN_L1       = True                 # 是否跑 L1（若你已实现 NMFL1_IRLS）

# ===== 1) Load dataset (列为样本 d×n) =====
try:
    X_clean, Y = load_data(root=DATASET_ROOT, reduce=REDUCE)
except NameError:
    # 如果你函数名是 load_data
    X_clean, Y = load_data(root=DATASET_ROOT, reduce=REDUCE)
print("X_clean:", X_clean.shape, "Y:", Y.shape)

# 推断图像尺寸（按数据集与 reduce）
if DATASET_TYPE.upper() == "ORL":
    # ORL 原始 112×92
    h, w = 112//REDUCE, 92//REDUCE
else:
    # YaleB 原图规格不完全一致，这里用 d 的平方根近似；不对就自己改成固定 h,w
    d = X_clean.shape[0]
    hw = int(np.sqrt(d))
    if hw*hw == d:
        h = w = hw
    else:
        h = int(np.floor(np.sqrt(d)))
        w = int(np.ceil(d / h))

# ===== 2) （可选）加椒盐噪声 =====
if USE_NOISE:
    X, M = salt_pepper(X_clean, p=P_NOISE, r=R_SALT, seed=0)
    # 可视化一张：原图 / 噪声mask / 加噪
    try:
        viz_triplet(X_clean, X, M, h, w, idx=2, suptitle=f"Salt-Pepper p={P_NOISE}, r={R_SALT}")
    except Exception as e:
        print("viz_triplet skipped:", e)
else:
    X = X_clean

# ===== 3) Run NMFs =====
if K is None:
    K = len(np.unique(Y)) if Y.size else 40

# L2（你的手写实现）
W2, H2, err2 = NMFL2(X, k=K, max_iter=MAX_ITER_L2, tol=TOL_L2, seed=0)
print(f"[L2] W:{W2.shape} H:{H2.shape} iters:{len(err2)}  RRE(clean):",
      np.linalg.norm(X_clean - W2 @ H2) / (np.linalg.norm(X_clean) + 1e-12))

# （可选）L1（如果你已实现 NMFL1_IRLS）
if RUN_L1:
    W1, H1, err1 = NMFL1_IRLS(X, k=K, max_iter=100, inner_iters=3, seed=0)
    print(f"[L1] W:{W1.shape} H:{H1.shape} iters:{len(err1)}  RRE(clean):",
          np.linalg.norm(X_clean - W1 @ H1) / (np.linalg.norm(X_clean) + 1e-12))

# ===== 4) （可选）聚类评测 =====
if Y.size:
    Yp2 = cluster_assign_from_H(H2, Y)
    acc2, nmi2 = acc_nmi(Y, Yp2)
    print(f"[L2] ACC={acc2:.4f} NMI={nmi2:.4f}")
    if RUN_L1:
        Yp1 = cluster_assign_from_H(H1, Y)
        acc1, nmi1 = acc_nmi(Y, Yp1)
        print(f"[L1] ACC={acc1:.4f} NMI={nmi1:.4f}")

# ===== 5) 可视化（字典 & 重构）=====
try:
    show_dictionary(W2, h, w, cols=8, title=f"L2 Dictionary (k={K})")
    show_reconstruction(X_clean, W2, H2, h, w, idx=0)
except Exception as e:
    print("visualization skipped:", e)

