In [None]:
# Cell 1
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Model

# ---- Experiment configuration (edit here, then Restart & Run All) ----

# Inference batch size handed to the tf.data pipeline.
BATCH_SIZE = 64

# Dataset tag: selects the class list below and prefixes the results folder.
DATASET = "KTH"

# Architecture tag ("64" / "32" / anything else) used to pick layer sizes.
MODEL_ARCH = "64"
USE_CONTEXT = True
USE_FOVEA = True

# t-SNE settings: number of sub-sampled points, perplexity, optimiser iterations.
N_SAMPLES = 40_000
PERPLEXITY = 400
MAX_ITER = 1_000

# Results of one experiment live under "<DATASET>_results/<EXPERIMENT>".
EXPERIMENT = "both64_500_64_ctx64x64_fov32x32"
OUTPUT_DIR = f"{DATASET}_results/{EXPERIMENT}"

# (height, width, channels) of the two image streams.
CONTEXT_SHAPE = (64, 64, 1)
FOVEA_SHAPE = (32, 32, 1)

# Class list depends on the dataset tag; every tag other than "UCF11" gets the
# four-action list.
if DATASET == "UCF11":
    CLASS_NAMES = [
        "basketball",
        "biking",
        "diving",
        "golf_swing",
        "horse_riding",
        "soccer_juggling",
        "swing",
        "tennis_swing",
        "trampoline_jumping",
        "volleyball_spiking",
        "walking",
    ]
else:
    CLASS_NAMES = ["boxing", "handclapping", "handwaving", "walking"]
NUM_CLASSES = len(CLASS_NAMES)

# Runtime / input-pipeline switches.
# NOTE(review): CACHE_TO_DISK and PREFETCH_TO_DEVICE are not read by any cell
# visible in this notebook - confirm whether they are still needed.
ENABLE_XLA_JIT = True
CACHE_TO_DISK = True
PREFETCH_TO_DEVICE = True
SHUFFLE_BUFFER = 30_000

# Apply the XLA JIT switch globally. Any failure is reported and the notebook
# keeps running with TensorFlow's current setting.
try:
    tf.config.optimizer.set_jit(ENABLE_XLA_JIT)
    xla_state = "ENABLED" if ENABLE_XLA_JIT else "DISABLED"
    print("XLA JIT:", xla_state)
except Exception as xla_error:
    print("Could not set XLA JIT:", xla_error)


# Enable memory growth on every detected GPU, then restrict TensorFlow to the
# first one. A RuntimeError from either call is printed rather than raised.
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
    print("No GPU found. Training will run on CPU.")
else:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        primary_gpu = gpus[0]
        tf.config.set_visible_devices(primary_gpu, 'GPU')
        print("GPU detected and configured for training:", primary_gpu)
    except RuntimeError as gpu_error:
        print(gpu_error)

XLA JIT: ENABLED
GPU detected and configured for training: PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')


In [None]:
# Cell 2
# Layer widths per architecture tag, in the order
# (CHANNEL1_SIZE, CHANNEL2_SIZE, DENSE1_SIZE, DENSE2_SIZE):
# filters of the two Conv2D layers, then units of the two Dense layers
# (the second Dense layer is the one named "embedding").
_ARCH_LAYER_SIZES = {
    "64": (16, 16, 10, 16),
    "32": (16, 32, 20, 32),
}
# Any tag that is not listed above uses the fallback sizes.
_FALLBACK_LAYER_SIZES = (8, 16, 8, 8)

CHANNEL1_SIZE, CHANNEL2_SIZE, DENSE1_SIZE, DENSE2_SIZE = _ARCH_LAYER_SIZES.get(
    MODEL_ARCH, _FALLBACK_LAYER_SIZES
)

def build_branch(input_shape, name_prefix=""):
    """Create one convolutional feature branch.

    Args:
        input_shape: Shape given to ``keras.Input`` (batch dimension excluded).
        name_prefix: Text prepended to ``"input"`` to form the input layer's name.

    Returns:
        A ``(input_tensor, flat_features)`` pair: the branch's ``keras.Input``
        and the flattened output of its last stage.
    """
    branch_input = keras.Input(shape=input_shape, name=f"{name_prefix}input")

    # Two Conv(3x3) -> MaxPool(2x2) stages. No activation argument is passed to
    # the convolutions.
    # NOTE(review): confirm the missing activation matches the training code;
    # changing it here would make the loaded checkpoint weights meaningless.
    features = branch_input
    for n_filters in (CHANNEL1_SIZE, CHANNEL2_SIZE):
        conv = layers.Conv2D(
            n_filters,
            (3, 3),
            kernel_initializer="he_normal",
            bias_initializer="zeros",
        )
        features = layers.MaxPooling2D((2, 2))(conv(features))

    features = layers.Dropout(0.3)(features)
    flat_features = layers.Flatten()(features)
    return branch_input, flat_features

