Alan MH Beem

**Reflection on Pipeline**

Getting this to run was a rewarding challenge, though the issue is not fully fixed: the imports took longer than I was willing to wait.

The world may not need another look at these penguins. I will follow up by further pruning the DenseNet model to find the smallest version that still reaches 100% accuracy, but that is not the purpose of the tutorial. Given its target audience, the tutorial's purpose appears to be demonstrating the capabilities of TensorFlow Extended (TFX) to an ML engineer or production-minded practitioner, with the focus on how one would structure a production ML workflow using TFX.
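Before retraining anything, the pruning follow-up can start by ranking candidate architectures by size. The sketch below reads "pruning" as shrinking the architecture (fewer layers, smaller growth), not magnitude-based weight pruning. It assumes the layer layout of `build_keras_densenet` in the notebook code (BatchNormalization then Dense per layer, all outputs concatenated, then a logits layer); `densenet_param_count` and `shrink_candidates` are hypothetical helpers. Because BatchNormalization stores four values per input unit (two trainable, two moving statistics), the count should agree with Keras's `model.count_params()`, but that is worth checking on a real model.

```python
from itertools import product
from typing import List, Tuple


def densenet_param_count(n_features: int, n_classes: int, num_layers: int,
                         growth: int, use_bn: bool = True) -> int:
    """Parameter count for the DenseNet-style MLP layout (assumed, see lead-in)."""
    total = 0
    for i in range(num_layers):
        width = n_features + i * growth      # layer i sees the stem plus i earlier outputs
        if use_bn:
            total += 4 * width               # gamma, beta, moving mean, moving variance
        total += width * growth + growth     # Dense kernel + bias
    final_width = n_features + num_layers * growth
    total += final_width * n_classes + n_classes  # logits layer
    return total


def shrink_candidates(n_features: int, n_classes: int, layer_opts: List[int],
                      growth_opts: List[int]) -> List[Tuple[int, int, int]]:
    """Every (params, num_layers, growth) combination, smallest model first."""
    return sorted(
        (densenet_param_count(n_features, n_classes, n_layers, g), n_layers, g)
        for n_layers, g in product(layer_opts, growth_opts)
    )


# Four numeric inputs and three species, as in the penguins feature set
for params, n_layers, g in shrink_candidates(4, 3, [1, 2, 4, 8], [4, 8, 16, 32]):
    print(f"num_layers={n_layers:<2} growth={g:<2} params={params}")
```

Each configuration would then be retrained, keeping the smallest one whose held-out accuracy stays at 100%.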

For example, a dedicated metadata store is unnecessary for a single-dataset, single-tuning "pipeline". The metadata involved is relatively static and limited in scope, so in this notebook a lightweight configuration file or an explicit code-based specification may be sufficient. However, where repeated executions, provenance tracking, auditability, and artifact lineage become important, a structured metadata store provides clear advantages for reproducibility, caching, and orchestration. Thus, while the additional infrastructure may seem excessive for small-scale or educational use, it becomes more justified as system complexity and operational requirements grow.
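As a rough illustration of the "explicit code-based specification" option, here is a minimal sketch (not part of the tutorial or of TFX): a frozen dataclass that serializes to canonical JSON and derives a cache key from the spec plus a data fingerprint. The `PipelineSpec` class and its fields are hypothetical.

```python
import hashlib
import json
from dataclasses import asdict, dataclass, field


@dataclass(frozen=True)
class PipelineSpec:
    """Hypothetical code-based pipeline specification (illustrative only)."""
    name: str
    csv: str
    model_type: str
    hyperparams: dict = field(default_factory=dict)

    def to_json(self) -> str:
        # Canonical JSON: sorted keys and fixed separators give a stable string
        return json.dumps(asdict(self), sort_keys=True, separators=(",", ":"))

    def cache_key(self, data_fingerprint: str) -> str:
        # Same spec + same data -> same key, so an earlier result can be reused
        payload = self.to_json() + data_fingerprint
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    @classmethod
    def from_json(cls, text: str) -> "PipelineSpec":
        return cls(**json.loads(text))


spec = PipelineSpec("penguins_dense", "penguins_processed.csv", "keras_dense_2x8",
                    {"lr": 1e-3, "epochs": 30})
print(spec.cache_key("example-data-fingerprint"))
```

Checking such a file into version control covers much of what a metadata store offers for one dataset and one tuning run; what it lacks is a queryable history across many runs.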

For production, many users may want predictions from the group's current best models, or from particular models, or may want models routed through the production environment in the most cost-effective way. The revised pipeline below is built to imitate this kind of TFX functionality on a small scale: it records every run in a metadata database, ranks runs by metric, and tags each queued job with the resources it requests. Having completed this assignment with AI generating most of the code (which I find compatible with following along with a tutorial), I think a simple database would go the distance for what I need: keeping track of many model variations in a project and comparing runs. It would have been quite convenient in my 410 projects, which spanned a network of Colab notebooks, though that setup might require a proper database solution. Another option would be a folder of folders, each containing a simple JSON file describing the model hyperparameters and the data used.
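The "folder of folders" option could look roughly like this minimal sketch, assuming one directory per run holding a single `run.json`. The helpers `save_run`, `load_runs`, and `best_run`, and the JSON field names, are hypothetical.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional


def save_run(root: Path, run_name: str, hyperparams: Dict[str, Any],
             data: str, metrics: Dict[str, float]) -> Path:
    """Write <root>/<run_name>/run.json describing one training run."""
    run_dir = Path(root) / run_name
    run_dir.mkdir(parents=True, exist_ok=True)
    record = {"hyperparams": hyperparams, "data": data, "metrics": metrics}
    path = run_dir / "run.json"
    path.write_text(json.dumps(record, indent=2, sort_keys=True))
    return path


def load_runs(root: Path) -> Dict[str, Dict[str, Any]]:
    """Read every <root>/*/run.json into {run_name: record}."""
    return {p.parent.name: json.loads(p.read_text())
            for p in sorted(Path(root).glob("*/run.json"))}


def best_run(root: Path, metric: str) -> Optional[str]:
    """Name of the run with the highest value of `metric`, or None if no run has it."""
    scored = [(rec["metrics"][metric], name)
              for name, rec in load_runs(root).items() if metric in rec["metrics"]]
    return max(scored)[1] if scored else None
```

Compared with the SQLite-backed `Store` in the notebook, this gives up SQL joins and ranking in exchange for plain files that are easy to read, diff, and copy between notebooks.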


**Specific Deliverables**

1) Complete the tutorial

2) Description of pipeline in tutorial:
This pipeline ingests data (CsvExampleGen), trains a model with a user-provided function (Trainer), pushes the trained model to a filesystem location (Pusher), which plays the role of the database destination in the example below, and then runs the pipeline. The data is ingested already normalized, i.e. after preprocessing.

3) Tutorial pipeline improvements:
Add data validation; feature engineering (including the [0, 1] scaling that the tutorial's data arrives with, where any data leakage from that preprocessing is unknown); and model analysis, such as visualizations of model training and comparisons of model performance. I'd also like the model parameters to be more flexible (currently, no user-provided parameters make it to the models), but even as-is, and as TFX seems intended to allow, one could serve a point-and-click ML solution using TFX.
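One way to rule out the leakage concern is to validate the raw data and then fit the [0, 1] (min-max) scaling on the training split only. This is a minimal standard-library sketch, assuming rows are dicts of floats; `validate`, `fit_min_max`, and `apply_min_max` are hypothetical helpers, not TFX components (in TFX, ExampleValidator and Transform fill these roles).

```python
import math
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, float]


def validate(rows: List[Row], required: Sequence[str]) -> List[str]:
    """Report missing columns and non-finite (NaN/inf) values, row by row."""
    problems = []
    for i, row in enumerate(rows):
        for col in required:
            if col not in row:
                problems.append(f"row {i}: missing column {col!r}")
            elif not math.isfinite(row[col]):
                problems.append(f"row {i}: non-finite {col!r}")
    return problems


def fit_min_max(train: List[Row], cols: Sequence[str]) -> Dict[str, Tuple[float, float]]:
    """Learn per-column (min, max) from the training split only."""
    return {c: (min(r[c] for r in train), max(r[c] for r in train)) for c in cols}


def apply_min_max(rows: List[Row], stats: Dict[str, Tuple[float, float]]) -> List[Row]:
    """Scale to [0, 1] with training statistics; held-out rows may land outside it."""
    scaled_rows = []
    for r in rows:
        scaled = dict(r)
        for c, (lo, hi) in stats.items():
            span = hi - lo
            scaled[c] = (r[c] - lo) / span if span else 0.0  # constant column -> 0
        scaled_rows.append(scaled)
    return scaled_rows
```

A test value above the training maximum scales to more than 1, which is the honest outcome. Scaling the whole file before splitting, as a pre-normalized dataset may have done, would instead fold test-set minima and maxima into the training features.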




The code below was generated using ChatGPT.

In [1]:
# =========================================================
# TFX-lite Notebook Pipeline Infrastructure (single cell)
# - One SQLite DB, multiple tables:
#   feature_sets, pipeline_specs, job_queue, runs, artifacts, metrics
# - User provides config in a later cell by calling:
#   register_feature_set(...), register_pipeline_spec(...), enqueue_job(...)
# - Execution:
#   process_next_job()  (or execute_pipeline("spec_name") directly)
# - Caching:
#   Reuses prior run if (spec + hyperparams + data_fingerprint) matches
# - Artifacts:
#   artifacts/run_<run_id>/ (model + metrics + training_summary.png)
# =========================================================

from __future__ import annotations

import json, os, sqlite3, hashlib
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix


# Database                                                                                              ###
class Store:
    def __init__(self, db_path: str = "metadata.db"):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("PRAGMA foreign_keys=ON;")
        self._init_schema()

    def _init_schema(self) -> None:
        self.conn.executescript("""
        CREATE TABLE IF NOT EXISTS feature_sets(
            name TEXT PRIMARY KEY,
            label TEXT NOT NULL,
            numeric_json TEXT NOT NULL,
            categorical_json TEXT NOT NULL,
            created_at TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS pipeline_specs(
            name TEXT PRIMARY KEY,
            csv TEXT NOT NULL,
            feature_set TEXT NOT NULL,
            model_type TEXT NOT NULL,
            hyperparams_json TEXT NOT NULL,
            created_at TEXT NOT NULL,
            FOREIGN KEY(feature_set) REFERENCES feature_sets(name)
        );

        CREATE TABLE IF NOT EXISTS job_queue(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline_name TEXT NOT NULL,
            status TEXT NOT NULL, -- queued/running/done/failed
            resources_json TEXT NOT NULL,
            created_at TEXT NOT NULL,
            started_at TEXT,
            finished_at TEXT,
            error TEXT,
            FOREIGN KEY(pipeline_name) REFERENCES pipeline_specs(name)
        );

        CREATE TABLE IF NOT EXISTS runs(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline_name TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            params_json TEXT NOT NULL,
            data_fingerprint TEXT NOT NULL,
            cache_key TEXT NOT NULL UNIQUE,
            status TEXT NOT NULL, -- done/failed
            FOREIGN KEY(pipeline_name) REFERENCES pipeline_specs(name)
        );

        CREATE TABLE IF NOT EXISTS artifacts(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id INTEGER NOT NULL,
            name TEXT NOT NULL,
            path TEXT NOT NULL,
            type TEXT NOT NULL,
            FOREIGN KEY(run_id) REFERENCES runs(id)
        );

        CREATE TABLE IF NOT EXISTS metrics(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id INTEGER NOT NULL,
            key TEXT NOT NULL,
            value REAL NOT NULL,
            FOREIGN KEY(run_id) REFERENCES runs(id)
        );
        """)
        self.conn.commit()

    # ---------- helpers ----------
    @staticmethod
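    def _now() -> str:
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        from datetime import timezone
        return datetime.now(timezone.utc).isoformat()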

    @staticmethod
    def _json(obj: Any) -> str:
        return json.dumps(obj, sort_keys=True, separators=(",", ":"))

    @staticmethod
    def _sha256_bytes(b: bytes) -> str:
        return hashlib.sha256(b).hexdigest()

    # ---------- register config ----------
    def register_feature_set(self, name: str, label: str, numeric: List[str], categorical: List[str]) -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO feature_sets(name,label,numeric_json,categorical_json,created_at) VALUES (?,?,?,?,?)",
            (name, label, self._json(numeric), self._json(categorical), self._now())
        )
        self.conn.commit()

    def register_pipeline_spec(self, name: str, csv: str, feature_set: str, model_type: str, hyperparams: Dict[str, Any]) -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO pipeline_specs(name,csv,feature_set,model_type,hyperparams_json,created_at) VALUES (?,?,?,?,?,?)",
            (name, csv, feature_set, model_type, self._json(hyperparams), self._now())
        )
        self.conn.commit()

    # ---------- read config ----------
    def get_feature_set(self, name: str) -> Dict[str, Any]:
        row = self.conn.execute(
            "SELECT label,numeric_json,categorical_json FROM feature_sets WHERE name=?",
            (name,)
        ).fetchone()
        if not row:
            raise KeyError(f"feature_set not found: {name}")
        label, numeric_json, categorical_json = row
        return {"label": label, "numeric": json.loads(numeric_json), "categorical": json.loads(categorical_json)}

    def get_pipeline_spec(self, name: str) -> Dict[str, Any]:
        row = self.conn.execute(
            "SELECT csv,feature_set,model_type,hyperparams_json FROM pipeline_specs WHERE name=?",
            (name,)
        ).fetchone()
        if not row:
            raise KeyError(f"pipeline_spec not found: {name}")
        csv, feature_set, model_type, hyperparams_json = row
        return {"name": name, "csv": csv, "feature_set": feature_set, "model_type": model_type, "hyperparams": json.loads(hyperparams_json)}

    # ---------- queue ----------
    def enqueue_job(self, pipeline_name: str, resources: Optional[Dict[str, Any]] = None) -> int:
        resources = resources or {"runner": "local", "device": "cpu"}
        cur = self.conn.execute(
            "INSERT INTO job_queue(pipeline_name,status,resources_json,created_at) VALUES (?,?,?,?)",
            (pipeline_name, "queued", self._json(resources), self._now())
        )
        self.conn.commit()
        return int(cur.lastrowid)

    def pop_next_job(self) -> Optional[Dict[str, Any]]:
        row = self.conn.execute(
            "SELECT id,pipeline_name,resources_json FROM job_queue WHERE status='queued' ORDER BY id ASC LIMIT 1"
        ).fetchone()
        if not row:
            return None
        job_id, pipeline_name, resources_json = row
        self.conn.execute(
            "UPDATE job_queue SET status='running', started_at=? WHERE id=?",
            (self._now(), job_id)
        )
        self.conn.commit()
        return {"id": int(job_id), "pipeline_name": pipeline_name, "resources": json.loads(resources_json)}

    def finish_job(self, job_id: int, status: str, error: Optional[str] = None) -> None:
        self.conn.execute(
            "UPDATE job_queue SET status=?, finished_at=?, error=? WHERE id=?",
            (status, self._now(), error, job_id)
        )
        self.conn.commit()

    # ---------- runs / caching ----------
    def find_cached_run(self, cache_key: str) -> Optional[int]:
        row = self.conn.execute("SELECT id FROM runs WHERE cache_key=? AND status='done'", (cache_key,)).fetchone()
        return int(row[0]) if row else None

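    def create_run(self, pipeline_name: str, params: Dict[str, Any], data_fingerprint: str, cache_key: str) -> int:
        # cache_key is UNIQUE, so a leftover failed run with the same key would make
        # every retry raise IntegrityError; clear it before recording the new run.
        self.conn.execute("DELETE FROM runs WHERE cache_key=? AND status='failed'", (cache_key,))
        cur = self.conn.execute(
            "INSERT INTO runs(pipeline_name,timestamp,params_json,data_fingerprint,cache_key,status) VALUES (?,?,?,?,?,?)",
            (pipeline_name, self._now(), self._json(params), data_fingerprint, cache_key, "done")
        )
        self.conn.commit()
        return int(cur.lastrowid)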

    def mark_run_failed(self, pipeline_name: str, params: Dict[str, Any], data_fingerprint: str, cache_key: str) -> int:
        cur = self.conn.execute(
            "INSERT OR REPLACE INTO runs(pipeline_name,timestamp,params_json,data_fingerprint,cache_key,status) VALUES (?,?,?,?,?,?)",
            (pipeline_name, self._now(), self._json(params), data_fingerprint, cache_key, "failed")
        )
        self.conn.commit()
        return int(cur.lastrowid)

    def log_artifact(self, run_id: int, name: str, path: str, typ: str) -> None:
        self.conn.execute(
            "INSERT INTO artifacts(run_id,name,path,type) VALUES (?,?,?,?)",
            (run_id, name, path, typ)
        )
        self.conn.commit()

    def log_metric(self, run_id: int, key: str, value: float) -> None:
        self.conn.execute(
            "INSERT INTO metrics(run_id,key,value) VALUES (?,?,?)",
            (run_id, key, float(value))
        )
        self.conn.commit()

    def best_runs(self, metric: str = "macro_f1", limit: int = 5) -> List[Tuple[int, float]]:
        rows = self.conn.execute(
            """
            SELECT r.id, m.value
            FROM runs r JOIN metrics m ON m.run_id=r.id
            WHERE r.status='done' AND m.key=?
            ORDER BY m.value DESC
            LIMIT ?
            """,
            (metric, limit)
        ).fetchall()
        return [(int(rid), float(val)) for rid, val in rows]

STORE = Store("metadata.db")  # single DB file
# end Database                                                                                          ###


# Model builders                                                                                        ###
def build_keras_dense(feature_keys: List[str], n_classes: int, lr: float = 1e-3) -> tf.keras.Model:
    inputs = [keras.layers.Input(shape=(1,), name=f) for f in feature_keys]
    d = keras.layers.concatenate(inputs)
    for _ in range(2):
        d = keras.layers.Dense(8, activation="relu")(d)
    # outputs = keras.layers.Dense(n_classes, activation="softmax")(d)
    outputs = keras.layers.Dense(n_classes)(d)

    model = keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=keras.optimizers.Adam(lr),
        # loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[keras.metrics.SparseCategoricalAccuracy(name="acc")]
    )
    return model

def build_keras_densenet(
    feature_keys: List[str],
    n_classes: int,
    lr: float = 1e-3,
    num_layers: int = 8,
    growth: int = 32,
    dropout: float = 0.0,
    use_bn: bool = True,
) -> tf.keras.Model:
    # Inputs: one scalar per feature key (same as build_keras_dense)
    inputs = [keras.layers.Input(shape=(1,), name=f) for f in feature_keys]

    # Base "stem" tensor
    x0 = keras.layers.Concatenate(name="concat_inputs")(inputs)

    # Keep a running list of features to concatenate (DenseNet-style)
    features = [x0]

    for i in range(num_layers):
        x = keras.layers.Concatenate(name=f"dense_concat_{i}")(features)

        if use_bn:
            x = keras.layers.BatchNormalization(name=f"bn_{i}")(x)

        x = keras.layers.Dense(growth, activation="relu", name=f"dense_{i}")(x)

        if dropout and dropout > 0:
            x = keras.layers.Dropout(dropout, name=f"dropout_{i}")(x)

        # Add this layer's output to the feature list for future layers
        features.append(x)

    # Final classifier sees all accumulated features
    x = keras.layers.Concatenate(name="final_concat")(features)
    # outputs = keras.layers.Dense(n_classes, name="logits", activation="softmax")(x)
    outputs = keras.layers.Dense(n_classes, name="logits")(x)

    model = keras.Model(inputs=inputs, outputs=outputs, name="DenseNetMLP")
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=lr),
        # loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[keras.metrics.SparseCategoricalAccuracy(name="acc")],
    )
    return model

# model support function
def to_keras_dict(X: pd.DataFrame, feature_keys: List[str]) -> Dict[str, np.ndarray]:
    return {c: X[c].to_numpy(dtype=np.float32).reshape(-1, 1) for c in feature_keys}
# end Model Builders                                                                                    ###

# Plotting: loss | accuracy | confusion                                                                 ###
def plot_training_summary(history: keras.callbacks.History, cm: np.ndarray, labels: List[str], outpath: Path) -> None:
    fig, axes = plt.subplots(1, 3, figsize=(14, 4))

    # Loss
    axes[0].plot(history.history.get("loss", []), label="train")
    if "val_loss" in history.history:
        axes[0].plot(history.history["val_loss"], label="val")
    axes[0].set_title("Loss")
    axes[0].set_xlabel("Epoch")
    axes[0].legend()

    # Accuracy
    # Keras metric key can be "acc" or "sparse_categorical_accuracy"/"accuracy" depending on setup
    acc_key = "acc" if "acc" in history.history else ("accuracy" if "accuracy" in history.history else None)
    val_acc_key = "val_acc" if "val_acc" in history.history else ("val_accuracy" if "val_accuracy" in history.history else None)
    if acc_key:
        axes[1].plot(history.history.get(acc_key, []), label="train")
    if val_acc_key:
        axes[1].plot(history.history.get(val_acc_key, []), label="val")
    axes[1].set_title("Accuracy")
    axes[1].set_xlabel("Epoch")
    axes[1].legend()

    # Confusion matrix heatmap
    im = axes[2].imshow(cm)
    axes[2].set_title("Confusion Matrix")
    axes[2].set_xticks(range(len(labels)))
    axes[2].set_yticks(range(len(labels)))
    axes[2].set_xticklabels(labels, rotation=45, ha="right")
    axes[2].set_yticklabels(labels)
    for i in range(len(labels)):
        for j in range(len(labels)):
            axes[2].text(j, i, str(cm[i, j]), ha="center", va="center")

    fig.colorbar(im, ax=axes[2], fraction=0.046)
    fig.tight_layout()
    fig.savefig(outpath, dpi=140)
    plt.close(fig)
# end Plotting: loss | accuracy | confusion                                                             ###


# Core pipeline execution                                                                               ###
ART_ROOT = Path("artifacts")
ART_ROOT.mkdir(exist_ok=True)

def _data_fingerprint(df: pd.DataFrame) -> str:
    # Stable content hash for caching (OK for small/medium data).
    # For huge data, hash the file bytes or an external dataset version id instead.
    h = pd.util.hash_pandas_object(df, index=True).values.tobytes()
    return hashlib.sha256(h).hexdigest()

def _preprocess(df: pd.DataFrame, label: str, numeric: List[str], categorical: List[str]) -> Tuple[pd.DataFrame, np.ndarray, List[str], Dict[str, Dict[str, int]]]:
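    # Treat blank strings as missing, then drop rows without a label
    # (a NaN label cannot be cast to int32 below)
    df = df.replace(r"^\s*$", np.nan, regex=True).dropna(subset=[label])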

    # Label -> id
    label_vocab = sorted(df[label].dropna().unique().tolist())
    label_to_id = {v: i for i, v in enumerate(label_vocab)}
    y = df[label].map(label_to_id).astype("int32").to_numpy()

    # Numeric: coerce + median impute
    for c in numeric:
        df[c] = pd.to_numeric(df[c], errors="coerce")
        df[c] = df[c].fillna(df[c].median()).astype("float32")

    # Categorical: vocab -> ids, reserve 0 for UNK
    cat_maps: Dict[str, Dict[str, int]] = {}
    for c in categorical:
        vocab = sorted(df[c].dropna().astype(str).unique().tolist())
        m = {v: (i + 1) for i, v in enumerate(vocab)}  # start at 1
        cat_maps[c] = m
        df[c] = df[c].astype("string").fillna("UNK").map(lambda x: m.get(str(x), 0)).astype("float32")

    features = numeric + categorical
    X = df[features]
    return X, y, label_vocab, cat_maps

def execute_pipeline(pipeline_name: str) -> Dict[str, Any]:
    spec = STORE.get_pipeline_spec(pipeline_name)
    fs = STORE.get_feature_set(spec["feature_set"])

    label = fs["label"]
    numeric = fs["numeric"]
    categorical = fs["categorical"]
    features = numeric + categorical

    # Load
    df = pd.read_csv(spec["csv"])
    df = df[[label] + features]

    # Fingerprint + cache key
    data_fp = _data_fingerprint(df)

    hyper = spec["hyperparams"]
    # Defaults if omitted
    seed = int(hyper.get("seed", 0))
    epochs = int(hyper.get("epochs", 30))
    batch_size = int(hyper.get("batch_size", 32))
    lr = float(hyper.get("lr", 1e-3))
    test_size = float(hyper.get("test_size", 0.2))

    params = {
        "pipeline_name": pipeline_name,
        "csv": spec["csv"],
        "feature_set": spec["feature_set"],
        "model_type": spec["model_type"],
        "hyperparams": {"seed": seed, "epochs": epochs, "batch_size": batch_size, "lr": lr, "test_size": test_size},
        "features": {"label": label, "numeric": numeric, "categorical": categorical},
        "data_fingerprint": data_fp,
    }
    cache_key = hashlib.sha256(Store._json(params).encode("utf-8")).hexdigest()

    cached_run_id = STORE.find_cached_run(cache_key)
    if cached_run_id is not None:
        run_dir = ART_ROOT / f"run_{cached_run_id}"
        return {
            "status": "cached",
            "run_id": cached_run_id,
            "run_dir": str(run_dir),
            "message": f"Using cached run_{cached_run_id} (cache_key match)."
        }

    # Preprocess + split
    X, y, label_vocab, cat_maps = _preprocess(df, label, numeric, categorical)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=seed, stratify=y
    )

    # Train
    try:
        if spec["model_type"] == "keras_dense_2x8":
            model = build_keras_dense(features, n_classes=len(label_vocab), lr=lr)
        elif spec["model_type"] == "keras_densenet":
            model = build_keras_densenet(features, n_classes=len(label_vocab), lr=lr)
        else:
            raise ValueError(f"Unsupported model_type: {spec['model_type']}")

        history = model.fit(
            to_keras_dict(X_train, features), y_train,
            validation_data=(to_keras_dict(X_test, features), y_test),
            epochs=epochs,
            batch_size=batch_size,
            verbose=0,
        )

        # Eval
        logits = model.predict(to_keras_dict(X_test, features), verbose=0)
        y_pred = logits.argmax(axis=1).astype("int32")

        report = classification_report(y_test, y_pred, output_dict=True, zero_division=0)
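        # Fix the class order and size so cm always lines up with label_vocab in the plot
        cm = confusion_matrix(y_test, y_pred, labels=list(range(len(label_vocab))))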
        macro_f1 = float(report["macro avg"]["f1-score"])
        acc = float(report["accuracy"])

        # Persist run + artifacts
        run_id = STORE.create_run(pipeline_name, params=params, data_fingerprint=data_fp, cache_key=cache_key)
        run_dir = ART_ROOT / f"run_{run_id}"
        run_dir.mkdir(exist_ok=True)

        # Save model
        model_path = run_dir / "model.keras"
        model.save(model_path)

        # Save metrics + spec snapshot
        with open(run_dir / "pipeline_params.json", "w") as f:
            json.dump(params, f, indent=2)
        with open(run_dir / "metrics.json", "w") as f:
            json.dump({"report": report, "confusion_matrix": cm.tolist(), "macro_f1": macro_f1, "accuracy": acc}, f, indent=2)
        with open(run_dir / "label_vocab.json", "w") as f:
            json.dump(label_vocab, f, indent=2)
        with open(run_dir / "categorical_maps.json", "w") as f:
            json.dump({k: v for k, v in cat_maps.items()}, f, indent=2)

        # Plot training summary
        plot_path = run_dir / "training_summary.png"
        plot_training_summary(history, cm, label_vocab, plot_path)

        # Log artifacts/metrics
        STORE.log_artifact(run_id, "model", str(model_path), "keras")
        STORE.log_artifact(run_id, "training_summary", str(plot_path), "figure")
        STORE.log_artifact(run_id, "metrics", str(run_dir / "metrics.json"), "json")
        STORE.log_metric(run_id, "macro_f1", macro_f1)
        STORE.log_metric(run_id, "accuracy", acc)

        return {
            "status": "trained",
            "run_id": run_id,
            "run_dir": str(run_dir),
            "macro_f1": macro_f1,
            "accuracy": acc,
        }

    except Exception:
        # Record the failure under this cache_key, then re-raise for the job runner
        STORE.mark_run_failed(pipeline_name, params=params, data_fingerprint=data_fp, cache_key=cache_key)
        raise


def process_next_job() -> Optional[Dict[str, Any]]:
    job = STORE.pop_next_job()
    if not job:
        print("No queued jobs.")
        return None

    jid = job["id"]
    name = job["pipeline_name"]
    try:
        result = execute_pipeline(name)
        STORE.finish_job(jid, "done", None)
        print(f"Job {jid} done -> {result.get('status')} | run_{result.get('run_id')} | {result.get('run_dir')}")
        if result.get("status") == "trained":
            print(f"  macro_f1={result['macro_f1']:.4f} accuracy={result['accuracy']:.4f}")
        else:
            print(f"  {result.get('message','')}")
        return result
    except Exception as e:
        STORE.finish_job(jid, "failed", str(e))
        print(f"Job {jid} failed: {e}")
        return None
# end Core pipeline execution                                                                           ###
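As written, `execute_pipeline` forwards only `lr` to the model builders, so DenseNet settings such as `num_layers`, `growth`, `dropout`, and `use_bn` always take their defaults. One possible fix, sketched minimally here, is to forward whichever hyperparameters a builder's signature accepts. `build_from_hyperparams` and `fake_densenet` are hypothetical; the stand-in builder lets the sketch run without TensorFlow.

```python
import inspect
from typing import Any, Callable, Dict, List


def build_from_hyperparams(builder: Callable[..., Any], feature_keys: List[str],
                           n_classes: int, hyperparams: Dict[str, Any]) -> Any:
    """Call `builder`, forwarding only the hyperparams its signature accepts."""
    accepted = inspect.signature(builder).parameters
    # Skip the positional inputs and keys the builder does not know (e.g. epochs, seed)
    kwargs = {k: v for k, v in hyperparams.items()
              if k in accepted and k not in ("feature_keys", "n_classes")}
    return builder(feature_keys, n_classes=n_classes, **kwargs)


# Hypothetical stand-in using the same keyword names as build_keras_densenet;
# it just echoes what it received.
def fake_densenet(feature_keys: List[str], n_classes: int, lr: float = 1e-3,
                  num_layers: int = 8, growth: int = 32) -> Dict[str, Any]:
    return {"n_in": len(feature_keys), "n_classes": n_classes, "lr": lr,
            "num_layers": num_layers, "growth": growth}


model = build_from_hyperparams(fake_densenet, ["culmen_length_mm", "body_mass_g"], 3,
                               {"lr": 1e-5, "num_layers": 4, "epochs": 100, "seed": 1337})
print(model)  # {'n_in': 2, 'n_classes': 3, 'lr': 1e-05, 'num_layers': 4, 'growth': 32}
```

Inside `execute_pipeline`, a call such as `build_from_hyperparams(build_keras_densenet, features, len(label_vocab), hyper)` would let a spec like `{"num_layers": 4, "growth": 16}` reach the model. Because `params` (and therefore `cache_key`) records only seed, epochs, batch_size, lr, and test_size, those architecture keys would also need to be added to `params`; otherwise two different architectures could be served from the same cache entry.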

A user-defined use of the database and model system

In [5]:
# Example "penguins" config (TFX tutorial-style)
STORE.register_feature_set(
    name="penguins_features_v1",
    label="species",
    numeric=["bill_length_mm","bill_depth_mm","flipper_length_mm","body_mass_g","year"],
    categorical=["island","sex"],
)
STORE.register_feature_set(
    name="penguins_features_labelled_tfx",
    label="species",
    numeric=["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm", "body_mass_g"],
    categorical=[],
)



In [6]:
STORE.register_pipeline_spec(
    name="penguins_keras_dense_lr=1e-3",
    csv='https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv',
    feature_set="penguins_features_labelled_tfx",
    model_type="keras_dense_2x8",
    hyperparams={"seed": 1337, "epochs": 30, "batch_size": 16, "lr": 1e-3, "test_size": 0.2},
)

# Queue + run one job (comment these out if you only want to define config now)
job_id = STORE.enqueue_job("penguins_keras_dense_lr=1e-3", resources={"runner": "local", "device": "cpu"})
process_next_job()

# Optional: quick leaderboard
best = STORE.best_runs(metric="macro_f1", limit=5)
if best:
    print("\nTop runs by macro_f1:")
    for rid, score in best:
        print(f"  run_{rid}: {score:.4f}")




Job 3 done -> trained | run_1 | artifacts/run_1
  macro_f1=0.8530 accuracy=0.8955

Top runs by macro_f1:
  run_1: 0.8530




In [7]:
STORE.register_pipeline_spec(
    name="penguins_keras_densenet",
    csv='https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv',
    feature_set="penguins_features_labelled_tfx",
    model_type="keras_densenet",
    hyperparams={"seed": 1337, "epochs": 100, "batch_size": 8, "lr": 1e-5, "test_size": 0.2},
)

# Queue + run one job (comment these out if you only want to define config now)
job_id = STORE.enqueue_job("penguins_keras_densenet", resources={"runner": "local", "device": "cpu"})
process_next_job()

# quick leaderboard
best = STORE.best_runs(metric="macro_f1", limit=5)
if best:
    print("\nTop runs by macro_f1:")
    for rid, score in best:
        print(f"  run_{rid}: {score:.4f}")




Job 4 done -> trained | run_2 | artifacts/run_2
  macro_f1=1.0000 accuracy=1.0000

Top runs by macro_f1:
  run_2: 1.0000
  run_1: 0.8530
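`best_runs` reports run ids only. To serve predictions from the current best model, the `artifacts` table can be joined in. A minimal sketch against the schema created by `Store`; the `best_model_path` helper is hypothetical. With `STORE` in scope, `keras.models.load_model(best_model_path(STORE.conn))` should then load the top-ranked saved model.

```python
import sqlite3
from typing import Optional


def best_model_path(conn: sqlite3.Connection, metric: str = "macro_f1") -> Optional[str]:
    """Saved model path of the top-scoring completed run, or None if there is none."""
    row = conn.execute(
        """
        SELECT a.path
        FROM runs r
        JOIN metrics m ON m.run_id = r.id
        JOIN artifacts a ON a.run_id = r.id
        WHERE r.status = 'done' AND m.key = ? AND a.name = 'model'
        ORDER BY m.value DESC
        LIMIT 1
        """,
        (metric,),
    ).fetchone()
    return row[0] if row else None
```

Failed runs are excluded by the `status` filter, so a crashed run that happened to log a metric is never served.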


