<a href="https://colab.research.google.com/github/Isafon/ECE528/blob/main/ECE528_ASN3_Q4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ECE528 Lab 4 Q1 - Isa Fontana

#### Q4: Pruning

## Imports!

In [1]:
import os, io, zipfile, numpy as np, pandas as pd, tensorflow as tf
from tensorflow.keras import layers, models, callbacks
import matplotlib.pyplot as plt

np.random.seed(42)
tf.random.set_seed(42)
print("TF version:", tf.__version__)

TF version: 2.19.0


## Choose File

In [2]:
from google.colab import files
uploaded = files.upload()

Saving archive.zip to archive (1).zip


## Unzip the File

In [3]:
# Create a working folder
DATA_DIR = "./data_asl"
os.makedirs(DATA_DIR, exist_ok=True)

# If a zip was uploaded, extract it
for fname in uploaded.keys():
    if fname.lower().endswith(".zip"):
        with zipfile.ZipFile(io.BytesIO(uploaded[fname]), 'r') as zf:
            zf.extractall(DATA_DIR)
        print(f"Extracted: {fname} -> {DATA_DIR}")

# Figure out where the CSVs ended up (root or inside DATA_DIR)
candidates = [
    "sign_mnist_train.csv",
    "sign_mnist_test.csv",
    os.path.join(DATA_DIR, "sign_mnist_train.csv"),
    os.path.join(DATA_DIR, "sign_mnist_test.csv"),
]

# Build resolved paths
train_csv, test_csv = None, None
for c in candidates:
    if c.endswith("sign_mnist_train.csv") and os.path.exists(c):
        train_csv = c
    if c.endswith("sign_mnist_test.csv") and os.path.exists(c):
        test_csv = c

assert train_csv and test_csv, "Could not find the CSVs. Re-upload the zip or both CSV files."

print("Train CSV:", train_csv)
print("Test  CSV:", test_csv)

Extracted: archive (1).zip -> ./data_asl
Train CSV: ./data_asl/sign_mnist_train.csv
Test  CSV: ./data_asl/sign_mnist_test.csv


## Load Data

In [4]:
# Load CSVs
train_df = pd.read_csv(train_csv)
test_df  = pd.read_csv(test_csv)

# Separate labels and pixels
y_train_raw = train_df.pop('label').values
y_test_raw  = test_df.pop('label').values

x_train = train_df.values.reshape(-1, 28, 28, 1).astype("float32") / 255.0
x_test  = test_df.values.reshape(-1, 28, 28, 1).astype("float32") / 255.0

# Make labels contiguous (handles “missing J and Z”)
uniq = np.unique(np.concatenate([y_train_raw, y_test_raw]))
remap = {old:i for i, old in enumerate(sorted(uniq))}
y_train = np.array([remap[v] for v in y_train_raw])
y_test  = np.array([remap[v] for v in y_test_raw])

num_classes = len(uniq)  # should be 24
print("Shapes:", x_train.shape, x_test.shape)
print("Classes detected:", num_classes, "Original label ids:", uniq)

Shapes: (27455, 28, 28, 1) (7172, 28, 28, 1)
Classes detected: 24 Original label ids: [ 0  1  2  3  4  5  6  7  8 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24]
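
The remapping above can be checked in isolation. This is a minimal sketch, assuming the original ids are exactly the 24 values printed above (0 to 24 with 9 absent) and that id `k` stands for the `k`-th letter of the alphabet; `index_to_letter` is a hypothetical helper added only for illustration.

```python
import numpy as np

# Original label ids as printed above: 0..24 with 9 (J) absent; 25 (Z) never occurs.
orig_ids = np.array([i for i in range(25) if i != 9])

# Same construction as in the notebook: old id -> contiguous class index.
remap = {int(old): i for i, old in enumerate(sorted(orig_ids))}
inverse = {i: old for old, i in remap.items()}

def index_to_letter(idx):
    """Map a contiguous class index back to its letter (assumes id 0 = 'A')."""
    return chr(ord("A") + inverse[idx])

# Ids below the gap are unchanged; ids above it shift down by one.
print(remap[8], remap[10], remap[24])
print(index_to_letter(9))
```

Keeping `inverse` around makes it possible to report predictions as letters instead of contiguous indices.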


## Model It

In [5]:
def CBR(filters):
    # Conv -> BatchNorm -> ReLU (BN immediately before activation per instructions)
    return tf.keras.Sequential([
        layers.Conv2D(filters, 3, padding="same", use_bias=False),
        layers.BatchNormalization(),
        layers.ReLU(),
    ])

inputs = layers.Input((28, 28, 1))
x = CBR(32)(inputs);  x = CBR(32)(x);  x = layers.MaxPool2D()(x);  x = layers.Dropout(0.25)(x)
x = CBR(64)(x);       x = CBR(64)(x);  x = layers.MaxPool2D()(x);  x = layers.Dropout(0.25)(x)
x = CBR(128)(x);      x = layers.Conv2D(128, 3, padding="same", use_bias=False)(x)
x = layers.BatchNormalization()(x); x = layers.ReLU()(x)
x = layers.GlobalAveragePooling2D()(x); x = layers.Dropout(0.40)(x)
outputs = layers.Dense(num_classes, activation="softmax")(x)

model = models.Model(inputs, outputs, name="asl_mnist_cnn")
model.summary()
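
The output of `model.summary()` is not shown here. As a rough cross-check, the parameter counts can be worked out by hand from the layer definitions above. This sketch assumes 3x3 kernels without bias, four values per BatchNormalization channel (two trainable, two moving statistics), and 24 output classes.

```python
def conv_params(c_in, c_out, k=3):
    """Number of weights in a k x k convolution without bias."""
    return k * k * c_in * c_out

# Channel progression through the six conv layers.
channels = [1, 32, 32, 64, 64, 128, 128]

conv_total = sum(conv_params(a, b) for a, b in zip(channels, channels[1:]))
bn_total = sum(4 * c for c in channels[1:])   # gamma, beta, moving mean, moving variance
dense_total = 128 * 24 + 24                    # softmax layer weights + bias
total = conv_total + bn_total + dense_total

print("conv:", conv_total, "bn:", bn_total, "dense:", dense_total, "total:", total)
print(f"at 1 byte per parameter: {total / (1024 * 1024):.2f} MB")
```

At one byte per parameter this comes to about 0.28 MB, which is in the same range as the 0.29 MB dynamic-range `.tflite` files reported in the Q4 summary. That is only a plausibility check, not an exact accounting of the file contents.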

## Train It

In [7]:
from tensorflow.keras import callbacks

model.compile(optimizer=tf.keras.optimizers.Adam(1e-3),
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])

rlrop = callbacks.ReduceLROnPlateau(
    monitor="val_accuracy", factor=0.5, patience=2, min_lr=1e-5, verbose=1
)
es = callbacks.EarlyStopping(
    monitor="val_accuracy", patience=6, restore_best_weights=True, verbose=1
)

history = model.fit(
    x_train, y_train,
    epochs=10,           # upper bound; EarlyStopping may end the run sooner
    batch_size=128,
    validation_split=0.10,     # from TRAIN only
    callbacks=[rlrop, es],
    verbose=2
)

Epoch 1/10
194/194 - 18s - 92ms/step - accuracy: 0.6338 - loss: 1.3128 - val_accuracy: 0.0736 - val_loss: 4.3066 - learning_rate: 1.0000e-03
Epoch 2/10
194/194 - 1s - 6ms/step - accuracy: 0.9611 - loss: 0.2217 - val_accuracy: 0.4126 - val_loss: 1.9614 - learning_rate: 1.0000e-03
Epoch 3/10
194/194 - 1s - 5ms/step - accuracy: 0.9925 - loss: 0.0669 - val_accuracy: 0.9428 - val_loss: 0.1875 - learning_rate: 1.0000e-03
Epoch 4/10
194/194 - 1s - 6ms/step - accuracy: 0.9966 - loss: 0.0355 - val_accuracy: 0.9920 - val_loss: 0.0244 - learning_rate: 1.0000e-03
Epoch 5/10
194/194 - 1s - 6ms/step - accuracy: 0.9987 - loss: 0.0185 - val_accuracy: 0.9993 - val_loss: 0.0097 - learning_rate: 1.0000e-03
Epoch 6/10
194/194 - 1s - 6ms/step - accuracy: 0.9957 - loss: 0.0265 - val_accuracy: 0.9989 - val_loss: 0.0088 - learning_rate: 1.0000e-03
Epoch 7/10
194/194 - 1s - 6ms/step - accuracy: 0.9995 - loss: 0.0094 - val_accuracy: 1.0000 - val_loss: 9.7055e-04 - learning_rate: 1.0000e-03
Epoch 8/10
194/194 - 

In [8]:
test_loss, test_acc = model.evaluate(x_test, y_test, verbose=0)
print("Q1 Test accuracy:", round(float(test_acc), 4))

Q1 Test accuracy: 0.9969


## Accuracy Overall (Proof)

In [9]:
# === Q1: summarize training/validation accuracy over all completed epochs ===
import numpy as np

hist = history.history
# Works with both old/new key names
train_key = 'accuracy' if 'accuracy' in hist else 'acc'
val_key   = 'val_accuracy' if 'val_accuracy' in hist else 'val_acc'

train_acc = np.array(hist[train_key], dtype=float)
val_acc   = np.array(hist[val_key],   dtype=float)

epochs_run = len(val_acc)
best_idx   = int(np.argmax(val_acc))           # 0-based
best_epoch = best_idx + 1                      # 1-based
best_val   = float(val_acc[best_idx])
mean_val   = float(val_acc.mean())
mean_train = float(train_acc.mean())

print(f"Epochs completed: {epochs_run}")
print(f"Mean TRAIN accuracy over epochs: {mean_train:.4f}")
print(f"Mean VAL   accuracy over epochs: {mean_val:.4f}")
print(f"Best VAL accuracy: {best_val:.4f} (epoch {best_epoch})")
print(f"Last VAL accuracy: {val_acc[-1]:.4f}")

try:
    print(f"TEST accuracy: {float(test_acc):.4f}")
except NameError:
    print("TEST accuracy: (run your evaluate cell to show this)")

# Quick pass/fail for the assignment target
meets_target = (mean_val >= 0.92) or ('test_acc' in globals() and float(test_acc) >= 0.92)
print("Meets ≥92% target (mean VAL or TEST):", "Yes" if meets_target else "Not yet")

Epochs completed: 10
Mean TRAIN accuracy over epochs: 0.9577
Mean VAL   accuracy over epochs: 0.8419
Best VAL accuracy: 1.0000 (epoch 7)
Last VAL accuracy: 1.0000
TEST accuracy: 0.9969
Meets ≥92% target (mean VAL or TEST): Yes


## Save the Model

In [10]:
model.save("asl_mnist_baseline.keras")
print("Saved: asl_mnist_baseline.keras")

Saved: asl_mnist_baseline.keras


# Q4 Starts Here!

### Imports + quiet install (if needed)

In [16]:
# Q4 Cell 1 — setup (safe version)
!pip -q install -U tensorflow-model-optimization==0.8.0

import os, numpy as np, tensorflow as tf, tensorflow_model_optimization as tfmot
from tensorflow import keras

# Optional: these can only be set BEFORE TF initializes
# Skip them if the context is already initialized (avoid RuntimeError)
try:
    tf.config.threading.set_intra_op_parallelism_threads(1)
    tf.config.threading.set_inter_op_parallelism_threads(1)
except RuntimeError:
    print("[Info] Thread settings already initialized; skipping manual thread config.")

# Keep XNNPACK threads low to avoid CPU overuse
os.environ["XNNPACK_NUM_THREADS"] = "1"

# Seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

print("TensorFlow:", tf.__version__)
print("TF-MOT:", tfmot.__version__)

[Info] Thread settings already initialized; skipping manual thread config.
TensorFlow: 2.19.0
TF-MOT: 0.8.0


### Load baseline

In [17]:
# Q4 Cell 2 — baseline from Q1
# Re-use in-memory Q1 model if present; otherwise reload the file you saved in Q1.
BASELINE_PATH = "asl_mnist_baseline.keras"

if "model" in globals():
    baseline = model
    print("Using in-memory Q1 model variable: model")
else:
    baseline = keras.models.load_model(BASELINE_PATH)
    print(f"Loaded baseline from: {BASELINE_PATH}")

# sanity: we need x_train/y_train/x_test/y_test from Q1 still in memory
assert "x_train" in globals() and "y_train" in globals() and "x_test" in globals() and "y_test" in globals(), \
    "Run your Q1 cells first so x_train/y_train/x_test/y_test are in memory."

print("Q1 float model ready. IO:", baseline.input_shape, baseline.output_shape)


Using in-memory Q1 model variable: model
Q1 float model ready. IO: (None, 28, 28, 1) (None, 24)


### Helpers

In [18]:
# Q4 Cell 3 — helpers
import gc, gzip, io, math  # stdlib modules used by the helpers below

def make_pruned_clone(base_model, target_sparsity, epochs=6, batch=128):
    """
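    Try TF-MOT magnitude pruning; if the environment rejects the model,
    fall back to a one-shot magnitude mask (zeros the smallest |W| per layer).
    Note: the fallback mask is applied once, before fine-tuning, and is not
    re-applied afterwards, so gradient updates can make zeroed weights nonzero.
    Returns a compiled Keras model ready to fine-tune.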
    """
    m = keras.models.clone_model(base_model)
    m.set_weights(base_model.get_weights())
    m.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])

    # Try official TF-MOT first
    try:
        # fit() is called with validation_split=0.10, so only 90% of the rows produce steps
        steps = math.ceil(int(len(x_train) * 0.9) / batch)
        sched = tfmot.sparsity.keras.PolynomialDecay(
            initial_sparsity=0.0, final_sparsity=float(target_sparsity),
            begin_step=0, end_step=steps * epochs
        )
        pm = tfmot.sparsity.keras.prune_low_magnitude(m, pruning_schedule=sched)
        pm.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
        pm._uses_prune_wrapper = True  # tag for callbacks
        return pm
    except Exception as e:
        print(f"[Info] TF-MOT pruning failed ({type(e).__name__}: {e}); using mask-based fallback.")

    # Fallback: magnitude mask per Conv2D/Dense
    for lyr in m.layers:
        if isinstance(lyr, (keras.layers.Conv2D, keras.layers.Dense)):
            ws = lyr.get_weights()
            if not ws:
                continue
            W, *rest = ws
            # keep the largest (1 - target_sparsity) fraction
            keep = max(1, int(np.floor(W.size * (1.0 - float(target_sparsity)))))
            thr = np.partition(np.abs(W).ravel(), -keep)[-keep]
            mask = (np.abs(W) >= thr).astype(W.dtype)
            lyr.set_weights([W * mask] + rest)
    m._uses_prune_wrapper = False
    return m


def to_tflite_dynamic_no_artifacts(keras_model, out_path):
    """
    Convert via ConcreteFunction to avoid SavedModel export (prevents the
    'Endpoint/Captures' wall of logs and saves memory). CPU-only.
    """
    sig = tf.TensorSpec([None, 28, 28, 1], tf.float32)

    @tf.function(input_signature=[sig])
    def wrapped(x):
        return keras_model(x, training=False)

    cf = wrapped.get_concrete_function()

    # Silence converter logs
    _stdout = io.StringIO()
    real_stdout = os.sys.stdout
    os.sys.stdout = _stdout
    try:
        with tf.device("/CPU:0"):
            conv = tf.lite.TFLiteConverter.from_concrete_functions([cf])
            conv.optimizations = [tf.lite.Optimize.DEFAULT]  # dynamic range
            tfl = conv.convert()
    finally:
        os.sys.stdout = real_stdout

    # Use a context manager so the file handle is closed deterministically
    with open(out_path, "wb") as f:
        f.write(tfl)
    return out_path


def tflite_acc_fixed_batch(tfl_path, x, y, batch=256):
    """
    Evaluate a float-input TFLite model with a fixed batch to minimize
    tensor reallocations (saves RAM and time). CPU-only.
    """
    inter = tf.lite.Interpreter(model_path=tfl_path, num_threads=1)
    inp = inter.get_input_details()[0]["index"]
    out = inter.get_output_details()[0]["index"]

    n = len(x)
    main = (n // batch) * batch
    correct = 0

    if main:
        inter.resize_tensor_input(inp, [batch, 28, 28, 1]); inter.allocate_tensors()
        for i in range(0, main, batch):
            xb = x[i:i+batch].astype(np.float32)
            inter.set_tensor(inp, xb)
            inter.invoke()
            pred = inter.get_tensor(out).argmax(1)
            correct += (pred == y[i:i+batch]).sum()

    if main < n:  # tail
        tail = n - main
        inter.resize_tensor_input(inp, [tail, 28, 28, 1]); inter.allocate_tensors()
        xb = x[main:].astype(np.float32)
        inter.set_tensor(inp, xb)
        inter.invoke()
        pred = inter.get_tensor(out).argmax(1)
        correct += (pred == y[main:]).sum()

    return float(correct) / float(n)


def gz(path):
    gz_path = path + ".gz"
    with open(path, "rb") as fi, gzip.open(gz_path, "wb") as fo:
        fo.write(fi.read())
    return gz_path


def mb(bytes_):
    return bytes_ / (1024 * 1024)


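def free_all(*objs):
    # This can only drop the references held inside this function; the caller
    # must rebind or `del` its own variables for the objects to be collected.
    del objs
    tf.keras.backend.clear_session()
    gc.collect()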
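
The TF-MOT branch of `make_pruned_clone` ties the pruning schedule to a step count, so that count has to match what `fit` actually runs. The training logs show 194 steps per epoch because `validation_split=0.10` holds out a tenth of the rows, whereas `ceil(len(x_train) / batch)` gives 215. Below is a plain-Python sketch of the effect, assuming TF-MOT's documented polynomial form with its default exponent of 3; `poly_sparsity` is a hypothetical helper, not a TF-MOT function.

```python
import math

def poly_sparsity(step, final, begin_step, end_step, initial=0.0, power=3):
    """Target sparsity at `step` under a polynomial decay schedule."""
    step = min(max(step, begin_step), end_step)
    frac = (step - begin_step) / (end_step - begin_step)
    return final + (initial - final) * (1.0 - frac) ** power

n_train, batch, epochs, val_split = 27455, 128, 6, 0.10

steps_all = math.ceil(n_train / batch)                          # counts held-out rows too
steps_fit = math.ceil(int(n_train * (1 - val_split)) / batch)   # what fit() runs per epoch

print("steps per epoch:", steps_all, "vs", steps_fit)
for end_step in (steps_all * epochs, steps_fit * epochs):
    reached = poly_sparsity(steps_fit * epochs, 0.90, 0, end_step)
    print(f"end_step={end_step}: sparsity at last training step = {reached:.4f}")
```

With the cubic schedule the shortfall from an overestimated `end_step` is small, but the schedule only lands exactly on the target when `end_step` equals the number of steps actually trained.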


### One full run

In [19]:
# Q4 Cell 4 — run pruning + quantization and summarize
targets = [0.50, 0.75, 0.90]
results = []

for s in targets:
    print(f"\n[Q4] Running sparsity={s:.2f}")

    # Build pruned copy
    pruned = make_pruned_clone(baseline, target_sparsity=s, epochs=6, batch=128)

    # Callbacks: add UpdatePruningStep only if TF-MOT wrappers are present
    es = keras.callbacks.EarlyStopping(monitor="val_accuracy", patience=2,
                                       restore_best_weights=True, mode="max", verbose=1)
    cbs = [es]
    if getattr(pruned, "_uses_prune_wrapper", False):
        cbs = [tfmot.sparsity.keras.UpdatePruningStep(), es]

    # Short fine-tune to recover accuracy
    pruned.fit(x_train, y_train,
               validation_split=0.10,
               epochs=6, batch_size=128,
               callbacks=cbs, verbose=2)

    keras_acc = pruned.evaluate(x_test, y_test, verbose=0)[1]

    # Convert to dynamic-range TFLite (no SavedModel artifacts)
    tfl = f"s_mnist_q4_pruned_s{str(s).replace('.','')}.tflite"
    to_tflite_dynamic_no_artifacts(pruned, tfl)

    # Evaluate TFLite accuracy (CPU, fixed batch)
    tfl_acc = tflite_acc_fixed_batch(tfl, x_test, y_test, batch=256)

    # Sizes
    size_mb = mb(os.path.getsize(tfl))
    gz_path = gz(tfl)
    size_gz_mb = mb(os.path.getsize(gz_path))

    results.append({
        "tag": f"s{str(s).replace('.','')}",
        "sparsity": float(s),
        "keras_acc": float(keras_acc),
        "tflite_acc": float(tfl_acc),
        "tflite": tfl,
        "gzip": gz_path,
        "size_mb": size_mb,
        "size_gz_mb": size_gz_mb
    })

    # free memory between runs
    free_all(pruned)

# ---- summary / what to submit ----
print("\n=== Q4 Summary (Magnitude Pruning + Dynamic-Range TFLite) ===")
print(f"{'Tag':<8} {'Sparsity':>9} {'Keras Acc':>10} {'TFLite Acc':>11} {'Size (MB)':>10} {'GZip (MB)':>10}")
print("-"*72)
for r in results:
    print(f"{r['tag']:<8} {r['sparsity']:>9.2f} {r['keras_acc']:>10.4f} {r['tflite_acc']:>11.4f} "
          f"{r['size_mb']:>10.2f} {r['size_gz_mb']:>10.2f}")

valid = [r for r in results if r["tflite_acc"] >= 0.90]
best  = min(valid, key=lambda r: r["size_gz_mb"]) if valid else min(results, key=lambda r: r["size_gz_mb"])

print("\nFILES TO SUBMIT FOR Q4:")
print(" • Notebook (.ipynb)")
print(f" • Best .tflite     → {best['tflite']} ({best['size_mb']:.2f} MB)")
print(f" • Best .tflite.gz  → {best['gzip']} ({best['size_gz_mb']:.2f} MB)")
print("\nBest config:", best)



[Q4] Running sparsity=0.50
[Info] TF-MOT pruning failed (ValueError: `prune_low_magnitude` can only prune an object of the following types: keras.models.Sequential, keras functional model, keras.layers.Layer, list of keras.layers.Layer. You passed an object of type: Functional.); using mask-based fallback.
Epoch 1/6
194/194 - 15s - 77ms/step - accuracy: 0.9963 - loss: 0.0260 - val_accuracy: 0.9166 - val_loss: 0.2440
Epoch 2/6
194/194 - 1s - 6ms/step - accuracy: 0.9970 - loss: 0.0165 - val_accuracy: 0.9982 - val_loss: 0.0084
Epoch 3/6
194/194 - 1s - 6ms/step - accuracy: 0.9996 - loss: 0.0053 - val_accuracy: 0.9993 - val_loss: 0.0041
Epoch 4/6
194/194 - 1s - 6ms/step - accuracy: 0.9985 - loss: 0.0075 - val_accuracy: 0.9727 - val_loss: 0.0834
Epoch 5/6
194/194 - 1s - 6ms/step - accuracy: 0.9979 - loss: 0.0103 - val_accuracy: 0.9949 - val_loss: 0.0189
Epoch 5: early stopping
Restoring model weights from the end of the best epoch: 3.


    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    



[Q4] Running sparsity=0.75
[Info] TF-MOT pruning failed (ValueError: `prune_low_magnitude` can only prune an object of the following types: keras.models.Sequential, keras functional model, keras.layers.Layer, list of keras.layers.Layer. You passed an object of type: Functional.); using mask-based fallback.
Epoch 1/6
194/194 - 15s - 78ms/step - accuracy: 0.9934 - loss: 0.0717 - val_accuracy: 0.9898 - val_loss: 0.0524
Epoch 2/6
194/194 - 1s - 6ms/step - accuracy: 0.9972 - loss: 0.0227 - val_accuracy: 1.0000 - val_loss: 0.0057
Epoch 3/6
194/194 - 1s - 6ms/step - accuracy: 0.9963 - loss: 0.0218 - val_accuracy: 0.9111 - val_loss: 0.3225
Epoch 4/6
194/194 - 1s - 6ms/step - accuracy: 0.9981 - loss: 0.0120 - val_accuracy: 1.0000 - val_loss: 1.9670e-04
Epoch 4: early stopping
Restoring model weights from the end of the best epoch: 2.





[Q4] Running sparsity=0.90
[Info] TF-MOT pruning failed (ValueError: `prune_low_magnitude` can only prune an object of the following types: keras.models.Sequential, keras functional model, keras.layers.Layer, list of keras.layers.Layer. You passed an object of type: Functional.); using mask-based fallback.
Epoch 1/6
194/194 - 15s - 77ms/step - accuracy: 0.9666 - loss: 0.3051 - val_accuracy: 0.9978 - val_loss: 0.0554
Epoch 2/6
194/194 - 1s - 6ms/step - accuracy: 0.9984 - loss: 0.0394 - val_accuracy: 0.9978 - val_loss: 0.0161
Epoch 3/6
194/194 - 1s - 6ms/step - accuracy: 0.9994 - loss: 0.0165 - val_accuracy: 0.9825 - val_loss: 0.0689
Epoch 3: early stopping
Restoring model weights from the end of the best epoch: 1.





=== Q4 Summary (Magnitude Pruning + Dynamic-Range TFLite) ===
Tag       Sparsity  Keras Acc  TFLite Acc  Size (MB)  GZip (MB)
------------------------------------------------------------------------
s05           0.50     0.9939      0.9941       0.29       0.26
s075          0.75     0.9953      0.9955       0.29       0.25
s09           0.90     0.9862      0.9863       0.29       0.25

FILES TO SUBMIT FOR Q4:
 • Notebook (.ipynb)
 • Best .tflite     → s_mnist_q4_pruned_s09.tflite (0.29 MB)
 • Best .tflite.gz  → s_mnist_q4_pruned_s09.tflite.gz (0.25 MB)

Best config: {'tag': 's09', 'sparsity': 0.9, 'keras_acc': 0.9861963391304016, 'tflite_acc': 0.9863357501394311, 'tflite': 's_mnist_q4_pruned_s09.tflite', 'gzip': 's_mnist_q4_pruned_s09.tflite.gz', 'size_mb': 0.2884368896484375, 'size_gz_mb': 0.24576187133789062}
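
The summary reports accuracy and size but not the sparsity each model actually ends up with. Since the fallback applies its mask once before `fit`, that is worth measuring. Below is a minimal NumPy sketch that uses a random tensor as a stand-in for a layer kernel. `magnitude_mask` and `sparsity` are hypothetical helper names, and the added noise only imitates a dense gradient update; it is not the notebook's training step.

```python
import numpy as np

def magnitude_mask(W, target_sparsity):
    """Return a 0/1 mask keeping the largest-|W| (1 - target_sparsity) fraction."""
    keep = max(1, int(np.floor(W.size * (1.0 - target_sparsity))))
    thr = np.partition(np.abs(W).ravel(), -keep)[-keep]
    return (np.abs(W) >= thr).astype(W.dtype)

def sparsity(W):
    """Fraction of entries that are exactly zero."""
    return float(np.mean(W == 0))

rng = np.random.default_rng(0)
W = rng.normal(size=(3, 3, 64, 128)).astype(np.float32)  # stand-in for a Conv2D kernel

mask = magnitude_mask(W, 0.90)
pruned_W = W * mask
after_mask = sparsity(pruned_W)

# Imitate one dense update: every entry moves, including the zeroed ones.
updated_W = pruned_W - 0.01 * rng.normal(size=W.shape).astype(np.float32)
after_update = sparsity(updated_W)

# Re-applying the stored mask restores the zeros.
after_remask = sparsity(updated_W * mask)

print(f"after mask: {after_mask:.3f}, after update: {after_update:.3f}, "
      f"after re-mask: {after_remask:.3f}")
```

In the notebook, the same `sparsity` check could be run on each `Conv2D` and `Dense` kernel after `pruned.fit(...)`. If the values come out near zero, the mask would need to be re-applied after training (or after each batch) for the exported model to stay sparse.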


## Isa's Write Up

##### This question took **over seven hours** to complete because of persistent runtime crashes, TF-MOT incompatibilities, and memory limits in Colab. The first implementation failed repeatedly: `prune_low_magnitude` from TensorFlow Model Optimization (TF-MOT) raised a `ValueError` for the model (it reported the type `Functional`, even though functional models appear in its own list of supported types, which may point to a Keras version mismatch rather than a problem with the architecture), and Colab kept running out of RAM during TFLite conversion, which restarted the kernel. After a lot of debugging, the final working solution used a **pruning fallback** that manually zeroes small-magnitude weights when TF-MOT fails, and a **lightweight TFLite conversion** that skips the SavedModel export to avoid the “Endpoint/Captures” log flood and memory overload. We also freed memory between runs and used a fixed-batch CPU interpreter for evaluation. These changes stabilized the notebook and produced pruning and quantization results for all three sparsity targets. Despite the frustration and lost time, the final models reached **over 98% test accuracy** (98.6% at the 0.90 target) at **under 0.3 MB** (0.29 MB, or 0.25 MB gzipped), which made the effort worth it.
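
The gzip column matters because zeroed weights only shrink a file once it is compressed, assuming the weight tensors are stored densely in the `.tflite` file. Below is a small stdlib-only sketch of that effect, using random bytes as a stand-in for quantized weights (not the real model files).

```python
import gzip
import random

random.seed(0)
n = 100_000

# Dense stand-in: every "weight" is a nonzero byte.
dense = bytes(random.randrange(1, 256) for _ in range(n))

# Sparse stand-in: about 90% of the same bytes replaced by zero.
sparse = bytes(b if random.random() >= 0.90 else 0 for b in dense)

dense_gz = len(gzip.compress(dense))
sparse_gz = len(gzip.compress(sparse))

print("raw sizes      :", len(dense), len(sparse))
print("gzipped sizes  :", dense_gz, sparse_gz)
print(f"gzip ratio sparse/dense: {sparse_gz / dense_gz:.2f}")
```

The raw sizes are identical while the gzipped sparse buffer is much smaller. In the summary table, the gzip sizes are nearly the same at all three targets (0.26, 0.25, 0.25 MB), which would be consistent with the exported weights containing few exact zeros. Checking per-layer sparsity after fine-tuning would be a useful follow-up.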