def build_model(use_context=True, use_fovea=True):
    """Assemble the one- or two-stream classifier.

    Each enabled stream gets its own branch from ``build_branch``. With two
    streams the flattened branch outputs are concatenated; with one stream
    that branch's output is used directly. The fused features then pass
    through Dense(DENSE1_SIZE) -> Dropout(0.5) -> Dense(DENSE2_SIZE, named
    "embedding") -> Dropout(0.5) -> Dense(NUM_CLASSES, softmax).

    Args:
        use_context: Include the branch built from ``CONTEXT_SHAPE``
            (input layer name "context_input").
        use_fovea: Include the branch built from ``FOVEA_SHAPE``
            (input layer name "fovea_input").

    Returns:
        A ``keras.Model`` whose inputs are the enabled branch inputs, context
        first, and whose output is the softmax class distribution.

    Raises:
        ValueError: If both ``use_context`` and ``use_fovea`` are falsy, since
            a model without any input branch cannot be built.
    """
    # Fail early with a clear message instead of an opaque Keras error from
    # concatenating an empty list of branches.
    if not (use_context or use_fovea):
        raise ValueError(
            "build_model needs at least one input stream: "
            "enable use_context and/or use_fovea."
        )

    inputs, branches = [], []
    if use_context:
        ci, co = build_branch(CONTEXT_SHAPE, "context_")
        inputs.append(ci)
        branches.append(co)
    if use_fovea:
        fi, fo = build_branch(FOVEA_SHAPE, "fovea_")
        inputs.append(fi)
        branches.append(fo)

    # Concatenate is only needed when both streams are present.
    fused = branches[0] if len(branches) == 1 else layers.Concatenate()(branches)

    z = layers.Dense(DENSE1_SIZE, kernel_initializer="he_normal", bias_initializer="zeros")(fused)
    z = layers.Dropout(0.5)(z)
    # Downstream code looks this layer up by the name "embedding".
    z = layers.Dense(DENSE2_SIZE, name="embedding", kernel_initializer="he_normal", bias_initializer="zeros")(z)
    z = layers.Dropout(0.5)(z)
    out = layers.Dense(NUM_CLASSES, activation="softmax", kernel_initializer="he_normal", bias_initializer="zeros")(z)
    return Model(inputs=inputs, outputs=out)

In [None]:
# Cell 3
def make_tf_dataset(features_paths, labels, batch_size, shuffle=False, drop_remainder=False, shuffle_buffer=30000):
    """Build a batched ``tf.data`` pipeline that loads images from file paths.

    Each element pairs a dict of decoded images with its label. For every
    recognised key ("context_input", "fovea_input") the file is read, decoded
    as a single-channel PNG, converted to float32 with
    ``tf.image.convert_image_dtype`` and resized to the height/width of
    ``CONTEXT_SHAPE`` / ``FOVEA_SHAPE``. Keys other than those two are not
    forwarded to the output.

    Args:
        features_paths: Mapping from input name to a sequence of file paths.
        labels: Sequence of labels aligned with the paths.
        batch_size: Number of elements per batch.
        shuffle: If true, shuffle the (path, label) elements before loading.
        drop_remainder: Forwarded to ``Dataset.batch``.
        shuffle_buffer: Upper bound on the shuffle buffer size; the effective
            size is ``min(shuffle_buffer, len(labels))``. Defaults to 30000,
            the value that was previously hard-coded; pass the notebook's
            ``SHUFFLE_BUFFER`` to keep it tied to the configuration cell.

    Returns:
        A ``tf.data.Dataset`` yielding ``(features_dict, labels)`` batches,
        prefetched with ``tf.data.AUTOTUNE``.

    Raises:
        ValueError: If ``features_paths`` contains neither "context_input"
            nor "fovea_input", which would produce batches with no features.
    """
    present_inputs = [name for name in ("context_input", "fovea_input") if name in features_paths]
    if not present_inputs:
        raise ValueError(
            "features_paths must contain 'context_input' and/or 'fovea_input'; "
            f"got keys {sorted(features_paths)}."
        )
    target_shapes = {"context_input": CONTEXT_SHAPE, "fovea_input": FOVEA_SHAPE}

    feat_elems = {k: tf.constant(v) for k, v in features_paths.items()}
    ds = tf.data.Dataset.from_tensor_slices((feat_elems, labels))
    if shuffle:
        # Shuffle the cheap (path, label) elements before any image is decoded.
        ds = ds.shuffle(min(shuffle_buffer, len(labels)))

    def _load_gray_png(path, shape):
        # One image: read -> decode (1 channel) -> float32 -> resize -> pin static shape.
        img = tf.io.read_file(path)
        img = tf.io.decode_png(img, channels=1)
        img = tf.image.convert_image_dtype(img, tf.float32)
        img = tf.image.resize(img, list(shape[:2]))
        return tf.ensure_shape(img, shape)

    def _loader(features, label):
        out_feats = {
            name: _load_gray_png(features[name], target_shapes[name])
            for name in present_inputs
        }
        return out_feats, label

    ds = ds.map(_loader, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch_size, drop_remainder=drop_remainder)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds

