# Feature Store, Experiment Tracking, Auth, Secrets & Observability Demo
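This notebook demonstrates how to integrate Feast (feature store), MLflow (experiment tracking), Keycloak (OIDC authentication), Vault (secrets management), Prometheus/Grafana (metrics and dashboards), OpenTelemetry (tracing and metrics), and a mock Arize drift log. Most cells expect the corresponding service to be reachable at the endpoints configured in the first cell (localhost defaults).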

In [None]:
# 1. Set Environment Variables & Configuration Paths
import os, json, datetime, socket

def getenv(k, default):
    """Resolve environment variable ``k``, falling back to ``default``.

    The resolved value is written back into ``os.environ`` so that anything
    reading the environment later (libraries, child processes) sees exactly
    the value this notebook uses.

    Returns the resolved string value.
    """
    resolved = os.environ.get(k, default)
    os.environ[k] = resolved
    return resolved

# --- Feast feature store ---------------------------------------------------
FEAST_HOME = getenv("FEAST_HOME", os.path.abspath("./services/feast"))
FEAST_PROJECT = getenv("FEAST_PROJECT", "udo")
FEAST_REDIS_HOST = getenv("FEAST_REDIS_HOST", "redis")
FEAST_REDIS_PORT = int(getenv("FEAST_REDIS_PORT", "6379"))  # env values are strings

# --- MLflow tracking -------------------------------------------------------
MLFLOW_TRACKING_URI = getenv("MLFLOW_TRACKING_URI", "http://localhost:5000")

# --- Keycloak (OIDC); credential defaults are for local development only ---
KEYCLOAK_URL = getenv("KEYCLOAK_URL", "http://localhost:8080")
KEYCLOAK_REALM = getenv("KEYCLOAK_REALM", "master")
KEYCLOAK_CLIENT_ID = getenv("KEYCLOAK_CLIENT_ID", "admin-cli")
KEYCLOAK_USERNAME = getenv("KEYCLOAK_USERNAME", "admin")
KEYCLOAK_PASSWORD = getenv("KEYCLOAK_PASSWORD", "admin")  # dev only

# --- Vault; the token default is a development-style value ------------------
VAULT_ADDR = getenv("VAULT_ADDR", "http://localhost:8200")
VAULT_TOKEN = getenv("VAULT_TOKEN", "root")

# --- OpenTelemetry ---------------------------------------------------------
OTEL_EXPORTER_OTLP_ENDPOINT = getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")

# Summarise only endpoints/identifiers; passwords and tokens are left out.
print("Config Summary", json.dumps(dict(
    FEAST_HOME=FEAST_HOME,
    FEAST_PROJECT=FEAST_PROJECT,
    MLFLOW_TRACKING_URI=MLFLOW_TRACKING_URI,
    KEYCLOAK_URL=KEYCLOAK_URL,
    VAULT_ADDR=VAULT_ADDR,
    OTEL_EXPORTER_OTLP_ENDPOINT=OTEL_EXPORTER_OTLP_ENDPOINT,
), indent=2))

In [None]:
# 2. Install & Import Dependencies (Feast, MLflow, OIDC, Vault, OTel)
import sys, subprocess
import importlib.util

# pip distribution name -> module to probe for presence. Distribution and
# import names often differ (scikit-learn/sklearn, python-keycloak/keycloak,
# prometheus-client/prometheus_client); probing `pkg.split('-')[0]` looked for
# modules such as `scikit`, `python` and `prometheus`, which these packages do
# not provide, so they were reinstalled on every run.
REQ_MODULES = {
    'feast': 'feast',
    'mlflow': 'mlflow',
    'pandas': 'pandas',
    'scikit-learn': 'sklearn',
    'python-keycloak': 'keycloak',
    'hvac': 'hvac',
    'opentelemetry-api': 'opentelemetry.trace',
    'opentelemetry-sdk': 'opentelemetry.sdk',
    'opentelemetry-instrumentation-fastapi': 'opentelemetry.instrumentation.fastapi',
    'prometheus-client': 'prometheus_client',
    'requests': 'requests',
    # Used later in the notebook (confusion-matrix plot, diagnostics cell).
    'matplotlib': 'matplotlib',
    'psutil': 'psutil',
}
REQS = list(REQ_MODULES)


def _module_available(module_name):
    """Return True if ``module_name`` can be located without executing it."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # find_spec raises when a parent package of a dotted name is missing.
        return False


for pkg, module_name in REQ_MODULES.items():
    if not _module_available(module_name):
        print(f'Installing {pkg}...')
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', pkg])
# Make packages installed during this session visible to the import system.
importlib.invalidate_caches()

import pandas as pd, mlflow, feast, requests, hvac
from keycloak import KeycloakOpenID
from feast import FeatureStore
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, accuracy_score, confusion_matrix
# Report the versions of the core libraries in the order feast, mlflow, pandas.
print('Versions:', {
    label: module.__version__
    for label, module in (('feast', feast), ('mlflow', mlflow), ('pandas', pd))
})

In [None]:
# 3. Define Feast Entities and Feature Views (file-based source)
from pathlib import Path

# The feature repo lives under FEAST_HOME; create it on a fresh checkout.
feature_repo_dir = Path(FEAST_HOME)
feature_repo_dir.mkdir(parents=True, exist_ok=True)

# Feature definitions: a `product_id` entity and a `product_metrics` view
# (roi, revenue, cost) backed by a FileSource.
# NOTE(review): nothing in this notebook creates samples/products_metrics.csv,
# and the source declares timestamp_field=None while the Feast step below
# calls materialize() over a time window -- confirm the installed Feast
# version accepts a CSV FileSource without an event timestamp.
repo_features = feature_repo_dir / 'features.py'
repo_features.write_text("""
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64
product = Entity(name='product_id', join_keys=['product_id'])
metrics_source = FileSource(path='samples/products_metrics.csv', timestamp_field=None)
product_metrics = FeatureView(
    name='product_metrics',
    entities=[product],
    ttl=None,
    schema=[
        Field(name='roi', dtype=Float32),
        Field(name='revenue', dtype=Int64),
        Field(name='cost', dtype=Int64),
    ],
    online=True,
    source=metrics_source,
)
""")

# Repo config: local provider, Redis online store, file offline store, and a
# registry file stored alongside the definitions.
feature_store_yaml = feature_repo_dir / 'feature_store.yaml'
feature_store_yaml.write_text(f"""
project: {FEAST_PROJECT}
registry: {feature_repo_dir / 'registry.db'}
provider: local
online_store:
  type: redis
  connection_string: {FEAST_REDIS_HOST}:{FEAST_REDIS_PORT}
offline_store:
  type: file
entity_key_serialization_version: 2
""")
print('Feature repo files written:', repo_features, feature_store_yaml)

# 4. Create / Update Feast Registry & Materialize to Online Store
import datetime as _dt
import subprocess
from feast import FeatureStore as FS

store = None
try:
    # Register the objects in features.py via the Feast CLI, run from inside
    # the repo so it picks up feature_store.yaml. feast.repo_operations does
    # not expose an `apply` function to import, so the CLI is used instead.
    subprocess.run(['feast', 'apply'], cwd=str(feature_repo_dir), check=True)
    # Open the store after apply so it reflects the freshly written registry.
    store = FS(str(feature_repo_dir))
    # Materialize the last 24h. A private datetime alias is used so this step
    # does not rebind the notebook-wide `datetime` name used by later cells.
    end = _dt.datetime.now(_dt.timezone.utc)
    start = end - _dt.timedelta(days=1)
    store.materialize(start, end)
    print('Materialization complete')
except Exception as e:
    # Best effort: the demo continues if Feast, its CLI, or Redis is unavailable.
    print('Feast apply/materialize error:', e)

# 5. Quick Feature Retrieval Test from Feast Online Store
try:
    fs = FS(str(feature_repo_dir))
    # Look up two product_metrics features for product_id=1.
    online_response = fs.get_online_features(
        ["product_metrics:roi", "product_metrics:revenue"],
        [{"product_id": 1}],
    )
    feats = online_response.to_dict()
    print('Retrieved features:', feats)
except Exception as e:
    # Reported, not raised: the online store may simply not be running.
    print('Online retrieval failed (expected if Redis not running):', e)

# 6. Initialize MLflow Client & Set Tracking URIs
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
experiment_name = 'demo_experiment'
# Create the experiment only on first use.
exp = mlflow.get_experiment_by_name(experiment_name)
if exp is None:
    mlflow.create_experiment(experiment_name)
client = mlflow.tracking.MlflowClient()
# MlflowClient.list_experiments() was removed in MLflow 2.0;
# search_experiments() is its replacement.
print('Experiments:', [e.name for e in client.search_experiments()])

In [None]:
# 7. Simulated Training Run (Generate Data, Train Model, Log to MLflow)
import os
import tempfile
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

MAX_ITER = 200  # used for both the model and the logged param, so they cannot drift

np.random.seed(42)
X = np.random.rand(500, 3)
# Synthetic labels: a fixed linear combination of the three features vs 0.5.
y = (X[:, 0] * 0.3 + X[:, 1] * 0.5 - X[:, 2] * 0.2 + 0.1 > 0.5).astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
model = LogisticRegression(max_iter=MAX_ITER)
mlflow.set_experiment(experiment_name)
with mlflow.start_run() as run:
    model.fit(X_train, y_train)
    proba = model.predict_proba(X_test)[:, 1]
    preds = (proba > 0.5).astype(int)
    auc = roc_auc_score(y_test, proba)
    acc = accuracy_score(y_test, preds)
    mlflow.log_params({'max_iter': MAX_ITER})
    mlflow.log_metrics({'auc': float(auc), 'accuracy': float(acc)})

    cm = confusion_matrix(y_test, preds)
    fig, ax = plt.subplots()
    try:
        ax.imshow(cm)
        ax.set_title('Confusion Matrix')
        fig.tight_layout()
        # A temporary directory is removed after upload, unlike the previous
        # NamedTemporaryFile(delete=False), and gives the artifact a stable name.
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_png = os.path.join(tmp_dir, 'confusion_matrix.png')
            fig.savefig(tmp_png)
            mlflow.log_artifact(tmp_png, artifact_path='plots')
    finally:
        # Close the figure so re-running the cell does not accumulate figures.
        plt.close(fig)

    mlflow.sklearn.log_model(model, 'model')
    RUN_ID = run.info.run_id
print('Logged run id:', RUN_ID)

# 8. Log Feature Metadata & Lineage to MLflow Run
import datetime as _dt  # private alias: independent of what the global `datetime` is bound to
import json

FEATURE_NAMES = ['roi', 'revenue', 'cost']

# Re-open the training run to attach feature lineage to it.
with mlflow.start_run(run_id=RUN_ID):
    feature_meta = {
        'project': FEAST_PROJECT,
        'features': FEATURE_NAMES,
        'generated_at': _dt.datetime.now(_dt.timezone.utc).isoformat(),
    }
    meta_path = 'features_used.json'
    with open(meta_path, 'w', encoding='utf-8') as f:
        json.dump(feature_meta, f)
    mlflow.log_artifact(meta_path)
    # Derive the count from the list so the tag cannot disagree with it.
    mlflow.set_tags({
        'feast.project': FEAST_PROJECT,
        'feast.feature_count': str(len(FEATURE_NAMES)),
    })
print('Feature metadata logged')

# 9. Register & Load Model from MLflow (Inference Demo)
model_name = 'demo_logreg'
client = mlflow.tracking.MlflowClient()
model_uri = f'runs:/{RUN_ID}/model'
# register_model creates the registered model on first use and adds a version
# whose source is the run's logged model artifact. A UI link such as
# '<tracking-uri>/#/artifacts/...' is not a loadable artifact location.
# NOTE(review): requires a tracking backend with Model Registry support.
mv = mlflow.register_model(model_uri, model_name)
print('Registered model', mv.name, 'version', mv.version)
loaded = mlflow.sklearn.load_model(model_uri)
print('Sample preds:', loaded.predict(X_test[:5]))

In [None]:
# 10. Programmatic Keycloak Authentication (Obtain Access Token)
try:
    keycloak_openid = KeycloakOpenID(
        server_url=KEYCLOAK_URL + "/",
        realm_name=KEYCLOAK_REALM,
        client_id=KEYCLOAK_CLIENT_ID,
    )
    # Request a token with the configured username/password.
    token = keycloak_openid.token(KEYCLOAK_USERNAME, KEYCLOAK_PASSWORD)
    access_token = token.get('access_token')
    token_len = len(access_token) if access_token else None
    print('Token acquired len=', token_len)
except Exception as e:
    # Keycloak is optional here; the next step skips the secure call when
    # access_token is falsy.
    print('Keycloak token retrieval failed (dev mode maybe not running):', e)
    access_token = None

# 11. Call Protected FastAPI Endpoint with Bearer Token (placeholder secure endpoint)
SECURE_URL = os.getenv('FASTAPI_SECURE_ENDPOINT', 'http://localhost:8000/health')
# requests has no default timeout; without one an unresponsive server hangs the cell.
SECURE_TIMEOUT_S = float(os.getenv('FASTAPI_SECURE_TIMEOUT', '10'))
if access_token:
    try:
        resp = requests.get(
            SECURE_URL,
            headers={'Authorization': f'Bearer {access_token}'},
            timeout=SECURE_TIMEOUT_S,
        )
        print('Secure endpoint status:', resp.status_code, 'body:', resp.text[:120])
    except requests.RequestException as e:
        print('Secure endpoint call failed:', e)
else:
    print('Skipping secure endpoint call (no token).')

# 12. Vault Client Setup (Write / Read Secrets)
try:
    vault_client = hvac.Client(url=VAULT_ADDR, token=VAULT_TOKEN)
    # Explicit check rather than `assert`, which is skipped under `python -O`
    # and produced an empty error message when it failed.
    if not vault_client.is_authenticated():
        raise RuntimeError(f'Vault client is not authenticated against {VAULT_ADDR}')
    vault_kv = vault_client.secrets.kv.v2
    # Seed demo-only values so the read below has something to return.
    vault_kv.create_or_update_secret(path='airbyte', secret={'password': 'airbyte_pw'})
    vault_kv.create_or_update_secret(path='db', secret={'user': 'udo', 'password': 'pass'})
    airbyte_secret = vault_kv.read_secret_version(path='airbyte')['data']['data']
    # Print key names only, never the secret values.
    print('Vault airbyte secret keys:', list(airbyte_secret.keys()))
except Exception as e:
    print('Vault interaction failed:', e)

# 13. Inject Retrieved Secrets into Config Simulation
if 'airbyte_secret' not in locals():
    # The Vault step did not produce a secret; nothing to inject.
    print('Skipping secret injection simulation.')
else:
    # Expose the password through the environment (inherited by child
    # processes); the connector config itself carries only a reference.
    os.environ['AIRBYTE_PASSWORD'] = airbyte_secret['password']
    connector_cfg = dict(
        source='minio-csv',
        destination='postgres',
        credentials_ref='vault:airbyte/password',
    )
    print('Connector config sample:', connector_cfg)

In [None]:
# 14. OpenTelemetry Tracing & Metrics Initialization
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader

# Shared resource so spans and metrics carry the same service.name.
resource = Resource.create({"service.name": "train-demo"})

# Tracing: print each span to stdout as it ends.
tracer_provider = TracerProvider(resource=resource)
span_exporter = ConsoleSpanExporter()
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer(__name__)

# Metrics: periodically export to stdout.
metric_reader = PeriodicExportingMetricReader(ConsoleMetricExporter())
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter("train-demo")
llm_req_counter = meter.create_counter("llm_requests_total")
llm_latency_hist = meter.create_histogram("llm_latency_ms")

# NOTE(review): OTEL_EXPORTER_OTLP_ENDPOINT is configured at the top of the
# notebook, but both exporters here are console exporters, so nothing is sent
# to that endpoint -- confirm that is intended.
print('OTel tracing & metrics configured')

# 15. Prometheus Metrics Endpoint (local ephemeral)
from prometheus_client import start_http_server

# NOTE(review): the OTel instruments set up above use a console exporter, so
# they are not published on this endpoint -- confirm that is intended.
PROM_PORT = 8009
try:
    start_http_server(PROM_PORT)
except Exception as e:
    # e.g. the port is already bound by an earlier run of this cell
    print('Metrics server error:', e)
else:
    print(f'Started local metrics server on :{PROM_PORT}')

# 16. Load and Inspect Grafana Dashboard JSON Definitions
import glob, json as _json

DASHBOARD_GLOB = 'monitoring/grafana/dashboards/*.json'


def summarize_dashboard(data):
    """Return ``(title, top_level_panel_count)`` for parsed dashboard JSON.

    Accepts either a bare dashboard model or the wrapped form
    ``{"dashboard": {...}, "meta": {...}}``. Non-dict JSON yields
    ``(None, 0)`` instead of raising.
    """
    if not isinstance(data, dict):
        return None, 0
    if isinstance(data.get('dashboard'), dict):
        data = data['dashboard']
    return data.get('title'), len(data.get('panels') or [])


# Sorted so the listing is stable across runs (glob order is filesystem-dependent).
dash_paths = sorted(glob.glob(DASHBOARD_GLOB))
if not dash_paths:
    print('No Grafana dashboards found matching', DASHBOARD_GLOB)
for dash_path in dash_paths:
    try:
        with open(dash_path, encoding='utf-8') as f:
            data = _json.load(f)
    except (OSError, ValueError) as e:
        # One unreadable/malformed file should not hide the others.
        print('Skipping unreadable dashboard', dash_path, '-', e)
        continue
    title, panel_count = summarize_dashboard(data)
    print('Dashboard:', title, 'Panels:', panel_count)

# 17. Mock LLM Inference & Drift Event Logging
import json as _json
import math
import os
import time as _time

prompt = "Explain product ROI drivers"
response = "ROI depends on revenue growth and cost efficiency."
# Deterministic stand-in for an embedding norm (mock; no model is called).
embedding_norm = math.sqrt(sum(ord(c) % 5 for c in response))
drift_log = {'prompt': prompt, 'response': response, 'embedding_norm': embedding_norm, 'ts': _time.time()}

arize_key = os.getenv('ARIZE_API_KEY')
if arize_key:
    print('Arize key present - would send event (mock).')
# Persist locally regardless of ARIZE_API_KEY so a record always exists for the
# drift_log.json verification check.
with open('drift_log.json', 'w', encoding='utf-8') as f:
    _json.dump(drift_log, f)
print('Drift log written drift_log.json')

In [None]:
# 18. Emit Custom Metrics & Traces for LLM Call
import random, time
with tracer.start_as_current_span('llm_inference_span') as span:
    # Simulate the call with a random sleep, then record the latency actually
    # measured around it. (The former unused `start = time.time()` also
    # overwrote the notebook-global `start`.)
    simulated_latency = random.uniform(0.05, 0.25)
    t0 = time.perf_counter()
    time.sleep(simulated_latency)
    latency_ms = (time.perf_counter() - t0) * 1000
    model_attrs = {"model": "demo-llm"}
    llm_req_counter.add(1, model_attrs)
    llm_latency_hist.record(latency_ms, model_attrs)
    span.set_attribute('llm.model', 'demo-llm')
    span.set_attribute('llm.latency_ms', latency_ms)
print(f'Logged metrics & span (latency_ms={latency_ms:.1f})')

# 19. Consolidated Verification Checks
import os
import socket


def _port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to ``host:port`` succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def _mlflow_run_finished(run_id):
    """Return True if the tracking server reports ``run_id`` as FINISHED."""
    if not run_id:
        return False
    try:
        return mlflow.get_run(run_id).info.status == 'FINISHED'
    except Exception as e:
        print('MLflow run lookup failed:', e)
        return False


results = {}
# Feast registry presence: feature_store.yaml points the registry at
# <FEAST_HOME>/registry.db (FEAST_REPO_ROOT was never defined).
results['feast_registry_exists'] = os.path.exists(os.path.join(FEAST_HOME, 'registry.db'))
# MLflow run finished. Re-fetch from the server: the object bound by
# `with mlflow.start_run() as run` keeps the status captured at run start.
results['mlflow_run_recorded'] = _mlflow_run_finished(globals().get('RUN_ID'))
# Vault secret injected into the environment (no file is ever written).
results['vault_secret_injected'] = 'AIRBYTE_PASSWORD' in os.environ
# Drift log
results['drift_log_written'] = os.path.exists('drift_log.json')
# Prometheus metrics server actually accepting connections on its port.
results['prom_metrics_port_active'] = _port_open('localhost', 8009)
print('Verification Summary:', results)
# Raise explicitly: a bare `assert` is stripped under `python -O`.
if not all(results.values()):
    raise AssertionError(f"Some verification checks failed: {results}")
print('All verification checks passed.')

# 20. Diagnostics & Cleanup Helpers
import itertools, shutil

usage = shutil.disk_usage('.')  # one call, so used/total come from the same snapshot
# Scale the ratio so the printed value matches the '%' label.
print('Disk usage %:', round(100 * usage.used / usage.total, 1))
try:
    import psutil
except ImportError:
    print('psutil not installed; skipping process sample.')
else:
    # Take only the first five PIDs instead of listing every process first.
    print('Active processes sample:', [p.pid for p in itertools.islice(psutil.process_iter(), 5)])
print('Notebook diagnostics complete.')