# TensorFlow pattern-centric fossil pipeline (Geo Fossils-I)
- Preprocess + mask fossils, compute classical texture features (154-dim)
- TF model: EfficientNetB0 with concatenated texture vector
- Train/val/test on Geo Fossils-I; confusion matrix + Grad-CAM

Figures saved to `figures_tf/`:
- `cm_tf.png`: confusion matrix
- `gradcam_tf.png`: Grad-CAM overlay


In [4]:
import json
import math
import os
import random
from pathlib import Path
from typing import Dict, List

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from sklearn.metrics import ConfusionMatrixDisplay, accuracy_score, confusion_matrix, f1_score
from sklearn.model_selection import train_test_split
from skimage.feature import graycomatrix, graycoprops, local_binary_pattern
from skimage.filters import gabor

tf.get_logger().setLevel("ERROR")
print("TensorFlow version:", tf.__version__)

class Config:
    def __init__(self, root: Path, img_size: int = 224, batch_size: int = 32, epochs: int = 25):
        self.root = Path(root)
        self.img_size = img_size
        self.batch_size = batch_size
        self.epochs = epochs
        self.figdir = Path("figures_tf")
        self.figdir.mkdir(parents=True, exist_ok=True)
        self.device = "cuda" if tf.config.list_physical_devices("GPU") else "cpu"
        print("Using device:", self.device)

cfg = Config(root=Path("geo fossil I"), batch_size=32, epochs=20)


ImportError: DLL load failed while importing cv2: The specified module could not be found.

In [None]:
# Utility functions
def set_seed(seed: int = 13):
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)

def list_images_by_class(root: Path):
    manifest = []
    classes = sorted([d for d in root.iterdir() if d.is_dir()])
    class_to_idx = {cls.name: i for i, cls in enumerate(classes)}
    for cls in classes:
        for img_path in sorted(cls.glob("*.jpg")):
            manifest.append({"path": str(img_path), "label": class_to_idx[cls.name], "classname": cls.name})
    return manifest, classes

def stratified_split(manifest: List[Dict], train_ratio=0.7, val_ratio=0.15):
    df = pd.DataFrame(manifest)
    train_df, temp_df = train_test_split(df, test_size=1 - train_ratio, stratify=df["label"], random_state=42)
    rel_val = val_ratio / (1 - train_ratio)
    val_df, test_df = train_test_split(temp_df, test_size=1 - rel_val, stratify=temp_df["label"], random_state=99)
    return train_df.to_dict("records"), val_df.to_dict("records"), test_df.to_dict("records")

def resize_and_pad(img: np.ndarray, size: int = 224) -> np.ndarray:
    h, w = img.shape[:2]
    scale = size / min(h, w)
    new_h, new_w = int(round(h * scale)), int(round(w * scale))
    resized = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
    pad_vert = max(size - new_h, 0)
    pad_h1 = pad_vert // 2; pad_h2 = pad_vert - pad_h1
    pad_horiz = max(size - new_w, 0)
    pad_w1 = pad_horiz // 2; pad_w2 = pad_horiz - pad_w1
    padded = cv2.copyMakeBorder(resized, pad_h1, pad_h2, pad_w1, pad_w2, cv2.BORDER_CONSTANT, value=0)
    return padded

def clean_mask(mask: np.ndarray, min_component: int = 50) -> np.ndarray:
    mask = (mask > 0).astype(np.uint8)
    kernel = np.ones((3, 3), np.uint8)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel, iterations=1)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=2)
    num, labels, stats, _ = cv2.connectedComponentsWithStats(mask, connectivity=8)
    if num <= 1:
        return mask
    largest = 1 + np.argmax(stats[1:, cv2.CC_STAT_AREA])
    clean = (labels == largest).astype(np.uint8)
    if stats[largest, cv2.CC_STAT_AREA] < min_component:
        return np.zeros_like(mask, dtype=np.uint8)
    return clean

def compute_mask(gray: np.ndarray) -> np.ndarray:
    _, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    mask = clean_mask(thresh)
    return mask


