In [79]:
import os, re, random, struct, zlib
import numpy as np

# 0. Data Preprocessing

In [80]:
def read(path):
    with open(path, "rb") as f:
        data = f.read()
    sig = b'\x89PNG\r\n\x1a\n'
    if not data.startswith(sig):
        raise ValueError("Not a PNG: "+path)
    

    # 청크 파싱
    i = 8
    width = height = bit_depth = color_type = comp = filt = interlace  = None
    idat = bytearray()

    while i < len(data):
        L = int.from_bytes(data[i:i + 4], "big")
        c_type = data[i + 4: i+ 8]
        chunk = data[i + 8: i + 8 + L]
        i += 12 + L #length + type + data + CRC

        if c_type == b'IHDR':
            width, height, bit_depth, color_type, comp, filt, interlace = struct.unpack(">IIBBBBB", chunk)
        elif c_type == b'IDAT':
            idat.extend(chunk)
        elif c_type == b'IEND':
            break
        else:
            #보조청크 무시
            pass


    # 스펙 제약 검사, 전제 위반시 에러
    if width is None or height is None:
        raise ValueError("Missing IHDR")
        

    
    # 압축해제(여러 IDAT를 zlib으로 한 번에 풀기)
    raw = zlib.decompress(bytes(idat))
    
    # 스캔라인 디필터링(원본바이트복원) - grey 8bit -> bpp = 1
    stride = width
    out = np.empty((height, stride), dtype = np.uint8)
    pos = 0
    prev = np.zeros(stride, dtype=np.uint8) # 윗줄

    for row in range(height):
        if pos >= len(raw):
            raise ValueError("Unexpected end of IDAT stream")
        ftype = raw[pos]
        pos += 1
        if pos + stride > len(raw):
            raise ValueError("Truncated scanline")
        scan = np.frombuffer(raw, dtype=np.uint8, count=stride, offset=pos)
        pos += stride


        recon = np.empty(stride, dtype = np.uint8)
        if ftype == 0: #None
            recon[:] = scan
        elif ftype == 1: #Sub
            recon[0] = scan[0]
            for x in range(1, stride):
                recon[x] = (scan[x] + recon[x - 1]) & 0xFF

        elif ftype == 2: #Up
            recon[:] = (scan + prev) & 0xFF
        elif ftype == 3: #Average
            for x in range(stride):
                left = recon[x - 1] if x > 0 else 0
                up = prev[x]
                recon[x] = (scan[x] + ((left + up) >> 1)) & 0xFF
        elif ftype == 4:
                left = recon[x-1] if x > 0 else 0
                up = prev[x]
                up_left = prev[x - 1] if x > 0 else 0
                p = left + up - up_left
                pa = abs(p - left)
                pb = abs(p - up)
                pc = abs(p - up_left)
                pred = left if pa <= pb and pa <= pc else (up if pb <= pc else up_left)
                recon[x] = (scan[x] + pred) & 0xFF

        else:
            raise ValueError(f"Unknown PNG filter type {ftype}") 

        out[row] = recon
        prev = recon

    return out.astype(np.float32) / 255.0

# 1. Data Refining

In [81]:
def load_png(root_dir, input_size=(112, 92)):
    X_list, y_list = [], []
    size = None
    for name in sorted(os.listdir(root_dir)):
        m = re.fullmatch(r"(\d+)_(\d+)\.png", name)
        if not m:
            continue
        person = int(m.group(1))
        img = read(os.path.join(root_dir, name))
        if size is None:
            size = img.shape       # (height, width)
        if input_size is not None and img.shape != input_size:
            msg = f"Unexpected image shape {img.shape} for {name}, input {input_size}"
        
        X_list.append(img.reshape(-1))
        y_list.append(person - 1)
    if len(X_list) == 0:
        raise ValueError("No x_y.png files found")
    X = np.stack(X_list).astype(np.float32)        # (N, D)
    y = np.asarray(y_list, dtype=np.int64)

    return X, y, size


