**Task 3: Machine Learning Prediction & Logging Pipeline**

This notebook implements the end-to-end pipeline for Task 3.
It connects to a FastAPI backend, retrieves the latest record, preprocesses input data, generates predictions using a trained model, and logs results to a database.

I built and tested two API routes:

- `GET /api/records/latest`: returns the last row of a CSV file in `data/`, or a default record when no CSV is available.

- `POST /api/predictions`: accepts a prediction result and logs it to SQLite.

The workflow includes:

- Setting up a minimal model and FastAPI app.

- Exposing endpoints using ngrok.

- Fetching, cleaning, and handling missing data (`fillna(0)`).

- Predicting and logging structured results.

- Validating the end-to-end process with database inspection.

This is my contribution to Task 3 of our team project. It demonstrates the integration of data retrieval, machine learning prediction, and persistent logging in a production-style API setup.

In [48]:
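# Start a fresh project folder
import os, shutil, pathlib
proj = "db-task3-pipeline"
# If this cell is re-run from inside the project folder, step back out first;
# otherwise each run nests a new copy inside the previous one.
if os.path.basename(os.getcwd()) == proj:
    os.chdir("..")
if os.path.exists(proj):
    shutil.rmtree(proj)
os.makedirs(proj, exist_ok=True)
os.chdir(proj)
print("Working in", os.getcwd())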

# Standard folders
os.makedirs("ml/artifacts", exist_ok=True)
os.makedirs("data", exist_ok=True)


Working in /content/db-task3-pipeline/db-task3-pipeline/db-task3-pipeline/db-task3-pipeline/db-task3-pipeline/db-task3-pipeline


In [49]:
!pip -q install fastapi uvicorn pyngrok "sqlalchemy<2.0" pydantic joblib scikit-learn pandas numpy requests


In [50]:
# Optional: upload a CSV to use as the "latest" record source.
# Only .csv files in data/ are read by the API, so archives must be extracted first.
try:
    from google.colab import files
    up = files.upload()
    import shutil
    for name in up.keys():
        shutil.move(name, f"data/{name}")
    print("Uploaded:", list(up.keys()))
except Exception:
    print("No dataset uploaded (that's fine for now).")


Saving archive (1).zip to archive (1).zip
Uploaded: ['archive (1).zip']


In [51]:
import pandas as pd
from sklearn.dummy import DummyRegressor
from joblib import dump
import os

# Train a minimal model so end-to-end runs today
X = pd.DataFrame(
    [[3.5, 2.8, 0.0],
     [2.0, 1.0, 1.0],
     [5.0, 3.0, 2.0]],
    columns=["feature_1","feature_2","feature_3"]
)
y = [42, 35, 60]

model = DummyRegressor(strategy="mean").fit(X, y)
dump(model, "ml/artifacts/model.joblib")
print("Saved → ml/artifacts/model.joblib")


Saved → ml/artifacts/model.joblib
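
Because `DummyRegressor(strategy="mean")` ignores the feature values and always returns the mean of the training targets, every prediction from this placeholder model should equal the mean of `y`. A minimal plain-Python check of that expectation follows; the helper name `mean_baseline` is hypothetical and not part of scikit-learn.

```python
# The three training targets used for the placeholder model
y = [42, 35, 60]

def mean_baseline(targets):
    """Return the constant value a mean-strategy baseline predicts."""
    return sum(targets) / len(targets)

expected_prediction = mean_baseline(y)
print(round(expected_prediction, 4))  # 45.6667
```

Any prediction that differs from this value would suggest that a different model artifact was loaded.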


In [52]:
app_code = '''
from fastapi import FastAPI, Body
from pydantic import BaseModel
from typing import Optional, Dict, Any
from datetime import datetime
import os, json
import sqlite3

app = FastAPI(title="Task 3 API")

DB_PATH = "predictions.db"

def init_db():
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS prediction_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            record_id TEXT,
            prediction TEXT,
            probability TEXT,
            predicted_at TEXT,
            inputs_json TEXT,
            created_at TEXT
        )
    """)
    conn.commit()
    conn.close()

init_db()

class PredictionLog(BaseModel):
    record_id: Optional[str] = None
    prediction: Any
    probability: Any = None
    predicted_at: str
    inputs: Dict[str, Any]

def latest_from_dataset():
    for name in os.listdir("data"):
        if name.lower().endswith(".csv"):
            import pandas as pd
            df = pd.read_csv(os.path.join("data", name))
            if not df.empty:
                return df.tail(1).to_dict(orient="records")[0]
    return None

@app.get("/api/records/latest")
def get_latest():
    row = latest_from_dataset()
    if row is None:
        # DEFAULT IS NUMERIC to match the dummy model
        row = {"id": 1, "feature_1": 3.5, "feature_2": 2.8, "feature_3": 0.0}
    return row

@app.post("/api/predictions")
def log_prediction(payload: PredictionLog = Body(...)):
    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO prediction_logs (record_id, prediction, probability, predicted_at, inputs_json, created_at) VALUES (?,?,?,?,?,?)",
        (
            str(payload.record_id) if payload.record_id is not None else None,
            json.dumps(payload.prediction),
            json.dumps(payload.probability),
            payload.predicted_at,
            json.dumps(payload.inputs),
            datetime.utcnow().isoformat()
        )
    )
    conn.commit()
    conn.close()
    return {"status": "ok", "logged": True}
'''
with open("main.py","w") as f:
    f.write(app_code)
print("Wrote main.py")


Wrote main.py
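
`latest_from_dataset` returns the last row of the first CSV that `os.listdir` happens to yield, and `os.listdir` makes no ordering guarantee. The sketch below shows one possible alternative that picks the most recently modified CSV instead. It uses the standard-library `csv` module rather than pandas, so values come back as strings; the function name `latest_row_from_newest_csv` and the demo files are assumptions made for illustration.

```python
import csv
import os
import tempfile

def latest_row_from_newest_csv(data_dir):
    """Return the last row of the most recently modified CSV in data_dir, or None."""
    csv_paths = [
        os.path.join(data_dir, name)
        for name in os.listdir(data_dir)
        if name.lower().endswith(".csv")
    ]
    # Newest file first, so the choice does not depend on os.listdir() ordering
    for path in sorted(csv_paths, key=os.path.getmtime, reverse=True):
        with open(path, newline="") as f:
            rows = list(csv.DictReader(f))
        if rows:
            return rows[-1]  # csv yields strings, unlike pandas
    return None

# Demo with a throwaway directory holding an older and a newer CSV
with tempfile.TemporaryDirectory() as tmp:
    old_path = os.path.join(tmp, "old.csv")
    new_path = os.path.join(tmp, "new.csv")
    with open(old_path, "w", newline="") as f:
        f.write("id,feature_1\n1,3.5\n")
    with open(new_path, "w", newline="") as f:
        f.write("id,feature_1\n7,2.0\n8,5.0\n")
    # Set explicit modification times so the ordering is unambiguous
    os.utime(old_path, (1_000_000, 1_000_000))
    os.utime(new_path, (2_000_000, 2_000_000))
    demo_row = latest_row_from_newest_csv(tmp)

# An empty directory falls through to None, matching the endpoint's default path
with tempfile.TemporaryDirectory() as empty_dir:
    empty_result = latest_row_from_newest_csv(empty_dir)

print(demo_row, empty_result)
```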


In [53]:
import subprocess, time, os
from pyngrok import ngrok, conf

#  Start API server
server = subprocess.Popen(["uvicorn","main:app","--host","0.0.0.0","--port","8000"])
time.sleep(2)


# Read the ngrok auth token from the environment; set NGROK_TOKEN before running
# this cell rather than hardcoding the secret in the notebook.
conf.get_default().auth_token = os.environ["NGROK_TOKEN"]

#  Create tunnel
public = ngrok.connect(8000, "http")
print("PUBLIC URL:", public.public_url)


PUBLIC URL: https://orderly-unideographic-maire.ngrok-free.dev
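
The fixed `time.sleep(2)` assumes the server is ready within two seconds. A more robust option is to poll the port until it accepts connections. The helper below is a minimal sketch using only the standard library; `wait_for_port` is a hypothetical name, and the demo listener stands in for the uvicorn process.

```python
import socket
import time

def wait_for_port(host, port, timeout=10.0, interval=0.2):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Server not accepting connections yet; wait and retry
            time.sleep(interval)
    return False

# Demo: listen on an OS-assigned free port, then probe it
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
demo_port = listener.getsockname()[1]
ready = wait_for_port("127.0.0.1", demo_port, timeout=2.0)
listener.close()
print("ready:", ready)
```

In this notebook, a call such as `wait_for_port("127.0.0.1", 8000)` placed after `subprocess.Popen(...)` could take the place of the fixed sleep.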


In [54]:
import requests
base = public.public_url
print("Docs:", base + "/docs")
print("Latest:", base + "/api/records/latest")

r = requests.get(base + "/api/records/latest")
print("Latest status:", r.status_code)
print("Latest body:", r.text)


Docs: https://orderly-unideographic-maire.ngrok-free.dev/docs
Latest: https://orderly-unideographic-maire.ngrok-free.dev/api/records/latest
Latest status: 200
Latest body: {"id":1,"feature_1":3.5,"feature_2":2.8,"feature_3":"Medium"}


In [55]:
# Use the public URL from Cell 7
BASE_URL = public.public_url
GET_LATEST_ENDPOINT = "/api/records/latest"
POST_LOG_ENDPOINT   = "/api/predictions"

MODEL_PATH = "ml/artifacts/model.joblib"               # replace with your real model later
FEATURE_COLUMNS = ["feature_1","feature_2","feature_3"]  # exact training order
CANDIDATE_ID_KEYS = ["id","_id","uuid","record_id"]
MOCK_MODE = False


In [56]:
import json, time
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
import requests, pandas as pd, numpy as np
from joblib import load

def _ts() -> str:
    return datetime.now(timezone.utc).isoformat()

def api_get(path: str, params: Optional[Dict[str, Any]] = None, timeout: int = 20) -> Any:
    url = BASE_URL.rstrip("/") + path
    r = requests.get(url, params=params or {}, timeout=timeout)
    r.raise_for_status()
    return r.json()

def api_post(path: str, payload: Dict[str, Any], timeout: int = 20) -> Any:
    url = BASE_URL.rstrip("/") + path
    r = requests.post(url, json=payload, timeout=timeout)
    r.raise_for_status()
    return r.json()

def log_local(msg: str, **kv):
    try:
        extra = "" if not kv else " :: " + json.dumps(kv, default=str)
    except Exception:
        extra = ""
    print(f"[{_ts()}] {msg}{extra}")


In [57]:
if MOCK_MODE:
    latest_row = {"id": 1, "feature_1": 3.5, "feature_2": 2.8, "feature_3": 0.0}
    log_local("Using MOCK latest_row", keys=list(latest_row.keys()))
else:
    latest_payload = api_get(GET_LATEST_ENDPOINT)
    if isinstance(latest_payload, dict) and "data" in latest_payload:
        latest_row = latest_payload["data"]
    elif isinstance(latest_payload, list) and len(latest_payload) > 0:
        latest_row = latest_payload[0]
    elif isinstance(latest_payload, dict):
        latest_row = latest_payload
    else:
        raise ValueError("Unexpected latest payload shape")
    log_local("Fetched latest record", keys=list(latest_row.keys())[:10])

latest_row


[2025-11-02T15:45:50.836004+00:00] Fetched latest record :: {"keys": ["id", "feature_1", "feature_2", "feature_3"]}


{'id': 1, 'feature_1': 3.5, 'feature_2': 2.8, 'feature_3': 'Medium'}
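
The branching above accepts three response shapes: a `{"data": {...}}` envelope, a list of records, and a bare record. Pulling that logic into a function makes it easy to test without a running server. This is a sketch; the name `unwrap_latest` and the final type check are additions not present in the notebook.

```python
from typing import Any, Dict

def unwrap_latest(payload: Any) -> Dict[str, Any]:
    """Normalize the supported response shapes to a single record dict."""
    if isinstance(payload, dict) and "data" in payload:
        record = payload["data"]      # {"data": {...}} envelope
    elif isinstance(payload, list) and payload:
        record = payload[0]           # list of records: take the first
    elif isinstance(payload, dict):
        record = payload              # bare record
    else:
        raise ValueError("Unexpected latest payload shape")
    # Extra guard (an assumption, not in the notebook): the record must be an object
    if not isinstance(record, dict):
        raise ValueError("Latest record is not a JSON object")
    return record

sample = {"id": 1, "feature_1": 3.5}
print(unwrap_latest({"data": sample}))
print(unwrap_latest([sample]))
print(unwrap_latest(sample))
```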

In [65]:
def build_input_df(raw: Dict[str, Any], feature_cols: List[str]) -> pd.DataFrame:
    if not feature_cols:
        raise ValueError("FEATURE_COLUMNS is empty.")
    row = {k: raw.get(k, None) for k in feature_cols}
    X = pd.DataFrame([row], columns=feature_cols)
    return X

X_latest = build_input_df(latest_row, FEATURE_COLUMNS)
log_local("Built input DataFrame", shape=X_latest.shape, columns=FEATURE_COLUMNS)

# Replace missing feature values with 0 before prediction
X_latest = X_latest.fillna(0)



[2025-11-02T15:58:54.365460+00:00] Built input DataFrame :: {"shape": [1, 3], "columns": ["feature_1", "feature_2", "feature_3"]}
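
Note that `fillna(0)` only replaces missing values. A non-numeric value such as `'Medium'` in `feature_3` passes through unchanged. The placeholder `DummyRegressor` tolerates this because it ignores its inputs, but most real estimators would likely reject it. The plain-Python sketch below mirrors the select-and-fill step to make that behavior visible; `fill_missing` is a hypothetical helper, not a pandas function.

```python
import math
from typing import Any, Dict, List

def fill_missing(raw: Dict[str, Any], feature_cols: List[str], fill: float = 0.0) -> Dict[str, Any]:
    """Select features in order; replace None/NaN with `fill`, leave other values alone."""
    out = {}
    for col in feature_cols:
        val = raw.get(col)
        is_nan = isinstance(val, float) and math.isnan(val)
        out[col] = fill if val is None or is_nan else val
    return out

row = fill_missing(
    {"id": 1, "feature_1": 3.5, "feature_3": "Medium"},
    ["feature_1", "feature_2", "feature_3"],
)
print(row)  # {'feature_1': 3.5, 'feature_2': 0.0, 'feature_3': 'Medium'}
```

The missing `feature_2` is filled, while the string in `feature_3` still needs a categorical mapping before it reaches a model that uses its features.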


In [59]:
model = load(MODEL_PATH)
log_local("Loaded model", path=MODEL_PATH, model_type=str(type(model)))

result: Dict[str, Any] = {"predicted_at": _ts()}

y_pred = model.predict(X_latest)
pred_value = y_pred[0]
if hasattr(pred_value, "item"):
    pred_value = pred_value.item()
result["prediction"] = pred_value

probability: Any = None
if hasattr(model, "predict_proba"):
    try:
        proba = model.predict_proba(X_latest)
        if getattr(proba, "ndim", 1) == 2 and proba.shape[1] == 2:
            probability = float(proba[0, 1])
        else:
            probability = proba[0].tolist()
    except Exception:
        probability = None

result["probability"] = probability
log_local("Prediction complete", result=result)
result


[2025-11-02T15:45:55.027200+00:00] Loaded model :: {"path": "ml/artifacts/model.joblib", "model_type": "<class 'sklearn.dummy.DummyRegressor'>"}
[2025-11-02T15:45:55.027901+00:00] Prediction complete :: {"result": {"predicted_at": "2025-11-02T15:45:55.027310+00:00", "prediction": 45.666666666666664, "probability": null}}


{'predicted_at': '2025-11-02T15:45:55.027310+00:00',
 'prediction': 45.666666666666664,
 'probability': None}
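
The probability branch above is never exercised here because `DummyRegressor` has no `predict_proba`. The sketch below isolates the same decision rule so it can be checked against a classifier-style output. It uses nested lists as a stand-in for the NumPy array a scikit-learn classifier would return, and it assumes the second column of a two-column output is the positive class; `extract_probability` is a hypothetical name.

```python
from typing import Any, List, Optional

def extract_probability(proba_rows: Optional[List[List[float]]]) -> Any:
    """Binary output: probability of the second class. Otherwise: the full row."""
    if not proba_rows:
        return None
    first = proba_rows[0]
    if len(first) == 2:
        return float(first[1])
    return [float(p) for p in first]

print(extract_probability([[0.2, 0.8]]))       # 0.8
print(extract_probability([[0.1, 0.6, 0.3]]))  # [0.1, 0.6, 0.3]
print(extract_probability(None))               # None
```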

In [61]:

import json, numpy as np, requests
from typing import Dict, Any

# Optional mapping from categorical labels to the numeric codes the model expects
CATEGORY_MAPS: Dict[str, Dict[Any, Any]] = {
    "feature_3": {"Low": 0.0, "Medium": 1.0, "High": 2.0}
}

def to_jsonable(v):
    if v is None: return None
    if isinstance(v, (str, bool, int, float)): return v
    if isinstance(v, (np.bool_,)): return bool(v)
    if isinstance(v, (np.integer,)): return int(v)
    if isinstance(v, (np.floating,)): return float(v)
    if hasattr(v, "item"):
        try: return v.item()
        except Exception: pass
    if isinstance(v, (list, tuple)): return [to_jsonable(x) for x in v]
    if isinstance(v, dict): return {str(k): to_jsonable(vv) for k, vv in v.items()}
    return str(v)

def coerce_inputs_for_model(raw_inputs: Dict[str, Any]) -> Dict[str, Any]:
    clean = {}
    for col in FEATURE_COLUMNS:
        val = raw_inputs.get(col, None)

        if col in CATEGORY_MAPS and isinstance(val, str):
            val = CATEGORY_MAPS[col].get(val, val)

        try:
            if isinstance(val, str):
                # convert numeric-looking strings, leave others as-is
                if val.strip().replace(".","",1).replace("-","",1).isdigit():
                    val = float(val)
        except Exception:
            pass
        clean[col] = val
    return clean

record_id_value = None
for k in CANDIDATE_ID_KEYS:
    if k in latest_row:
        record_id_value = latest_row[k]
        break
record_id_str = None if record_id_value is None else str(record_id_value)

#  Coerce inputs to what the model expects
inputs_clean = coerce_inputs_for_model({k: latest_row.get(k, None) for k in FEATURE_COLUMNS})

#  Build fully JSON-serializable payload
clean_payload = {
    "record_id": record_id_str,                              # <-- string
    "prediction": to_jsonable(result.get("prediction")),
    "probability": to_jsonable(result.get("probability")),
    "predicted_at": str(result.get("predicted_at")),
    "inputs": to_jsonable(inputs_clean),
}

#  POST and print detailed response
url = BASE_URL.rstrip("/") + POST_LOG_ENDPOINT
print("POST", url)
print("Payload:", json.dumps(clean_payload)[:500], "...")
r = requests.post(url, json=clean_payload)
print("Status:", r.status_code)
print("Response:", r.text[:1000])
r.raise_for_status()

resp = r.json()
log_local("Logged prediction", response_preview=str(resp)[:240])
resp


POST https://orderly-unideographic-maire.ngrok-free.dev/api/predictions
Payload: {"record_id": "1", "prediction": 45.666666666666664, "probability": null, "predicted_at": "2025-11-02T15:45:55.027310+00:00", "inputs": {"feature_1": 3.5, "feature_2": 2.8, "feature_3": 1.0}} ...
Status: 200
Response: {"status":"ok","logged":true}
[2025-11-02T15:47:20.064030+00:00] Logged prediction :: {"response_preview": "{'status': 'ok', 'logged': True}"}


{'status': 'ok', 'logged': True}
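
The digit-based check in `coerce_inputs_for_model` also matches strings such as `"1-2"`, and relies on the surrounding `try`/`except` to recover when `float()` then fails. A simpler alternative, sketched below, is to attempt the conversion directly. Be aware that `float()` is more permissive than the original check: it also accepts forms like `"1e3"`, `"nan"`, and `"inf"`. The name `parse_numeric` is hypothetical.

```python
from typing import Any

def parse_numeric(val: Any) -> Any:
    """Return float(val) for numeric-looking strings; otherwise return val unchanged."""
    if not isinstance(val, str):
        return val
    try:
        return float(val.strip())
    except ValueError:
        return val

print(parse_numeric("3.5"))     # 3.5
print(parse_numeric(" -2 "))    # -2.0
print(parse_numeric("1-2"))     # 1-2
print(parse_numeric("Medium"))  # Medium
```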

In [62]:
import sqlite3
import pandas as pd

conn = sqlite3.connect("predictions.db")
logs = pd.read_sql_query(
    "SELECT * FROM prediction_logs ORDER BY id DESC LIMIT 5",
    conn
)
conn.close()

logs  # display the most recent 5 rows



id,record_id,prediction,probability,predicted_at,inputs_json,created_at
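
The API stores `prediction`, `probability`, and `inputs_json` as JSON-encoded text, so rows read back from `prediction_logs` need `json.loads` to recover the original values. The sketch below reproduces that round trip against an in-memory SQLite database with the same schema. The inserted values are copied from the payload logged earlier, except `created_at`, which is an illustrative placeholder.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # allow access to columns by name
conn.execute("""
    CREATE TABLE prediction_logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        record_id TEXT,
        prediction TEXT,
        probability TEXT,
        predicted_at TEXT,
        inputs_json TEXT,
        created_at TEXT
    )
""")
inputs = {"feature_1": 3.5, "feature_2": 2.8, "feature_3": 1.0}
conn.execute(
    "INSERT INTO prediction_logs "
    "(record_id, prediction, probability, predicted_at, inputs_json, created_at) "
    "VALUES (?,?,?,?,?,?)",
    (
        "1",
        json.dumps(45.666666666666664),
        json.dumps(None),                      # stored as the text "null"
        "2025-11-02T15:45:55.027310+00:00",
        json.dumps(inputs),
        "2025-11-02T15:47:20",                 # placeholder timestamp
    ),
)
conn.commit()

row = conn.execute(
    "SELECT * FROM prediction_logs ORDER BY id DESC LIMIT 1"
).fetchone()
decoded = {
    "record_id": row["record_id"],
    "prediction": json.loads(row["prediction"]),
    "probability": json.loads(row["probability"]),
    "inputs": json.loads(row["inputs_json"]),
}
conn.close()
print(decoded)
```

The same decoding step can be applied to the `logs` DataFrame, for example with `logs["inputs_json"].apply(json.loads)`.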