In [None]:
# Texture features (154 dims)
def edge_features(gray, mask):
    edges = cv2.Canny(gray, 100, 200) * mask
    coords = np.column_stack(np.nonzero(edges))
    if len(coords) == 0:
        return np.zeros(20, dtype=np.float32)
    sobelx = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    sobely = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
    angles = (np.arctan2(sobely, sobelx) + np.pi) * (180.0 / np.pi)
    ang_masked = angles[edges > 0]
    hist, _ = np.histogram(ang_masked, bins=18, range=(0, 180))
    edge_density = edges.sum() / mask.size
    mean_length = len(coords) / (np.count_nonzero(hist) + 1e-6)
    return np.concatenate(([edge_density, mean_length], hist.astype(np.float32)))

def glcm_features(gray, mask):
    masked = gray.copy(); masked[mask == 0] = 0
    levels = 32
    quant = (masked / (256 / levels)).astype(np.uint8)
    distances = [1, 2]; angles = [0, np.pi / 4, np.pi / 2, 3 * np.pi / 4]
    glcm = graycomatrix(quant, distances=distances, angles=angles, levels=levels, symmetric=True, normed=True)
    feats = []
    for prop in ("contrast", "homogeneity", "energy", "correlation"):
        vals = graycoprops(glcm, prop).flatten(); feats.append(vals)
    return np.concatenate(feats).astype(np.float32)

def lbp_features(gray, mask):
    lbp = local_binary_pattern(gray, P=8, R=1, method="uniform")
    lbp_masked = lbp[mask > 0]
    hist, _ = np.histogram(lbp_masked, bins=np.arange(0, 60), range=(0, 59), density=True)
    return hist.astype(np.float32)

def fft_radial_energy(gray, mask, bands: int = 16):
    f = np.fft.fft2(gray * mask)
    fshift = np.fft.fftshift(f)
    magnitude = np.abs(fshift)
    h, w = magnitude.shape
    y, x = np.indices((h, w))
    center = np.array([(h - 1) / 2.0, (w - 1) / 2.0])
    r = np.sqrt((x - center[1]) ** 2 + (y - center[0]) ** 2)
    r_norm = r / r.max()
    hist, _ = np.histogram(r_norm, bins=bands, range=(0.0, 1.0), weights=magnitude)
    hist = hist / (hist.sum() + 1e-8)
    return hist.astype(np.float32)

def gabor_features(gray, mask):
    orientations = np.linspace(0, np.pi, 5, endpoint=False)
    wavelengths = [2, 4, 8, 16]
    feats = []
    masked_gray = gray * (mask > 0)
    for theta in orientations:
        for wl in wavelengths:
            real, imag = gabor(masked_gray, frequency=1.0 / wl, theta=theta)
            mag = np.sqrt(real**2 + imag**2)
            resp = mag[mask > 0].mean() if mask.sum() > 0 else mag.mean()
            feats.append(resp)
    return np.array(feats, dtype=np.float32)

def spiral_curvature(mask):
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return np.zeros(4, dtype=np.float32)
    cnt = max(contours, key=cv2.contourArea)
    if len(cnt) < 5:
        return np.zeros(4, dtype=np.float32)
    pts = cnt[:, 0, :].astype(np.float32)
    dx = np.gradient(pts[:, 0]); dy = np.gradient(pts[:, 1]); ddx = np.gradient(dx); ddy = np.gradient(dy)
    curvature = np.abs(dx * ddy - dy * ddx) / (np.power(dx**2 + dy**2, 1.5) + 1e-6)
    curv_valid = curvature[np.isfinite(curvature)]
    if len(curv_valid) == 0:
        return np.zeros(4, dtype=np.float32)
    mean_c = curv_valid.mean(); std_c = curv_valid.std(); p95_c = np.percentile(curv_valid, 95)
    axes = (1.0, 1.0)
    if len(cnt) >= 5:
        ellipse = cv2.fitEllipse(cnt); axes = ellipse[1]
    major = max(axes); minor = min(axes) + 1e-6; pitch_ratio = major / minor
    return np.array([mean_c, std_c, p95_c, pitch_ratio], dtype=np.float32)

