# Vector Quantization Codecs for Blosc2

This notebook demonstrates the implementation of custom Blosc2 codecs using:
- **Fuzzy C-Means (CMeans)** clustering
- **Archetypal Analysis (AA)**

The codecs are evaluated on the Olivetti Faces dataset using SSIM metrics.

## Setup

In [None]:
import os
import sys

# Add project root to path
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

import numpy as np
import blosc2
import matplotlib.pyplot as plt

from models import CMeans, AA

## Toy Example: Prototype Visualization

First, we illustrate the difference between CMeans and AA prototypes using synthetic 2D data.

In [None]:
# Generate synthetic data
np.random.seed(42)
X = np.random.normal(size=(1_000, 2))
n_prototypes = 4

# Fit models
cmeans_model = CMeans(n_prototypes, m=2, max_iter=100, tol=1e-6)
cmeans_model.fit(X)

aa_model = AA(n_prototypes, n_init=1, max_iter=100, tol=1e-6)
aa_model.fit(X)

# Visualize
plt.figure(figsize=(8, 6))
plt.scatter(X[:, 0], X[:, 1], c='black', alpha=0.1, label='Data')
plt.scatter(cmeans_model.prototypes[:, 0], cmeans_model.prototypes[:, 1], 
            c='red', s=100, marker='o', label='CMeans')
plt.scatter(aa_model.prototypes[:, 0], aa_model.prototypes[:, 1], 
            c='blue', s=100, marker='s', label='AA')
plt.legend()
plt.title('Prototypes: CMeans (centroids) vs AA (archetypes)')
plt.xlabel('X1')
plt.ylabel('X2')
plt.show()

## Olivetti Faces Dataset

Load and preprocess the Olivetti Faces dataset (400 images, 64x64 pixels each).

In [None]:
from sklearn.datasets import fetch_olivetti_faces
from sklearn.feature_extraction import image
from sklearn.preprocessing import StandardScaler

# Load data
faces = fetch_olivetti_faces(shuffle=True, random_state=0)
data = faces.images
n_samples = data.shape[0]

print(f"Dataset shape: {data.shape}")
print(f"Number of images: {n_samples}")

In [None]:
# Extract patches for training
block_size = 4

patcher = image.PatchExtractor(
    patch_size=(block_size, block_size), 
    max_patches=n_samples, 
    random_state=0
)
tiled_data = patcher.fit_transform(data)

# Select a random subset for training
n_samples_subset = 1_000
np.random.seed(0)
idx = np.random.choice(tiled_data.shape[0], n_samples_subset, replace=False)
X = tiled_data[idx].reshape(n_samples_subset, -1)

# Standardize features
scaler = StandardScaler()
X_pre = scaler.fit_transform(X)

print(f"Training samples: {X_pre.shape[0]}")
print(f"Features per sample: {X_pre.shape[1]}")

## Train Models

Train CMeans and AA models to learn the codebook (prototypes).

In [None]:
n_prototypes = 36

# Train CMeans
cmeans_model = CMeans(
    n_prototypes, 
    m=2, 
    max_iter=100, 
    tol=1e-6, 
    metric='cosine', 
    random_state=0
)
cmeans_model.fit(X_pre)
print(f"CMeans trained with {n_prototypes} centroids")

# Train AA
aa_model = AA(
    n_prototypes, 
    n_init=1, 
    max_iter=100, 
    tol=1e-6, 
    random_state=0
)
aa_model.fit(X_pre)
print(f"AA trained with {n_prototypes} archetypes")

## Define Custom Blosc2 Codec

Implement encoder and decoder functions for the custom codec.

In [None]:
import lz4.frame
import pickle


