In [1]:
!which python

In [2]:
# %%
# ## 0.  Install & Imports
# (Uncomment if running fresh environment)
# !pip install tensorflow==2.19.0 scikit-learn opencv-python matplotlib pandas

import os, re, glob, cv2, gc
import numpy as np, pandas as pd, tensorflow as tf
from pathlib import Path
from sklearn.decomposition import PCA
from sklearn.metrics import mean_squared_error
from tensorflow import keras
from keras import layers



2025-06-26 17:30:31.506260: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-06-26 17:30:31.573971: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-06-26 17:30:31.576403: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
# %%
# ## 1.  Configuration

# Paths
DATA_DIR        = Path("data")             # expects subfolders of .jpg/.csv
OUTPUT_DIR      = Path("outputs/notebook") 
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)

# Image & PCA
IMG_H = IMG_W = 64
PCA_NCOMP = 20

# Training hyperparameters
BATCH_SIZE      = 64
EPOCHS          = 5
BASE_LR         = 3e-4
PATIENCE        = 50

# ViT architecture
PATCH_SIZE      = 8
VIT_NUM_HEADS   = 6
VIT_LAYERS      = 6
MLP_DIM         = 128

# Random seed for reproducibility
SEED = 42123
np.random.seed(SEED)
tf.random.set_seed(SEED)



In [4]:
# %%
# ## 2.  Helper Functions

def natural_sort(filelist):
    """Sort file paths in human order."""
    convert = lambda text: int(text) if text.isdigit() else text.lower()
    alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', str(key)) ]
    return sorted(filelist, key=alphanum_key)

def collect_images():
    jpgs = list(DATA_DIR.glob("*/*.jpg"))
    return natural_sort(jpgs)

def split_files(jpg_list):
    """Deterministic split: train/val/test."""
    n = len(jpg_list)
    n_test = int(0.02 * n)
    n_val  = int(0.10 * n)
    test   = jpg_list[:n_test]
    val    = jpg_list[n_test:n_test+n_val]
    train  = jpg_list[n_test+n_val:]
    return train, val, test

def load_spectrum(path):
    """Read .csv, skip first 5 lines, return float32 spectrum."""
    arr = pd.read_csv(path, skiprows=range(5), names=["f","v"])["v"].to_numpy(dtype="float32")
    return arr

def fit_pca(train_paths):
    # gather only spectra of consistent length
    spectra_list = []
    expected_len = None
    for p in train_paths:
        arr = load_spectrum(p.with_suffix(".csv"))
        if expected_len is None:
            expected_len = arr.shape[0]
        if arr.shape[0] != expected_len:
            print(f"Skipping {p.name}: length {arr.shape[0]} (expected {expected_len})")
            continue
        spectra_list.append(arr)
    if not spectra_list:
        raise ValueError("No valid spectra found for PCA. Check CSV files for row consistency.")
    spectra = np.stack(spectra_list)    
    pca = PCA(n_components=PCA_NCOMP, random_state=SEED).fit(spectra)
    return pca

def parse_one(path, pca, augment=True):
    """Load JPG + CSV, apply optional rotation, pack props into channels."""
    # load grayscale image
    img = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
    folder = path.parent.name

    # augment: random rotation for specific folders
    if augment and folder in {"0d65h","0d75h"}:
        angle = np.random.randint(0,360)
        M = cv2.getRotationMatrix2D((IMG_W/2, IMG_H/2), angle, 1.0)
        img = cv2.warpAffine(img, M, (IMG_W, IMG_H), borderMode=cv2.BORDER_REFLECT)
    else:
        img = cv2.resize(img, (IMG_W, IMG_H))

    # normalize
    img = img.astype("float32") / 255.0
    img = img[...,None]  # shape (H,W,1)

    # extract numeric properties from folder name, map to [0,1]
    h,a = [int(x) for x in re.findall(r"\d+", folder)]
    prop = np.array([h/60., a/75.], dtype="float32")
    prop1 = np.full((IMG_H, IMG_W, 1), prop[0], dtype="float32")
    prop2 = np.full((IMG_H, IMG_W, 1), prop[1], dtype="float32")
    img = np.concatenate([img, prop1, prop2], axis=-1)  # shape (H,W,3)

    # PCA-transform the spectrum
    spectrum = load_spectrum(path.with_suffix(".csv"))
    y = pca.transform(spectrum.reshape(1,-1))[0].astype("float32")
    return img, y