def shape_features(mask):
    area = float(mask.sum())
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    perimeter = float(cv2.arcLength(contours[0], True)) if contours else 0.0
    roundness = 4 * math.pi * area / (perimeter**2 + 1e-6) if perimeter > 0 else 0.0
    return np.array([area, perimeter, roundness], dtype=np.float32)

def compute_texture_features(gray, mask):
    mask = (mask > 0).astype(np.uint8)
    if mask.sum() == 0:
        return np.zeros(154, dtype=np.float32)
    feats = [
        edge_features(gray, mask),
        glcm_features(gray, mask),
        lbp_features(gray, mask),
        fft_radial_energy(gray, mask),
        gabor_features(gray, mask),
        spiral_curvature(mask),
        shape_features(mask),
    ]
    return np.concatenate(feats).astype(np.float32)


In [None]:
# tf.data preprocessing wrapper
def preprocess_np(path, label, img_size):
    # path comes from tf.numpy_function as a NumPy scalar (bytes)
    # make it a normal Python string
    if isinstance(path, np.ndarray):
        path = path.item()          # get scalar
    path = path.decode("utf-8")     # bytes -> str

    # img_size is a NumPy scalar, ensure it's a Python int
    if isinstance(img_size, np.ndarray):
        img_size = int(img_size.item())
    else:
        img_size = int(img_size)

    # --- your existing logic ---
    img = cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2RGB)
    img = resize_and_pad(img, img_size)
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    mask = compute_mask(gray)
    texture = compute_texture_features(gray, mask)

    # normalize image
    img = img.astype(np.float32) / 255.0
    img = (img - np.array([0.485, 0.456, 0.406], dtype=np.float32)) / \
          np.array([0.229, 0.224, 0.225], dtype=np.float32)

    # ensure texture is float32 to match Tout=tf.float32
    texture = texture.astype(np.float32)

    # label as int32 to match Tout=tf.int32
    return img, np.int32(label), texture

def tf_preprocess(img_path, label, img_size=224):
    img, lbl, tex = tf.numpy_function(
        func=preprocess_np,
        inp=[img_path, label, tf.constant(img_size, dtype=tf.int64)],
        Tout=[tf.float32, tf.int32, tf.float32],
    )

    # Static shapes for tf.data / model
    img.set_shape((img_size, img_size, 3))
    tex.set_shape((154,))   # as you specified
    lbl.set_shape(())

    return (img, tex), lbl

def make_dataset(records, batch_size=32, shuffle=True, img_size=224):
    paths = [r["path"] for r in records]; labels = [r["label"] for r in records]
    ds = tf.data.Dataset.from_tensor_slices((paths, labels))
    if shuffle:
        ds = ds.shuffle(buffer_size=len(paths), seed=42, reshuffle_each_iteration=True)
    ds = ds.map(lambda p, l: tf_preprocess(p, l, img_size), num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds


In [None]:
# Model and plotting
def build_model(num_classes: int, use_texture: bool = True):
    img_in = tf.keras.Input(shape=(cfg.img_size, cfg.img_size, 3), name="image")
    tex_in = tf.keras.Input(shape=(154,), name="texture")
    base = tf.keras.applications.EfficientNetB0(include_top=False, weights="imagenet", pooling="avg")
    x = base(img_in)
    if use_texture:
        x = tf.keras.layers.Concatenate()([x, tex_in])
    x = tf.keras.layers.Dense(256, activation="relu")(x)
    x = tf.keras.layers.Dropout(0.3)(x)
    out = tf.keras.layers.Dense(num_classes, activation="softmax")(x)
    model = tf.keras.Model(inputs=[img_in, tex_in], outputs=out)
    return model

def plot_confusion(cm, classes, figpath: Path, title: str):
    fig, ax = plt.subplots(figsize=(6, 5))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=classes)
    disp.plot(ax=ax, cmap="Blues", colorbar=False)
    ax.set_title(title)
    fig.tight_layout(); fig.savefig(figpath, dpi=200); plt.close(fig)