def encoder(input, output, meta, schunk: blosc2.SChunk):
    """Encode image blocks using trained model prototypes."""
    # Determine data type
    if schunk.typesize == 8:
        dtype = np.dtype(np.float64)
    elif schunk.typesize == 4:
        dtype = np.dtype(np.float32)
    elif schunk.typesize == 2:
        dtype = np.dtype(np.float16)
    else:
        raise ValueError('Unsupported data type')
    
    itemsize = dtype.itemsize
    block_size = int(np.sqrt(schunk.blocksize // itemsize))
    shape = (block_size, block_size)
    img = input[:schunk.blocksize].view(dtype).reshape(shape)

    # Load models from metadata
    scaler_loaded = pickle.loads(schunk.meta['scaler'])
    model = pickle.loads(schunk.meta['model'])
    tile_size = int(np.sqrt(scaler_loaded.n_features_in_))

    # Tile the input into blocks
    img_tiled = (
        img.reshape(
            block_size // tile_size,
            tile_size,
            block_size // tile_size,
            tile_size
        )
        .transpose(0, 2, 1, 3)
        .reshape(-1, tile_size * tile_size)
    )

    # Transform and quantize weights
    img_pre = scaler_loaded.transform(img_tiled)
    weights = model.transform(img_pre)
    weights = np.digitize(weights, np.linspace(0, 1, 256)).astype(np.uint8)

    # Compress with LZ4
    output_compressed = lz4.frame.compress(pickle.dumps(weights))
    output_compressed_size = len(output_compressed)
    output[0:output_compressed_size] = np.frombuffer(output_compressed, dtype=np.uint8)

    return output_compressed_size


def decoder(input, output, meta, schunk: blosc2.SChunk):
    """Decode image blocks using trained model prototypes."""
    # Determine data type
    if schunk.typesize == 8:
        dtype = np.dtype(np.float64)
    elif schunk.typesize == 4:
        dtype = np.dtype(np.float32)
    elif schunk.typesize == 2:
        dtype = np.dtype(np.float16)
    else:
        raise ValueError('Unsupported data type')
    
    itemsize = dtype.itemsize

    # Decompress weights
    weights = pickle.loads(lz4.frame.decompress(input)) / 255.

    # Load models from metadata
    scaler_loaded = pickle.loads(schunk.meta['scaler'])
    model = pickle.loads(schunk.meta['model'])
    tile_size = int(np.sqrt(scaler_loaded.n_features_in_))

    # Reconstruct image from prototypes
    prototypes = scaler_loaded.inverse_transform(model.prototypes)
    img_reconstructed = weights @ prototypes

    # Untile the image
    block_size = int(np.sqrt(schunk.blocksize // itemsize))
    shape = (block_size, block_size)
    img_reconstructed = (
        img_reconstructed
        .reshape(block_size // tile_size, block_size // tile_size, tile_size, tile_size)
        .transpose(0, 2, 1, 3)
        .reshape(shape)
    )

    if img_reconstructed.dtype != dtype:
        img_reconstructed = img_reconstructed.astype(dtype)

    output[:schunk.blocksize] = img_reconstructed.view(np.uint8).ravel()
    return schunk.blocksize

In [None]:
# Register the custom codec (skip if already registered)
codec_id = 185

if codec_id not in blosc2.ucodecs_registry:
    blosc2.register_codec('VQ', codec_id, encoder, decoder)
    print(f"Registered codec with ID {codec_id}")
else:
    print(f"Codec ID {codec_id} already registered")

## Compress Images

Apply the custom codecs to compress the Olivetti Faces dataset.

In [None]:
chunks = (1, 64, 64)
blocks = (1, 64, 64)

cparams = {
    'codec': codec_id,
    'filters': [],
    'splitmode': blosc2.SplitMode.NEVER_SPLIT,
    'nthreads': 1,
}

dparams = {
    'nthreads': 1,
}

# Compress with CMeans
meta_cmeans = {
    'scaler': pickle.dumps(scaler),
    'model': pickle.dumps(cmeans_model),
}
compressed_cmeans = blosc2.asarray(
    faces.images.copy(), 
    chunks=chunks, 
    blocks=blocks, 
    meta=meta_cmeans, 
    cparams=cparams, 
    dparams=dparams
)

# Compress with AA
meta_aa = {
    'scaler': pickle.dumps(scaler),
    'model': pickle.dumps(aa_model),
}
compressed_aa = blosc2.asarray(
    faces.images.copy(), 
    chunks=chunks, 
    blocks=blocks, 
    meta=meta_aa, 
    cparams=cparams, 
    dparams=dparams
)

print(f"CMeans compression ratio: {compressed_cmeans.schunk.cratio:.2f}x")
print(f"AA compression ratio: {compressed_aa.schunk.cratio:.2f}x")

## Results Visualization

Compare original images with reconstructions from both codecs.

In [None]:
fig, ax = plt.subplots(3, 10, figsize=(15, 4))

np.random.seed(2)
idx = np.random.choice(n_samples, 10)

for i in range(10):
    ax[0, i].imshow(data[idx[i]], cmap='gray', vmin=0, vmax=1)
    ax[1, i].imshow(np.clip(compressed_aa[idx[i]], 0, 1), cmap='gray', vmin=0, vmax=1)
    ax[2, i].imshow(np.clip(compressed_cmeans[idx[i]], 0, 1), cmap='gray', vmin=0, vmax=1)
    
    for row in range(3):
        ax[row, i].set_xticks([])
        ax[row, i].set_yticks([])

ax[0, 0].set_ylabel('Original')
ax[1, 0].set_ylabel('AA')
ax[2, 0].set_ylabel('CMeans')

plt.tight_layout()
plt.savefig('compression_faces.pdf')
plt.show()

## SSIM Evaluation

Compute Structural Similarity Index (SSIM) for all reconstructed images.

In [None]:
from skimage.metrics import structural_similarity as ssim
import pandas as pd

# Compute SSIM for each image
cmeans_ssim = [
    ssim(np.array(orig), np.array(recon), data_range=1)
    for orig, recon in zip(data, compressed_cmeans)
]

aa_ssim = [
    ssim(np.array(orig), np.array(recon), data_range=1)
    for orig, recon in zip(data, compressed_aa)
]

# Summary statistics
summary = pd.DataFrame({
    'CMeans': {
        'Min': np.min(cmeans_ssim),
        'P25': np.percentile(cmeans_ssim, 25),
        'P75': np.percentile(cmeans_ssim, 75),
        'Max': np.max(cmeans_ssim),
        'Mean': np.mean(cmeans_ssim),
        'Std': np.std(cmeans_ssim),
    },
    'AA': {
        'Min': np.min(aa_ssim),
        'P25': np.percentile(aa_ssim, 25),
        'P75': np.percentile(aa_ssim, 75),
        'Max': np.max(aa_ssim),
        'Mean': np.mean(aa_ssim),
        'Std': np.std(aa_ssim),
    }
}).T

print("SSIM Summary Statistics:")
print(summary.round(2))

In [None]:
# LaTeX table for paper
print("\nLaTeX table:")
print(summary.round(2).to_latex())

In [None]:
print(f"\nCompression Ratios:")
print(f"  CMeans: {compressed_cmeans.schunk.cratio:.2f}x")
print(f"  AA: {compressed_aa.schunk.cratio:.2f}x")