def make_tf_dataset(paths, pca, training=False):
    """Build a tf.data.Dataset for a list of Paths."""
    AUTOTUNE = tf.data.AUTOTUNE

    def gen():
        expected_dim = pca.components_.shape[1]
        for p in paths:
            spec = load_spectrum(p.with_suffix(".csv"))
            if spec.shape[0] != expected_dim:
                print(f"Skipping {p.name}: spectrum length {spec.shape[0]} ≠ expected {expected_dim}")
                continue
            yield parse_one(p, pca, augment=training)

    ds = tf.data.Dataset.from_generator(
        gen,
        output_signature=(
            tf.TensorSpec((IMG_H, IMG_W, 3), tf.float32),
            tf.TensorSpec((PCA_NCOMP,), tf.float32),
        )
    )
    if training:
        ds = ds.shuffle(len(paths), seed=SEED, reshuffle_each_iteration=True)
    ds = ds.batch(BATCH_SIZE).prefetch(AUTOTUNE)
    return ds

def weighted_mse(y_true, y_pred):
    idx = tf.cast(tf.range(1, PCA_NCOMP+1), tf.float32)
    return tf.reduce_mean((1.0/idx) * tf.square(y_true - y_pred))



In [5]:
# %%
# ## 3.  Build the Vision Transformer

def patch_embed(x):
    x = layers.Conv2D(filters=MLP_DIM,
                      kernel_size=(PATCH_SIZE,PATCH_SIZE),
                      strides=(PATCH_SIZE,PATCH_SIZE),
                      padding="valid")(x)
    num_patches = (IMG_H//PATCH_SIZE)**2
    x = layers.Reshape((num_patches, MLP_DIM))(x)
    return x

def build_vit():
    inp = layers.Input((IMG_H, IMG_W, 3))
    x   = patch_embed(inp)

    # learnable positional embedding
    pos = layers.Embedding(input_dim=x.shape[1], output_dim=MLP_DIM)(
        tf.range(start=0, limit=x.shape[1], delta=1)
    )
    x = x + pos

    # transformer encoder blocks
    for _ in range(VIT_LAYERS):
        # attention block
        y = layers.LayerNormalization(epsilon=1e-6)(x)
        y = layers.MultiHeadAttention(
            num_heads=VIT_NUM_HEADS, key_dim=MLP_DIM
        )(y, y)
        x = layers.Add()([x, y])

        # MLP block
        y = layers.LayerNormalization(epsilon=1e-6)(x)
        y = layers.Dense(MLP_DIM*4, activation="gelu")(y)
        y = layers.Dense(MLP_DIM)(y)
        x = layers.Add()([x, y])

    x = layers.GlobalAveragePooling1D()(x)
    out = layers.Dense(PCA_NCOMP, activation="linear")(x)
    return keras.Model(inputs=inp, outputs=out, name="ViT_PCA")



In [6]:
# %%
# ## 4.  Prepare Data & PCA

all_jpg = collect_images()
train_files, val_files, test_files = split_files(all_jpg)
print(f"Splits: train={len(train_files)}  val={len(val_files)}  test={len(test_files)}")

pca = fit_pca(train_files)
print("PCA fitted, explained variance sum:", pca.explained_variance_ratio_.sum())

train_ds = make_tf_dataset(train_files, pca, training=True)
val_ds   = make_tf_dataset(val_files,   pca, training=False)
test_ds  = make_tf_dataset(test_files,  pca, training=False)



Splits: train=2976  val=338  test=67
Skipping 30d75h-178.jpg: length 21 (expected 201)
Skipping 60d75h-897.jpg: length 0 (expected 201)
PCA fitted, explained variance sum: 0.96804994


2025-06-26 17:30:36.936420: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1960] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...


In [None]:
# %%
# ## 5.  Compile & Train

strategy = tf.distribute.get_strategy()  # change if multi-GPU
with strategy.scope():
    model = build_vit()
    opt = tf.keras.optimizers.Adam(BASE_LR)
    model.compile(optimizer=opt,
                  loss=weighted_mse,
                  metrics=["mae"])

model.summary()

callbacks = [
    keras.callbacks.EarlyStopping(patience=PATIENCE, restore_best_weights=True),
    keras.callbacks.ReduceLROnPlateau(patience=PATIENCE//2, factor=0.5),
    keras.callbacks.ModelCheckpoint(
        str(OUTPUT_DIR/"vit_best.h5"), save_best_only=True, monitor="val_loss"
    ),
    keras.callbacks.TensorBoard(str(OUTPUT_DIR/"tb")),
]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=callbacks,
    verbose=1
)

pd.DataFrame(history.history).to_csv(OUTPUT_DIR/"history.csv", index=False)
model.save(OUTPUT_DIR/"vit_final.h5")



Model: "ViT_PCA"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 64, 64, 3)]          0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 8, 8, 128)            24704     ['input_1[0][0]']             
                                                                                                  
 reshape (Reshape)           (None, 64, 128)              0         ['conv2d[0][0]']              
                                                                                                  
 tf.__operators__.add (TFOp  (None, 64, 128)              0         ['reshape[0][0]']             
 Lambda)                                                                                    