In [None]:
# Cell 4
# Sub-folders of the experiment's output directory.
CHECKPOINTS_DIR, PLOTS_DIR, TEST_DATA_DIR = (
    os.path.join(OUTPUT_DIR, subdir)
    for subdir in ("checkpoints", "plots", "test_data")
)
# Only the plots folder is created here; the other two are only read from by
# the cells in this notebook.
os.makedirs(PLOTS_DIR, exist_ok=True)

In [None]:
# Cell 5
fold_id = 1
# One seed for both the subsampling and t-SNE so the saved plot is reproducible.
SEED = 42

# ---- Load the stored test split for this fold ----
npz_path = os.path.join(TEST_DATA_DIR, f"fold_{fold_id}_test_data.npz")
# NOTE(review): allow_pickle=True can execute arbitrary code if the archive is
# untrusted; only load files written by this project's own pipeline.
# The context manager closes the archive's file handle once the arrays are read.
with np.load(npz_path, allow_pickle=True) as data:
    y_all = data["y"].astype(np.int32)
    use_ctx = "X_context_paths" in data.files
    use_fov = "X_fovea_paths" in data.files
    ctx_paths_all = data["X_context_paths"] if use_ctx else None
    fov_paths_all = data["X_fovea_paths"] if use_fov else None

# ---- Subsample without replacement, never asking for more rows than exist ----
n_samples = min(N_SAMPLES, len(y_all))
if n_samples < N_SAMPLES:
    print(f"Only {len(y_all)} test samples available; using all of them instead of {N_SAMPLES}.")
rng = np.random.default_rng(SEED)
indices = rng.choice(len(y_all), size=n_samples, replace=False)

y = y_all[indices]
features = {}
if use_ctx:
    features["context_input"] = ctx_paths_all[indices].astype(str).tolist()
if use_fov:
    features["fovea_input"] = fov_paths_all[indices].astype(str).tolist()

# shuffle=False keeps the predicted embeddings aligned row-for-row with `y`.
ds = make_tf_dataset(features, y, batch_size=BATCH_SIZE, shuffle=False, drop_remainder=False)

# ---- Rebuild the network, restore the fold's weights, expose the embedding ----
model = build_model(use_ctx, use_fov)
model.compile(optimizer="adam",
            loss=keras.losses.SparseCategoricalCrossentropy(),
            metrics=["accuracy"])
ckpt_path = os.path.join(CHECKPOINTS_DIR, f"fold_{fold_id}_best_model.keras")
model.load_weights(ckpt_path)
embed_model = Model(inputs=model.inputs, outputs=model.get_layer("embedding").output)
embeddings = embed_model.predict(ds, verbose=0)

# ---- Project the embeddings to 2-D ----
Z = TSNE(n_components=2,
         init="pca",
         perplexity=PERPLEXITY,
         learning_rate="auto",
         max_iter=MAX_ITER,
         random_state=SEED).fit_transform(embeddings)

# ---- Scatter plot, one colour per class, written to disk ----
fig, ax = plt.subplots(figsize=(10, 6))
palette = ["r","g","b","c","m","y","k","tab:orange","tab:purple","tab:brown","tab:pink","tab:gray","tab:olive","tab:cyan"]
for cls_idx, cls_name in enumerate(CLASS_NAMES):
    mask = y == cls_idx
    # Classes absent from the subsample are skipped so the legend has no empty entries.
    if np.any(mask):
        ax.scatter(Z[mask, 0], Z[mask, 1], s=18, alpha=0.9, label=cls_name, c=palette[cls_idx % len(palette)])
ax.set_xlabel("Dimension 1")
ax.set_ylabel("Dimension 2")
ax.legend(loc="best", title="Classes")
fig.tight_layout()
# The file name records the number of points actually plotted.
save_path = os.path.join(PLOTS_DIR, f"tsne_fold{fold_id}_s{n_samples}_p{PERPLEXITY}_i{MAX_ITER}.png")
fig.savefig(save_path, dpi=200)
plt.close(fig)
print("Saved:", save_path)

W0000 00:00:1754873141.637436  938274 graph_launch.cc:671] Fallback to op-by-op mode because memset node breaks graph update


Saved: KTH_results/both64_500_64_ctx64x64_fov32x32/plots/tsne_fold1_s40000_p400_i1000.png