In [82]:
def downscale(X, size):
    height, width = size
    if (height & 2) or (width % 2):
        raise ValueError("Image size must be even")
    imgs = X.reshape(-1, height, width)
    imgs2 = imgs.reshape(imgs.shape[0], height // 2, 2, width//2, 2).mean(axis=(2, 4))

    return imgs2.reshape(imgs.shape[0], -1).astype(np.float32), (height//2, width//2)


In [83]:
def stratified_5_fold(y, seed=42):
    rng = np.random.RandomState(seed)
    classes = np.unique(y)
    per_class = {c: rng.permutation(np.where(y==c)[0]) for c in classes}
    folds = []
    per_class_chunk = {}
    for c in classes:
        idx = np.where(y == c)[0]
        if len(idx) < 5:
            raise ValueError(f"class {c} has only {len(idx)} samples (<{5})")
        idx = rng.permutation(idx)
        per_class_chunk[c] = np.array_split(idx, 5)
    for k in range(5):
        test_idx = np.concatenate([per_class[c][2*k:2*(k+1)]for c in classes])
        train_mask = np.ones(len(y), dtype=bool)
        train_mask[test_idx] = False
        train_idx = np.where(train_mask)[0]
        folds.append((train_idx, test_idx))
    return folds

# 2. 1-NN & PCA

In [84]:
def one_nn(X_train, y_train, X_test):
    train_norm = np.sum(X_train * X_train, axis = 1)
    test_norm = np.sum(X_test * X_test, axis=1, keepdims=True)
    d2 = test_norm + train_norm[None, :] - 2.0 * (X_test @ X_train.T)
    return y_train[np.argmin(d2, axis=1)]

In [85]:
def pca_fit(X_train, n_pca_axis = 100):
    mu = X_train.mean(axis = 0, dtype=np.float64).astype(np.float32)
    Xc = (X_train - mu).astype(np.float32, copy = False)
    # SVD
    U, S, Vt = np.linalg.svd(Xc, full_matrices=False)
    comps = Vt[:n_pca_axis].astype(np.float32) # (k, D)
    return mu, comps

In [86]:
def pca_transform(X, mu, comps):
    return (X - mu) @ comps.T  # (N, k)

# 3. Pipeline

In [87]:
def run_1nn(X, y, folds):
    accs = []
    for i, (tr, te) in enumerate(folds, 1):
        yhat= one_nn(X[tr], y[tr], X[te])
        accuracy = (yhat == y[te]).mean()
        print(f"[Task1][Fold {i}] 1-NN accuracy = {accuracy:.4f}")
        accs.append(accuracy)
    print(f"[Task1] Mean accuracy: {np.mean(accs):.4f}")
    return accs

In [88]:
def run_pca(X, y, folds, k = 100, title = "Task2"):
    accs = []
    for i, (tr, te) in enumerate(folds, 1):
        mu, comps = pca_fit(X[tr], n_pca_axis = k)
        Z_train = pca_transform(X[tr], mu, comps)
        Z_test = pca_transform(X[te], mu, comps)
        yhat = one_nn(Z_train, y[tr], Z_test)
        accuracy = (yhat == y[te]).mean()
        print(f"[{title}][Fold {i}] PCA ({k}) + 1NN accuracy = {accuracy:.4f}")
        accs.append(accuracy)
    print(f"[{title}] Mean accuracy: {np.mean(accs):.4f}")
    return accs

In [89]:
if __name__ == "__main__":
    image_dir = "/Users/gwaec/umd_lectures_projects/cmsc422/hw3/ATT"

    X, y, size = load_png(image_dir, input_size = (112, 92))
    print(f"Loaded: X = {X.shape}, y = {y.shape}, size = {size} ")
    
    folds = stratified_5_fold(y, seed = 42)

    _ = run_1nn(X, y, folds)
    _ = run_pca(X, y, folds, k = 100, title = "Task2")

    X_small, size_small = downscale(X, size)
    print(f"Resized: X_small = {X_small.shape}, size = {size_small}")
    _ = run_pca(X_small, y, folds, k = 100, title = "Task3_resized")

Loaded: X = (400, 10304), y = (400,), size = (112, 92) 
[Task1][Fold 1] 1-NN accuracy = 0.9625
[Task1][Fold 2] 1-NN accuracy = 0.9625
[Task1][Fold 3] 1-NN accuracy = 0.9875
[Task1][Fold 4] 1-NN accuracy = 0.9875
[Task1][Fold 5] 1-NN accuracy = 0.9625
[Task1] Mean accuracy: 0.9725
[Task2][Fold 1] PCA (100) + 1NN accuracy = 0.9750
[Task2][Fold 2] PCA (100) + 1NN accuracy = 0.9750
[Task2][Fold 3] PCA (100) + 1NN accuracy = 0.9875
[Task2][Fold 4] PCA (100) + 1NN accuracy = 0.9875
[Task2][Fold 5] PCA (100) + 1NN accuracy = 0.9625
[Task2] Mean accuracy: 0.9775
Resized: X_small = (400, 2576), size = (56, 46)
[Task3_resized][Fold 1] PCA (100) + 1NN accuracy = 0.9750
[Task3_resized][Fold 2] PCA (100) + 1NN accuracy = 0.9750
[Task3_resized][Fold 3] PCA (100) + 1NN accuracy = 0.9875
[Task3_resized][Fold 4] PCA (100) + 1NN accuracy = 0.9875
[Task3_resized][Fold 5] PCA (100) + 1NN accuracy = 0.9625
[Task3_resized] Mean accuracy: 0.9775
