
# End-to-End Machine Learning Engineer Workflow (Google/Industry Style)

**Date:** 2025-10-14 19:04 UTC  

This notebook is an *advanced*, realistic walkthrough of how a Machine Learning Engineer might work end-to-end: from problem framing through deployment and monitoring. It’s designed to be a reusable template that you can adapt to your project.

> The workflow is largely **tool-agnostic**, but it uses Google Cloud services where relevant (e.g., Vertex AI, BigQuery, GCS). Everything here also maps well to open-source stacks (Kubernetes, MLflow, Feast, etc.).



## 0) Typical Tooling & Resources

**IDEs / Dev Environments**
- Google Colab / Colab Pro
- VS Code (with Remote SSH / Dev Containers), PyCharm
- Cloud Shell Editor (in-browser)
- JupyterLab

**Languages & Libraries**
- Python 3.x, Conda or `pip` venvs
- Data: `pandas`, `numpy`, `pyarrow`, `polars`
- Modeling: `scikit-learn`, `xgboost`, `lightgbm`, `tensorflow`/`keras`, `jax`/`flax`
- Time series: `statsmodels`, `prophet` (or `neuralprophet`), `tsfresh`
- Experiment tracking: `mlflow`, `Weights & Biases` (W&B), Vertex AI Experiments
- Pipelines/Orchestration: **TFX**, **Vertex AI Pipelines**, **Kubeflow Pipelines**, **Apache Airflow**
- Feature store: **Feast** (OSS) or Vertex AI Feature Store
- Model serving: **Vertex AI Endpoints**, **Cloud Run**, **GKE** (Kubernetes), **TF Serving**
- Testing/Quality: `pytest`, `great_expectations`, `pydantic`, `evidently`
- Monitoring: **Vertex AI Model Monitoring**, **Prometheus/Grafana**, **Evidently**, custom logs

**Cloud (Google Cloud Platform)**
- **BigQuery** (data warehouse / SQL analytics)
- **Cloud Storage (GCS)** (data & artifact storage)
- **Dataflow**/**Apache Beam** (stream/batch data pipelines)
- **Pub/Sub** (event streaming)
- **Vertex AI** (training, hyperparameter tuning, pipelines, feature store, endpoints, monitoring)
- **Cloud Build** + **Artifact Registry** (CI/CD, container images)
- **GKE**/**Cloud Run** (containerized services)

**CI/CD & Reproducibility**
- GitHub/GitLab/Cloud Source Repositories
- Cloud Build / GitHub Actions
- Docker, Make, `pyproject.toml` + `tox`
- Infra-as-code: Terraform



## 1) Problem Framing & Success Criteria

**Business objective:** e.g., *Forecast daily demand for Category X to reduce stockouts by 15% and overstock by 10% within the next quarter.*

**ML task type:** Regression (time series); KPI = RMSE (primary), MAE, MAPE/SMAPE, R².
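
As a reference for the KPIs named above, here is a minimal pure-Python sketch of RMSE, MAE, MAPE, and SMAPE. The function names are illustrative. Two choices are assumptions you may want to change: MAPE skips points whose actual value is zero (it is undefined there), and SMAPE uses the `2|y - ŷ| / (|y| + |ŷ|)` form, which is only one of several definitions in use.

```python
import math

def rmse(y_true, y_pred):
    """Root mean squared error: penalizes large misses more than MAE."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))

def mae(y_true, y_pred):
    """Mean absolute error, in the same units as the target."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

def mape(y_true, y_pred):
    """Mean absolute percentage error; zero actuals are skipped (assumption)."""
    pairs = [(t, p) for t, p in zip(y_true, y_pred) if t != 0]
    return 100.0 * sum(abs((t - p) / t) for t, p in pairs) / len(pairs)

def smape(y_true, y_pred):
    """Symmetric MAPE; a 0/0 term is counted as zero error (assumption)."""
    terms = []
    for t, p in zip(y_true, y_pred):
        denom = abs(t) + abs(p)
        terms.append(0.0 if denom == 0 else 2.0 * abs(t - p) / denom)
    return 100.0 * sum(terms) / len(terms)

# Toy values chosen for easy hand-checking, not real demand data.
actual = [100.0, 200.0, 400.0]
forecast = [110.0, 180.0, 400.0]
print(rmse(actual, forecast), mae(actual, forecast),
      mape(actual, forecast), smape(actual, forecast))
```

RMSE is a reasonable primary KPI when large misses (stockouts) are disproportionately costly; MAE and the percentage metrics are easier to explain to business stakeholders.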

**Constraints & SLAs**
- Latency: < 200ms P95 for online predictions
- Throughput: 200 RPS
- Model refresh cadence: daily retrain
- Fairness/ethics: ensure no harmful bias in allocation decisions
- Privacy/compliance: PII handling, GDPR
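
The latency and throughput targets above can be checked mechanically against load-test output. The sketch below is one way to do that with the standard library; the helper names, the `inclusive` percentile method, and the synthetic latency samples are all assumptions for illustration.

```python
import statistics

def p95(samples_ms):
    """95th percentile of latency samples, in milliseconds."""
    # quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile.
    return statistics.quantiles(samples_ms, n=100, method="inclusive")[94]

def check_sla(samples_ms, duration_s, p95_limit_ms=200.0, min_rps=200.0):
    """Compare observed P95 latency and throughput with the stated targets."""
    observed_p95 = p95(samples_ms)
    observed_rps = len(samples_ms) / duration_s
    return {
        "p95_ms": observed_p95,
        "rps": observed_rps,
        "ok": observed_p95 < p95_limit_ms and observed_rps >= min_rps,
    }

# Hypothetical load test: 1000 requests completed in 4 seconds,
# with latencies spread evenly between 50 ms and 149 ms.
latencies = [50.0 + (i % 100) for i in range(1000)]
report = check_sla(latencies, duration_s=4.0)
print(report)
```

Percentile definitions differ slightly between tools, so it is worth confirming that your load-testing tool and your dashboards compute P95 the same way before comparing numbers.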



## 2) Repository Layout (Recommended)

```
repo/
├─ notebooks/
├─ src/
│  ├─ data/
│  │  ├─ ingest.py
│  │  ├─ validate.py
│  ├─ features/
│  │  ├─ build.py
│  ├─ models/
│  │  ├─ train.py
│  │  ├─ evaluate.py
│  │  ├─ infer.py
│  ├─ serving/
│  │  ├─ service.py
│  ├─ utils/
│  │  ├─ io.py
│  │  ├─ metrics.py
│  └─ config.py
├─ tests/
│  ├─ test_data_validations.py
│  ├─ test_feature_pipeline.py
│  ├─ test_train_eval.py
├─ Dockerfile
├─ pyproject.toml / setup.py
├─ requirements.txt
├─ Makefile
└─ README.md
```



## 3) Data Ingestion & Validation

Below we simulate ingestion with a synthetic daily time series. In practice you might:
- Query **BigQuery** using `pandas-gbq` or the BigQuery Python client.
- Load from **GCS** (`gs://bucket/path`) via `gcsfs` or `google-cloud-storage`.
- Stream from **Pub/Sub** or process via **Dataflow/Beam**.

Always add **schema/data quality checks** (e.g., `great_expectations`, `pydantic`).


In [None]:

# (Simulated) Data Ingestion
import pandas as pd
import numpy as np

# Synthetic example: time series with covariates
rng = pd.date_range("2022-01-01", periods=600, freq="D")
np.random.seed(42)
df = pd.DataFrame({
    "date": rng,
    "feature_search_interest": np.clip(np.sin(np.linspace(0, 30, len(rng))) * 50 + 50 + np.random.randn(len(rng))*5, 0, None),
    "feature_promo": np.random.binomial(1, 0.1, size=len(rng)),
    "feature_price": np.clip(100 + np.random.randn(len(rng))*3, 0, None)
})
# target depends on features with noise
df["target_demand"] = (
    0.6*df["feature_search_interest"] -
    0.3*df["feature_price"] +
    20*df["feature_promo"] +
    np.random.randn(len(rng))*8 +
    30
).clip(0)

df.head()


In [None]:

# Basic schema/quality checks
assert df.isna().sum().sum() == 0, "Found missing values"
assert (df["date"].diff().dropna() > pd.Timedelta(0)).all(), "Dates must be increasing"
print("Basic validations passed.")
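
The two assertions above can grow into a reusable validator that reports every problem instead of stopping at the first. Below is a minimal, dependency-free sketch of the kind of rules you might later express in `great_expectations` or `pydantic`. The function name `validate_rows` and the specific rules (binary promo flag, non-negative price) are assumptions for illustration.

```python
from datetime import date, timedelta

REQUIRED = {"date", "feature_search_interest", "feature_promo",
            "feature_price", "target_demand"}

def validate_rows(rows):
    """Return a list of problems; an empty list means the batch passed."""
    problems = []
    prev_date = None
    for i, row in enumerate(rows):
        missing = REQUIRED - row.keys()
        if missing:
            problems.append(f"row {i}: missing columns {sorted(missing)}")
            continue
        if any(row[col] is None for col in REQUIRED):
            problems.append(f"row {i}: null value")
            continue
        if row["feature_promo"] not in (0, 1):
            problems.append(f"row {i}: feature_promo must be 0 or 1")
        if row["feature_price"] < 0:
            problems.append(f"row {i}: negative price")
        if prev_date is not None and row["date"] <= prev_date:
            problems.append(f"row {i}: date not strictly increasing")
        prev_date = row["date"]
    return problems

start = date(2022, 1, 1)
good = [
    {"date": start + timedelta(days=d), "feature_search_interest": 50.0,
     "feature_promo": 0, "feature_price": 100.0, "target_demand": 30.0}
    for d in range(3)
]
# Second row repeats the first date and carries an invalid promo flag.
bad = [dict(good[0]), dict(good[0], feature_promo=2)]

print(validate_rows(good))
print(validate_rows(bad))
```

Returning a list of problems rather than raising on the first one makes it easier to log a full data-quality report for each ingested batch.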



## 4) Time-Aware Split (Train/Val/Test)

For time series:
- Train: oldest 70%
- Val: next 15%
- Test: most recent 15%

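Splitting chronologically, rather than randomly, keeps future observations out of the training set and so avoids temporal leakage.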


In [None]:

# Time-aware split
df = df.sort_values("date").reset_index(drop=True)

n = len(df)
train_end = int(n*0.70)
val_end = int(n*0.85)

train = df.iloc[:train_end]
val   = df.iloc[train_end:val_end]
test  = df.iloc[val_end:]

features = ["feature_search_interest", "feature_promo", "feature_price"]
target = "target_demand"

X_train, y_train = train[features], train[target]
X_val, y_val     = val[features], val[target]
X_test, y_test   = test[features], test[target]

len(train), len(val), len(test)
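
A single chronological split gives only one validation estimate. A walk-forward (expanding-window) scheme produces several while still never training on the future. The generator below is a minimal sketch; `walk_forward_splits` and its parameters are hypothetical names, and scikit-learn's `TimeSeriesSplit` offers similar behavior if you prefer a library implementation.

```python
def walk_forward_splits(n_samples, n_folds, min_train):
    """Yield (train_range, val_range) pairs with an expanding training window."""
    fold_size = (n_samples - min_train) // n_folds
    if fold_size < 1:
        raise ValueError("not enough samples for the requested number of folds")
    for k in range(n_folds):
        train_end = min_train + k * fold_size
        val_end = train_end + fold_size
        # Validation rows always come strictly after every training row.
        yield range(0, train_end), range(train_end, val_end)

folds = list(walk_forward_splits(n_samples=600, n_folds=3, min_train=300))
for train_idx, val_idx in folds:
    print(f"train [0, {train_idx.stop})  val [{val_idx.start}, {val_idx.stop})")
```

The ranges can be passed to `df.iloc[...]` once the frame is sorted by date, as it is in the cell above.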



## 5) Feature Engineering Pipeline

Use `ColumnTransformer` and `Pipeline` for reproducibility.  
For sequence models (e.g., LSTMs built in TensorFlow or JAX), build sliding windows over the series; for tabular models, keep the features as-is.


In [None]:

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer

numeric = ["feature_search_interest", "feature_price"]
categorical = ["feature_promo"]

preprocess = ColumnTransformer([
    ("num", Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler())
    ]), numeric),
    ("cat", OneHotEncoder(handle_unknown="ignore"), categorical)
])

preprocess
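
For the sequence-model case mentioned in this section, "building windows" means turning a series into fixed-length input slices paired with a future target. The helper below is a minimal sketch using plain lists; `make_windows`, `window`, and `horizon` are illustrative names, not part of any library.

```python
def make_windows(series, window, horizon=1):
    """Turn a 1-D sequence into (inputs, targets) for supervised sequence learning.

    Each input is `window` consecutive values; its target is the value
    `horizon` steps after the end of that window.
    """
    inputs, targets = [], []
    for start in range(len(series) - window - horizon + 1):
        inputs.append(list(series[start:start + window]))
        targets.append(series[start + window + horizon - 1])
    return inputs, targets

# Toy series for hand-checking, not real demand data.
demand = [10, 12, 13, 15, 18, 21]
X_win, y_win = make_windows(demand, window=3)
for x, y in zip(X_win, y_win):
    print(x, "->", y)
```

To stay consistent with the time-aware split, build windows separately within the train, validation, and test segments so that no window straddles a split boundary.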



## 6) Baseline Model (XGBoost) & Metrics

Start with a strong baseline (often gradient boosting). This is your anchor before deep models.


In [None]:

from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.pipeline import Pipeline
import numpy as np

try:
    from xgboost import XGBRegressor
    booster = XGBRegressor(
        n_estimators=400, max_depth=6, learning_rate=0.05,
        subsample=0.9, colsample_bytree=0.9, random_state=42
    )
except ImportError:
    # Fall back to RandomForest if xgboost is not installed
    from sklearn.ensemble import RandomForestRegressor
    booster = RandomForestRegressor(n_estimators=500, random_state=42)

pipe = Pipeline([
    ("preprocess", preprocess),
    ("model", booster)
])

pipe.fit(X_train, y_train)
pred_val = pipe.predict(X_val)

# np.sqrt(MSE) works across scikit-learn versions; the `squared` argument was removed in 1.6
rmse = np.sqrt(mean_squared_error(y_val, pred_val))
mae  = mean_absolute_error(y_val, pred_val)
r2   = r2_score(y_val, pred_val)

print(f"Validation RMSE: {rmse:.3f}")
print(f"Validation MAE : {mae:.3f}")
print(f"Validation R²  : {r2:.3f}")
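
Before crediting the gradient-boosted model, it can help to check that it beats a naive forecast. One common choice for daily data is a seasonal-naive forecast that repeats the value from the same weekday one week earlier. The sketch below assumes a 7-day season; the function name and toy history are illustrative only.

```python
def seasonal_naive_forecast(history, horizon, season=7):
    """Forecast each future step with the value observed `season` steps earlier."""
    if len(history) < season:
        raise ValueError("history must cover at least one full season")
    extended = list(history)
    forecast = []
    for _ in range(horizon):
        value = extended[-season]  # same position in the previous season
        forecast.append(value)
        extended.append(value)     # lets the forecast roll past one season
    return forecast

# Two identical toy weeks, so the pattern is easy to verify by eye.
history = [1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7]
print(seasonal_naive_forecast(history, horizon=9))
```

If the tuned model cannot beat this on the validation window, the features or the framing probably need attention before any deep model is worth trying.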



## 7) Hyperparameter Tuning (RandomizedSearchCV)

In production you might use **Vertex AI Vizier**, **KerasTuner**, **Optuna**, or **Ray Tune**.  
Here we show a lightweight scikit-learn randomized search.


In [None]:

from sklearn.model_selection import RandomizedSearchCV, TimeSeriesSplit
from scipy.stats import randint

param_distributions = {
    # Both names are valid for XGBRegressor and for the RandomForest fallback
    "model__n_estimators": randint(200, 800),
    "model__max_depth": randint(3, 10),
}

search = RandomizedSearchCV(
    pipe, param_distributions=param_distributions,
    n_iter=8, scoring="neg_root_mean_squared_error",
    # TimeSeriesSplit keeps every validation fold later in time than its training fold
    refit=True, random_state=42, n_jobs=-1, cv=TimeSeriesSplit(n_splits=3)
)
search.fit(X_train, y_train)
best_model = search.best_estimator_

pred_val_best = best_model.predict(X_val)
rmse_best = np.sqrt(mean_squared_error(y_val, pred_val_best))
mae_best  = mean_absolute_error(y_val, pred_val_best)
r2_best   = r2_score(y_val, pred_val_best)

print("Best Params:", search.best_params_)
print(f"Validation RMSE (best): {rmse_best:.3f}")
print(f"Validation MAE  (best): {mae_best:.3f}")
print(f"Validation R²   (best): {r2_best:.3f}")



## 8) Deep Model (TensorFlow) — Optional

For sequences, you might build an LSTM/Temporal CNN/Transformer. Below is a **template**; adjust windowing to your use case.


In [None]:

# Template only — may require TensorFlow installed in your environment.
try:
    import tensorflow as tf
    from tensorflow.keras import layers, models

    # Build a simple MLP on tabular features as an example (replace with proper sequence windows for LSTM)
    tf_model = models.Sequential([
        layers.Input(shape=(X_train.shape[1],)),
        layers.Dense(64, activation="relu"),
        layers.Dropout(0.2),
        layers.Dense(32, activation="relu"),
        layers.Dense(1)  # linear output for regression
    ])
    tf_model.compile(optimizer="adam", loss="mse", metrics=["mae"])

    # Use preprocessed features to feed the TF model (fit the preprocessor first)
    X_train_p = preprocess.fit_transform(X_train)
    X_val_p = preprocess.transform(X_val)

    history = tf_model.fit(
        X_train_p, y_train,
        validation_data=(X_val_p, y_val),
        epochs=30, batch_size=32, verbose=0
    )
    print("TF model trained (template).")
except ImportError as e:
    print("TensorFlow not available in this environment; skipping deep model. Error:", e)



## 9) Final Evaluation on Unseen Test Data

Select the best model by validation performance, then evaluate once on the **holdout test** set.


In [None]:

import matplotlib.pyplot as plt

pred_test = best_model.predict(X_test)
rmse_test = np.sqrt(mean_squared_error(y_test, pred_test))
mae_test  = mean_absolute_error(y_test, pred_test)
r2_test   = r2_score(y_test, pred_test)

print(f"Test RMSE: {rmse_test:.3f}")
print(f"Test MAE : {mae_test:.3f}")
print(f"Test R²  : {r2_test:.3f}")

# Residual plot
resid = y_test - pred_test
plt.figure()
plt.scatter(pred_test, resid, alpha=0.6)
plt.axhline(0, linestyle="--")
plt.xlabel("Predicted")
plt.ylabel("Residuals")
plt.title("Residuals vs Predicted (Test)")
plt.show()

# Prediction vs Actual over time
plt.figure()
plt.plot(test["date"], y_test.values, label="Actual")
plt.plot(test["date"], pred_test, label="Predicted")
plt.title("Actual vs Predicted (Test)")
plt.xlabel("Date")
plt.ylabel("Target")
plt.legend()
plt.show()



## 10) Experiment Tracking

Options:
- **MLflow**: model registry, experiment UI
- **Vertex AI Experiments**: native on GCP
- **Weights & Biases (W&B)**

> In production at Google scale, you would log: datasets (URIs), code commit, params, metrics, artifacts, and environment hashes.


In [None]:

# Example MLflow usage (requires mlflow installed & a tracking server)
# import mlflow
# mlflow.set_experiment("demand_forecasting")
# with mlflow.start_run():
#     mlflow.log_params(search.best_params_)
#     mlflow.log_metric("rmse_val", rmse_best)
#     mlflow.log_metric("mae_val", mae_best)
#     mlflow.log_metric("r2_val", r2_best)
#     mlflow.sklearn.log_model(best_model, "model")
#     mlflow.log_artifact("path/to/data_schema.json")
# print("Logged to MLflow.")
print("MLflow example commented out — enable if available.")
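
Whatever tracker you choose, the items listed above (dataset URIs, code commit, params, metrics, environment hashes) can be gathered into one record first. The sketch below uses only the standard library; `build_run_record`, the URI, the commit string, and the metric value are placeholders for illustration, not real results.

```python
import hashlib
import json
import platform

def build_run_record(dataset_uri, commit, params, metrics, requirements_text):
    """Assemble the metadata worth logging for one training run."""
    # Hashing the pinned requirements gives a cheap fingerprint of the environment.
    env_hash = hashlib.sha256(requirements_text.encode("utf-8")).hexdigest()[:12]
    return {
        "dataset_uri": dataset_uri,
        "code_commit": commit,
        "params": params,
        "metrics": metrics,
        "environment": {
            "python": platform.python_version(),
            "requirements_sha256_12": env_hash,
        },
    }

record = build_run_record(
    dataset_uri="gs://your-bucket/data/demand.parquet",  # placeholder URI
    commit="abc1234",                                     # placeholder commit
    params={"model__max_depth": 6},
    metrics={"rmse_val": 0.0},                            # placeholder value
    requirements_text="scikit-learn\nxgboost\n",
)
print(json.dumps(record, indent=2, sort_keys=True))
```

A dictionary like this maps directly onto the `log_params` / `log_metric` calls in the commented MLflow example, and it can also be written next to the model as a JSON file.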



## 11) Persist Artifacts

Save the model, preprocessors, and metadata. Store in **GCS** in production (`gs://...`) and register in a model registry.


In [None]:

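import os, joblib, json, pathlib
from datetime import datetime, timezone

art_dir = pathlib.Path("artifacts")
art_dir.mkdir(exist_ok=True)

# best_model is the full Pipeline, so it already contains its fitted preprocessor;
# the standalone preprocessor is saved for inspection or reuse.
joblib.dump(best_model, art_dir / "model.joblib")
joblib.dump(preprocess, art_dir / "preprocess.joblib")
meta = {
    # Record the actual save time rather than a hard-coded timestamp
    "created_utc": datetime.now(timezone.utc).isoformat(),
    "features": ["feature_search_interest", "feature_promo", "feature_price"]
}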
with open(art_dir / "metadata.json", "w") as f:
    json.dump(meta, f, indent=2)

sorted(os.listdir(art_dir))
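
When artifacts are copied to GCS and later pulled into a serving container, a checksum manifest is a cheap way to confirm that what is served is what was trained. The sketch below is one possible approach using only the standard library; the `MANIFEST.json` file name and the helper names are assumptions, and the demo uses stand-in bytes rather than a real model.

```python
import hashlib
import json
import pathlib
import tempfile

MANIFEST_NAME = "MANIFEST.json"  # hypothetical file name

def sha256_of(path, chunk_size=1 << 20):
    """Stream a file through SHA-256 so large models need not fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def write_manifest(directory):
    """Record a checksum for every file in the artifact directory."""
    directory = pathlib.Path(directory)
    manifest = {
        p.name: sha256_of(p)
        for p in sorted(directory.iterdir())
        if p.is_file() and p.name != MANIFEST_NAME
    }
    (directory / MANIFEST_NAME).write_text(json.dumps(manifest, indent=2))
    return manifest

def verify_manifest(directory):
    """Return True only if every listed file still matches its checksum."""
    directory = pathlib.Path(directory)
    expected = json.loads((directory / MANIFEST_NAME).read_text())
    return all(sha256_of(directory / name) == digest
               for name, digest in expected.items())

with tempfile.TemporaryDirectory() as tmp:
    model_path = pathlib.Path(tmp) / "model.joblib"
    model_path.write_bytes(b"stand-in model bytes")
    manifest = write_manifest(tmp)
    intact = verify_manifest(tmp)
    model_path.write_bytes(b"tampered")  # simulate a corrupted upload
    still_valid = verify_manifest(tmp)

print(intact, still_valid)
```

Running the verification at container start-up turns a silent artifact mismatch into an immediate, visible failure.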



## 12) Serving API (FastAPI) — Local or Cloud Run/GKE

You can containerize this and deploy to **Cloud Run** or **GKE**. For Vertex AI, use **Endpoints**.


In [None]:

# Example FastAPI app (save as src/serving/service.py in a real repo)
fastapi_example = r'''
from fastapi import FastAPI
import joblib
import pandas as pd

app = FastAPI()
# The saved model is the full Pipeline (preprocessing + estimator),
# so raw feature values are passed to it directly.
model = joblib.load("artifacts/model.joblib")

@app.post("/predict")
def predict(payload: dict):
    X = pd.DataFrame([payload])
    y = model.predict(X)
    return {"prediction": float(y[0])}
'''
print(fastapi_example)
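
The example service accepts any `dict`, so a malformed request would only fail deep inside the model. A small validation step in front of `predict` gives clearer errors. The sketch below is dependency-free and purely illustrative; `validate_payload` and `EXPECTED_FEATURES` are hypothetical names, and in a FastAPI service you would more likely declare a `pydantic` model for the request body instead.

```python
EXPECTED_FEATURES = {
    "feature_search_interest": float,
    "feature_promo": int,
    "feature_price": float,
}

def validate_payload(payload):
    """Return (clean_row, errors); clean_row is None when errors is non-empty."""
    errors = []
    clean = {}
    unknown = set(payload) - set(EXPECTED_FEATURES)
    if unknown:
        errors.append(f"unexpected fields: {sorted(unknown)}")
    for name, caster in EXPECTED_FEATURES.items():
        if name not in payload:
            errors.append(f"missing field: {name}")
            continue
        try:
            clean[name] = caster(payload[name])  # coerce, e.g. "99.5" -> 99.5
        except (TypeError, ValueError):
            errors.append(f"bad value for {name}: {payload[name]!r}")
    if not errors and clean["feature_promo"] not in (0, 1):
        errors.append("feature_promo must be 0 or 1")
    return (clean if not errors else None), errors

ok_row, ok_errors = validate_payload(
    {"feature_search_interest": 55, "feature_promo": 1, "feature_price": "99.5"}
)
bad_row, bad_errors = validate_payload({"feature_promo": 3})
print(ok_row, ok_errors)
print(bad_row, bad_errors)
```

Rejecting bad requests up front also keeps out-of-schema inputs from polluting the prediction logs that monitoring later depends on.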



## 13) Dockerfile (for Cloud Run/GKE)

Build and push with **Cloud Build** or GitHub Actions. Example:


In [None]:

dockerfile = r'''
FROM python:3.11-slim
WORKDIR /app
COPY artifacts/ artifacts/
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/serving/service.py service.py
EXPOSE 8080
CMD ["python", "-m", "uvicorn", "service:app", "--host", "0.0.0.0", "--port", "8080"]
'''
print(dockerfile)



## 14) Vertex AI Deployment (High-Level)

- Upload model artifact to GCS.
- Create a **Model** resource in Vertex AI, point to artifact.
- Deploy to an **Endpoint** with autoscaling.
- Optionally enable **Model Monitoring** (drift/anomaly).

Example (Python, unexecuted here):
```python
from google.cloud import aiplatform

aiplatform.init(project="YOUR_PROJECT", location="us-central1")
model = aiplatform.Model.upload(
    display_name="demand-forecast-model",
    artifact_uri="gs://your-bucket/artifacts/",
    serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-4:latest",
)
endpoint = model.deploy(
    machine_type="n1-standard-2",
    traffic_split={"0": 100},
)
```



## 15) Testing Strategy

- **Unit tests** for data validators, feature pipelines, custom metrics (`pytest`).
- **Integration tests**: end-to-end pipeline on a small sample.
- **Shadow testing**: send real traffic to the new model but don’t serve results.
- **Canary release / A/B tests**: split % of users to new model; compare KPIs.
- **Load tests**: ensure latency & throughput SLAs.
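
For the canary and A/B items above, users are usually assigned to a variant deterministically so that the same user always sees the same model. The sketch below shows one common approach, hashing the user ID into 100 buckets; the function name, the salt value, and the user ID format are assumptions for illustration.

```python
import hashlib

def assign_variant(user_id, canary_percent, salt="demand-model-v2"):
    """Deterministically route a user to 'canary' or 'stable'."""
    digest = hashlib.sha256(f"{salt}:{user_id}".encode("utf-8")).hexdigest()
    bucket = int(digest[:8], 16) % 100  # stable bucket in [0, 100)
    return "canary" if bucket < canary_percent else "stable"

# Simulate 10,000 hypothetical users with a 10% canary.
assignments = [assign_variant(f"user-{i}", canary_percent=10) for i in range(10_000)]
share = assignments.count("canary") / len(assignments)
print(f"canary share: {share:.3f}")
```

Changing the salt for each experiment reshuffles the buckets, which avoids always exposing the same users to new models.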


In [None]:

# Tiny example "unit test" (in-notebook) for metrics
def rmse(y_true, y_pred):
    from sklearn.metrics import mean_squared_error
    # np.sqrt(MSE) avoids the `squared` argument, which was removed in scikit-learn 1.6
    return float(np.sqrt(mean_squared_error(y_true, y_pred)))

# Sanity check
_pred = np.array([1.0, 2.0, 3.0])
_true = np.array([1.0, 2.0, 4.0])
assert abs(rmse(_true, _pred) - (1/3)**0.5) < 1e-6
print("Metric unit test passed.")



## 16) Post-Deployment Monitoring

- **Data drift**: compare live feature distributions to training (Evidently, Vertex Monitoring).
- **Performance drift**: monitor RMSE/MAE on delayed ground truth.
- **Logging/Tracing**: request logs, latency, error rates (Cloud Logging, Prometheus).
- **Alerting**: SLO breaches trigger rollbacks or retraining.
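
As an illustration of the data-drift check, the sketch below computes a Population Stability Index (PSI) between a training-time reference sample and a live sample for one numeric feature. It is a simplified, dependency-free version of what tools such as Evidently provide; the equal-width binning, the `eps` floor, and the synthetic samples are assumptions. A PSI above roughly 0.2 to 0.25 is often treated as a rule-of-thumb sign of meaningful shift, not a hard threshold.

```python
import math

def psi(expected, actual, n_bins=10, eps=1e-6):
    """Population Stability Index between a reference and a live sample."""
    lo, hi = min(expected), max(expected)
    span = hi - lo
    width = span / n_bins if span > 0 else 1.0

    def proportions(values):
        counts = [0] * n_bins
        for v in values:
            idx = int((v - lo) / width)
            # Clamp values outside the reference range into the edge bins.
            counts[min(max(idx, 0), n_bins - 1)] += 1
        # Floor at eps so empty bins do not produce log(0).
        return [max(c / len(values), eps) for c in counts]

    e, a = proportions(expected), proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))

# Synthetic samples: an evenly spread reference and a copy shifted upward.
reference = [i / 100 for i in range(1000)]
shifted = [x + 3.0 for x in reference]

print(f"PSI (same data): {psi(reference, list(reference)):.4f}")
print(f"PSI (shifted)  : {psi(reference, shifted):.4f}")
```

In a monitoring job this would run per feature on a schedule, with the result exported as a metric that alerting rules can watch.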

**Retraining loop**: schedule pipelines (Vertex AI Pipelines/Airflow) to refresh data → features → train → evaluate → deploy if better.
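
The "deploy if better" step in the loop above needs an explicit rule, otherwise noise alone can trigger a promotion. One simple option, sketched below, is to require a minimum relative RMSE improvement over the production model on the same recent holdout. The function name and the 2% default margin are assumptions, not recommendations.

```python
def should_promote(candidate_rmse, production_rmse, min_relative_gain=0.02):
    """Promote only if the candidate beats production RMSE by a relative margin."""
    if production_rmse <= 0:
        raise ValueError("production_rmse must be positive")
    gain = (production_rmse - candidate_rmse) / production_rmse
    return gain >= min_relative_gain

# Hypothetical RMSE values, used only to exercise the rule.
print(should_promote(candidate_rmse=9.0, production_rmse=10.0))   # clear improvement
print(should_promote(candidate_rmse=9.9, production_rmse=10.0))   # within the margin
print(should_promote(candidate_rmse=10.5, production_rmse=10.0))  # regression
```

Both models should be scored on identical data; comparing a fresh candidate score with a stale production score would make the gate meaningless.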



---

### What to customize next
- Replace the synthetic data with the results of your BigQuery query.
- Swap baseline model for LSTM/Transformer with proper windowing.
- Wire up MLflow or Vertex AI Experiments for full tracking.
- Containerize and deploy to **Cloud Run** or **Vertex AI Endpoints**.
- Add CI/CD (Cloud Build) + automated tests.

> Save this notebook as your project template and iterate! 🚀