2025-06-26 17:30:56.070864: I tensorflow/core/kernels/data/shuffle_dataset_op.cc:422] Filling up shuffle buffer (this may take a while): 2552 of 2976


Skipping 60d75h-897.jpg: spectrum length 0 ≠ expected 201


2025-06-26 17:30:57.710043: I tensorflow/core/kernels/data/shuffle_dataset_op.cc:450] Shuffle buffer filled.


     43/Unknown - 60s 926ms/step - loss: 0.2647 - mae: 0.6874

In [None]:
# %%
# ## 6.  Evaluate on Test Set

mse_list = []
for x_batch, y_batch in test_ds:
    preds = model.predict(x_batch, verbose=1)
    mses = np.mean((y_batch - preds)**2, axis=1)
    mse_list.extend(mses.tolist())

print("Mean Test MSE:", np.mean(mse_list))

# plot a few spectra overlays
import matplotlib.pyplot as plt
count=0
for x_batch, y_batch in test_ds.take(1):
    preds = model.predict(x_batch, verbose=1)
    inv_true = pca.inverse_transform(y_batch)
    inv_pred = pca.inverse_transform(preds)
    for i in range(min(31, len(inv_true))):
        plt.figure()
        plt.plot(inv_true[i], label="True")
        plt.plot(inv_pred[i], '--', label="Pred")
        plt.legend()
        plt.title(f"Sample {count}")
        plt.ylim(0, 1)  
        plt.show()
        count+=1

# %%
print("Notebook run complete! All artefacts in", OUTPUT_DIR)

In [None]:
# %% Cell 7: 5-FOLD CROSS-VALIDATION (C-2), with per-sample validation
from sklearn.model_selection import KFold
import json, time

def kfold_vit(build_fn, all_files, k=5, epochs=EPOCHS):
    kf = KFold(n_splits=k, shuffle=True, random_state=SEED)
    mses, times = [], []
    for fold, (tr_idx, vl_idx) in enumerate(kf.split(all_files), 1):
        print(f"\n▶ Fold {fold}/{k}")
        tr = [all_files[i] for i in tr_idx]
        vl = [all_files[i] for i in vl_idx]

        ds_tr = make_tf_dataset(tr, pca, training=True)
        ds_vl = make_tf_dataset(vl, pca, training=False)

        # build & train
        m = build_fn()
        m.compile(optimizer=keras.optimizers.Adam(BASE_LR),
                  loss=weighted_mse)
        m.fit(ds_tr,
              epochs=epochs,
              callbacks=[keras.callbacks.EarlyStopping(patience=PATIENCE,
                                                       restore_best_weights=True)],
              verbose=1)

        # measure inference-time
        t0 = time.perf_counter()
        _  = m.predict(ds_vl, verbose=0)
        t1 = time.perf_counter()
        sec_sample = (t1 - t0) / len(vl)

        # evaluate MSE on each valid sample
        ys, preds = [], []
        for p in vl:
            try:
                x,y = parse_one(p, pca, augment=False)
            except ValueError as e:
                print(f"  Skipping {p.name}: {e}")
                continue
            pr = m.predict(x[None,...], verbose=0)[0]
            ys.append(pca.inverse_transform(y[None,:])[0])
            preds.append(pca.inverse_transform(pr[None,:])[0])

        if not ys:
            print(f"  No valid samples in fold {fold}, skipping metrics.")
            continue

        fold_mse = np.mean([mean_squared_error(a,b) for a,b in zip(ys,preds)])
        print(f"  Fold {fold} MSE={fold_mse:.5f}, {sec_sample*1000:.2f} ms/sample")
        mses.append(fold_mse); times.append(sec_sample)

        tf.keras.backend.clear_session(); gc.collect()

    summary = {
        "mse_mean": float(np.mean(mses)),
        "mse_std":  float(np.std(mses)),
        "avg_ms_per_sample": float(np.mean(times))*1000
    }
    with open(OUTPUT_DIR/"vit_kfold.json","w") as f:
        json.dump(summary, f, indent=2)
    print("✅ K-fold summary written to", OUTPUT_DIR/"vit_kfold.json", "\n", summary)

# run it
kfold_vit(build_vit, train_files, k=5)


  Fold 4 MSE=0.01031, 7.47 ms/sample

▶ Fold 5/5
Epoch 1/2
Epoch 2/2
Skipping 30d75h-178.jpg: spectrum length 21 ≠ expected 201
Skipping 60d75h-897.jpg: spectrum length 0 ≠ expected 201
  Skipping 30d75h-178.jpg: X has 21 features, but PCA is expecting 201 features as input.
  Skipping 60d75h-897.jpg: Found array with 0 feature(s) (shape=(1, 0)) while a minimum of 1 is required by PCA.
  Fold 5 MSE=0.00973, 7.60 ms/sample