def grad_cam_tf(model, image, label_idx, layer_name=None):
    if layer_name is None:
        conv_layers = [l.name for l in model.layers if isinstance(l, tf.keras.layers.Conv2D)]
        layer_name = conv_layers[-1]
    grad_model = tf.keras.Model([model.inputs], [model.get_layer(layer_name).output, model.output])
    with tf.GradientTape() as tape:
        conv_out, preds = grad_model([tf.expand_dims(image[0], 0), tf.expand_dims(image[1], 0)])
        loss = preds[:, label_idx]
    grads = tape.gradient(loss, conv_out)[0]
    pooled = tf.reduce_mean(grads, axis=(0, 1, 2))
    conv_out = conv_out[0]
    cam = tf.reduce_sum(tf.multiply(pooled, conv_out), axis=-1)
    cam = tf.maximum(cam, 0)
    cam = cam / (tf.reduce_max(cam) + 1e-8)
    cam = tf.image.resize(cam[..., None], (cfg.img_size, cfg.img_size)).numpy().squeeze()
    return cam

def overlay_cam(img, cam, alpha=0.4):
    cam_color = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
    img_uint8 = np.clip((img * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])) * 255, 0, 255).astype(np.uint8)
    overlay = cv2.addWeighted(img_uint8, 1 - alpha, cam_color, alpha, 0)
    return overlay


In [None]:
# Run pipeline on Geo Fossils-I
set_seed(13)
manifest, classes = list_images_by_class(cfg.root)
train_rec, val_rec, test_rec = stratified_split(manifest)
with open("splits_tf.json", "w") as f:
    json.dump({k: [r["path"] for r in v] for k, v in zip(["train", "val", "test"], [train_rec, val_rec, test_rec])}, f, indent=2)

train_ds = make_dataset(train_rec, batch_size=cfg.batch_size, shuffle=True, img_size=cfg.img_size)
val_ds = make_dataset(val_rec, batch_size=cfg.batch_size, shuffle=False, img_size=cfg.img_size)
test_ds = make_dataset(test_rec, batch_size=cfg.batch_size, shuffle=False, img_size=cfg.img_size)

model = build_model(num_classes=len(classes), use_texture=True)
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4), loss="sparse_categorical_crossentropy", metrics=["accuracy"])
es = tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True, monitor="val_accuracy")
history = model.fit(train_ds, validation_data=val_ds, epochs=cfg.epochs, callbacks=[es])

# Evaluate
y_true, y_pred = [], []
for (imgs, tex), labels in test_ds:
    preds = model.predict((imgs, tex), verbose=0)
    y_true.extend(labels.numpy().tolist())
    y_pred.extend(np.argmax(preds, axis=1).tolist())
cm = confusion_matrix(y_true, y_pred)
plot_confusion(cm, [c.name for c in classes], cfg.figdir / "cm_tf.png", "TF Confusion (Geo Fossils-I)")
print("Test accuracy:", accuracy_score(y_true, y_pred), "macro-F1:", f1_score(y_true, y_pred, average="macro"))

# Grad-CAM on one test sample
sample = next(iter(test_ds))
img0 = sample[0][0][0].numpy(); tex0 = sample[0][1][0].numpy()
pred_idx = int(np.argmax(model.predict((img0[None], tex0[None]), verbose=0)))
cam = grad_cam_tf(model, (img0, tex0), label_idx=pred_idx)
overlay = overlay_cam(img0, cam)
plt.imsave(cfg.figdir / "gradcam_tf.png", overlay)
print("Saved figures:", list(cfg.figdir.glob("*.png")))


## Results figures to cite
- `cm_tf.png`: confusion matrix on Geo Fossils-I (TF pipeline)
- `gradcam_tf.png`: Grad-CAM overlay showing salient texture cues