✅ K-fold summary written to outputs/notebook/vit_kfold.json 
 {'mse_mean': 0.009577682241797447, 'mse_std': 0.0007598736556246877, 'avg_ms_per_sample': 6.664649968166674}


In [None]:
# %% Cell 8: OUT-OF-DISTRIBUTION EVALUATION (C-3), skipping bad files
def is_ood(path, h_min=68, h_max=72, max_angle=50):
    h,a = [int(x) for x in re.findall(r"\d+", path.parent.name)]
    return (h < h_min or h > h_max) or (a >= max_angle)

ood_files = [p for p in train_files+val_files+test_files if is_ood(p)]
print(f"OOD candidate set size: {len(ood_files)}")

ys, preds = [], []
for p in ood_files:
    try:
        x,y = parse_one(p, pca, augment=False)
    except ValueError as e:
        print(f"  Skipping {p.name}: {e}")
        continue
    pr = model.predict(x[None,...], verbose=0)[0]
    ys.append(pca.inverse_transform(y[None,:])[0])
    preds.append(pca.inverse_transform(pr[None,:])[0])

if ys:
    ood_mse = np.mean([mean_squared_error(a,b) for a,b in zip(ys,preds)])
    print(f"✅ ViT OOD MSE = {ood_mse:.5f}")
    pd.DataFrame({
        "file": [p.name for p in ood_files[:len(ys)]],
        "mse":  [mean_squared_error(a,b) for a,b in zip(ys,preds)]
    }).to_csv(OUTPUT_DIR/"vit_ood_details.csv", index=False)
else:
    print("⚠️  No valid OOD samples found.")


In [None]:
# %% Inverse-Design (improved)
import tensorflow as tf

def inverse_design_constrained(target_csv, steps=300, lr=1e-1, tv_weight=1e-2):
    # 1. load & PCA-project target spectrum
    spec   = load_spectrum(Path(target_csv))
    if spec.shape[0] != pca.n_features_in_:
        raise ValueError("CSV length mismatch")
    t_pca  = pca.transform(spec.reshape(1,-1))[0]

    # 2. build initial guess: start from a uniform gray geometry
    #    and freeze channels 1 & 2 to the known physical props
    #    parse_one() can give us prop-channels for one sample:
    example_img, _ = parse_one(train_files[0], pca, augment=False)
    prop1 = example_img[...,1:2]  # channel 1
    prop2 = example_img[...,2:3]  # channel 2

    # trainable geometry channel (1): initialize to 0.5
    geom = tf.Variable(tf.ones([1,IMG_H,IMG_W,1]) * 0.5, dtype=tf.float32)

    opt = tf.keras.optimizers.Adam(lr)
    best_l, best_geom = 1e9, None

    for step in range(1, steps+1):
        with tf.GradientTape() as tape:
            # assemble full input: [geom, prop1, prop2]
            x = tf.concat([geom, prop1[None,...], prop2[None,...]], axis=-1)
            pred = model(x, training=False)                   # ViT forward
            err  = tf.reduce_mean((pred - t_pca)**2)          # spectrum MSE

            # total variation loss on geometry for smoothness
            tv   = tf.image.total_variation(geom)[0]
            loss = err + tv_weight * tv

        grads = tape.gradient(loss, [geom])
        opt.apply_gradients(zip(grads, [geom]))
        # clamp geometry to [0,1]
        geom.assign(tf.clip_by_value(geom, 0.0, 1.0))

        # track best
        if loss < best_l:
            best_l, best_geom = float(loss), geom.numpy().copy()

        if step % 50 == 0:
            print(f"step {step:>3d}  err={err:.4e}  tv={tv:.4e}")

    # 3. threshold geometry to binary mask
    bin_geom = (best_geom[0,...,0] > 0.5).astype(np.uint8) * 255
    out_png = OUTPUT_DIR/f"inv_design_bin_{Path(target_csv).stem}.png"
    cv2.imwrite(str(out_png), bin_geom)

    # 4. export the predicted spectrum
    full_input = np.stack([bin_geom/255, 
                           prop1[...,:,0], 
                           prop2[...,:,0]], axis=-1)[None,...]
    pred_pca   = model(full_input, training=False)[0].numpy()
    pred_spec  = pca.inverse_transform(pred_pca)
    pd.DataFrame({"target":spec, "predicted":pred_spec})\
      .to_csv(OUTPUT_DIR/f"inv_design_{Path(target_csv).stem}_spec.csv", index=False)

    print(f"Saved binary design → {out_png}  (best loss {best_l:.4e})")
    return out_png

# Example usage:
# inverse_design_constrained("data/0d65h/sample_123.csv")
