# MLOps/CVOps Platform - Complete Codebase Exploration

This notebook provides hands-on exploration of every component in the platform.

## Table of Contents
1. [Setup & Imports](#1-setup)
2. [Settings & Configuration (Pydantic)](#2-settings)
3. [Resources - LakeFS](#3-lakefs)
4. [Resources - Trino/Iceberg](#4-trino)
5. [Resources - Redis](#5-redis)
6. [Resources - MLflow](#6-mlflow)
7. [Feature Registry & Generator](#7-feature-registry)
8. [Streaming - Kafka](#8-kafka)
9. [Streaming - ksqlDB](#9-ksqldb)
10. [API - Fraud Detection](#10-fraud-api)
11. [API - Serving Layer](#11-serving)
12. [Monitoring & Metrics](#12-monitoring)
13. [Dagster Pipelines](#13-dagster)
14. [End-to-End Flow Test](#14-e2e)

---
## 1. Setup & Imports <a id='1-setup'></a>

First, let's set up the environment and import all necessary modules.

In [None]:
# Install dependencies if running in Jupyter container
# !pip install requests redis trino pandas pyyaml mlflow boto3 kafka-python feast --quiet

In [1]:
import os
import sys
import json
import requests
from datetime import datetime
from pprint import pprint

# Add project root to path
PROJECT_ROOT = os.path.abspath('..')
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

print(f"Project root: {PROJECT_ROOT}")
print(f"Python version: {sys.version}")

Project root: /home/jovyan
Python version: 3.11.6 | packaged by conda-forge | (main, Oct  3 2023, 10:40:35) [GCC 12.3.0]


In [None]:
# Environment configuration for Docker network
# When running inside exp-jupyter container, use Docker service names
# When running locally, use localhost with mapped ports

# Coerce to bool so IN_DOCKER is True/False rather than an env-var string or None
IN_DOCKER = bool(os.path.exists('/.dockerenv') or os.getenv('JUPYTER_ENABLE_LAB'))

if IN_DOCKER:
    # Inside Docker network
    LAKEFS_URL = "http://exp-lakefs:8000"
    TRINO_HOST = "exp-trino"
    TRINO_PORT = 8080
    REDIS_HOST = "exp-redis"
    REDIS_PORT = 6379
    MLFLOW_URL = "http://exp-mlflow:5000"
    KAFKA_BOOTSTRAP = "exp-kafka:9092"
    KSQLDB_URL = "http://exp-ksqldb-server:8088"
    FRAUD_API_URL = "http://exp-fraud-api:8001"
    MINIO_ENDPOINT = "http://exp-minio:9000"
else:
    # Local development with port mapping
    LAKEFS_URL = "http://localhost:18000"
    TRINO_HOST = "localhost"
    TRINO_PORT = 18083
    REDIS_HOST = "localhost"
    REDIS_PORT = 16379
    MLFLOW_URL = "http://localhost:15000"
    KAFKA_BOOTSTRAP = "localhost:29092"
    KSQLDB_URL = "http://localhost:8088"
    FRAUD_API_URL = "http://localhost:18002"
    MINIO_ENDPOINT = "http://localhost:19000"

# LakeFS credentials - Get from environment or set in .env file
LAKEFS_ACCESS_KEY = os.getenv("LAKEFS_ACCESS_KEY_ID", "your-lakefs-access-key")
LAKEFS_SECRET_KEY = os.getenv("LAKEFS_SECRET_ACCESS_KEY", "your-lakefs-secret-key")

print(f"Running in Docker: {IN_DOCKER}")
print(f"LakeFS URL: {LAKEFS_URL}")
print(f"Trino: {TRINO_HOST}:{TRINO_PORT}")

---
## 2. Settings & Configuration (Pydantic) <a id='2-settings'></a>

Explore the centralized settings system using Pydantic.
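
The cell in this section reads grouped settings as `settings.infra`, `settings.mlops`, and `settings.cvops`. As a rough, stdlib-only sketch of that grouping idea (the real module uses Pydantic, not dataclasses; the class name `InfraSketch`, the `EXP_` prefix, and the defaults are hypothetical and not taken from `pipelines.settings`):

```python
import os
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class InfraSketch:
    """Hypothetical stand-in for one settings group."""
    trino_host: str = "localhost"
    trino_port: int = 8080
    redis_host: str = "localhost"


def load_from_env(cls, prefix="EXP_", environ=None):
    """Build `cls`, overriding defaults with PREFIX + FIELD_NAME variables."""
    environ = os.environ if environ is None else environ
    overrides = {}
    for field in fields(cls):
        raw = environ.get(prefix + field.name.upper())
        if raw is not None:
            # Coerce the string using the type of the field's default value
            overrides[field.name] = type(field.default)(raw)
    return cls(**overrides)


infra = load_from_env(InfraSketch, environ={"EXP_TRINO_PORT": "18083"})
print(infra.trino_host, infra.trino_port)
```

Passing `environ` explicitly keeps the sketch testable without touching the real process environment.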

In [3]:
# Import settings module
try:
    from pipelines.settings import settings, InfraSettings, MLOpsSettings, CVOpsSettings
    
    print("=" * 60)
    print("INFRASTRUCTURE SETTINGS")
    print("=" * 60)
    print(f"Trino Host: {settings.infra.trino_host}")
    print(f"Trino Port: {settings.infra.trino_port}")
    print(f"Trino Catalog: {settings.infra.trino_catalog}")
    print(f"Redis Host: {settings.infra.redis_host}")
    print(f"MLflow URI: {settings.infra.mlflow_tracking_uri}")
    print(f"Kafka Bootstrap: {settings.infra.kafka_bootstrap_servers}")
    
    print("\n" + "=" * 60)
    print("MLOPS SETTINGS")
    print("=" * 60)
    print(f"MLflow Experiment: {settings.mlops.mlflow_experiment}")
    print(f"Model Name: {settings.mlops.mlflow_model_name}")
    print(f"Fraud API URL: {settings.mlops.fraud_api_url}")
    print(f"AB Test Enabled: {settings.mlops.ab_test_enabled}")
    
    print("\n" + "=" * 60)
    print("CVOPS SETTINGS")
    print("=" * 60)
    print(f"CV Repo: {settings.cvops.cvops_repo}")
    print(f"CV Branch: {settings.cvops.cvops_branch}")
    print(f"YOLO Device: {settings.cvops.yolo_device}")
    print(f"YOLO Confidence: {settings.cvops.yolo_confidence}")
    
except ImportError as e:
    print(f"Could not import settings: {e}")
    print("This is expected if the project root is not on sys.path or pydantic-settings is not installed")

Could not import settings: No module named 'pipelines'
This is expected if the project root is not on sys.path or pydantic-settings is not installed


In [None]:
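# Explore Pydantic model structure (needs the settings import above to have succeeded)
if 'InfraSettings' in globals():
    print("InfraSettings Fields:")
    print("-" * 40)
    for field_name, field_info in InfraSettings.model_fields.items():
        default = field_info.default
        print(f"  {field_name}: {field_info.annotation} = {default}")
else:
    print("InfraSettings is not available; skipping field listing")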

---
## 3. Resources - LakeFS <a id='3-lakefs'></a>

Explore the LakeFS resource for data versioning.
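
The client in this section places `prefix` directly into the query string, which works for simple prefixes but not for ones containing spaces or `&`. A minimal sketch of building the same object-listing URL with encoding, using only the standard library (the function name `lakefs_objects_url` is hypothetical; the endpoint path is the one used by `list_objects` in this section):

```python
from urllib.parse import quote, urlencode


def lakefs_objects_url(server_url, repo, ref, prefix=""):
    """Build the object-listing URL with an encoded path and query string."""
    path = (
        f"/api/v1/repositories/{quote(repo, safe='')}"
        f"/refs/{quote(ref, safe='')}/objects/ls"
    )
    return f"{server_url.rstrip('/')}{path}?{urlencode({'prefix': prefix})}"


url = lakefs_objects_url("http://localhost:18000", "bronze", "main", prefix="raw/2024 q1/")
print(url)
```

With `requests`, passing `params={"prefix": prefix}` to the request call achieves the same encoding without manual string building.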

In [1]:
# Test LakeFS connection directly
def test_lakefs_connection():
    """Test basic LakeFS API connectivity."""
    try:
        response = requests.get(
            f"{LAKEFS_URL}/api/v1/healthcheck",
            timeout=5
        )
        print(f"LakeFS Health: {response.status_code}")
        return response.status_code == 204
    except Exception as e:
        print(f"LakeFS connection failed: {e}")
        return False

lakefs_healthy = test_lakefs_connection()

LakeFS connection failed: name 'requests' is not defined


In [None]:
# LakeFS API wrapper functions (matching resources.py)
class LakeFSClient:
    """Simple LakeFS client for exploration."""
    
    def __init__(self, server_url, access_key, secret_key):
        self.server_url = server_url
        self.auth = (access_key, secret_key)
    
    def _api_call(self, method, endpoint, json_data=None):
        url = f"{self.server_url}/api/v1{endpoint}"
        response = requests.request(
            method, url, auth=self.auth, json=json_data, timeout=30
        )
        if response.status_code == 404:
            return None
        response.raise_for_status()
        return response.json() if response.content else {}
    
    def list_repositories(self):
        """List all LakeFS repositories."""
        return self._api_call("GET", "/repositories")
    
    def get_repository(self, repo_name):
        """Get repository details."""
        return self._api_call("GET", f"/repositories/{repo_name}")
    
    def list_branches(self, repo_name):
        """List branches in a repository."""
        return self._api_call("GET", f"/repositories/{repo_name}/branches")
    
    def list_objects(self, repo_name, branch, prefix=""):
        """List objects in a branch."""
        return self._api_call("GET", f"/repositories/{repo_name}/refs/{branch}/objects/ls?prefix={prefix}")
    
    def get_commit_log(self, repo_name, branch, limit=5):
        """Get commit history."""
        return self._api_call("GET", f"/repositories/{repo_name}/refs/{branch}/commits?amount={limit}")

# Create client
lakefs = LakeFSClient(LAKEFS_URL, LAKEFS_ACCESS_KEY, LAKEFS_SECRET_KEY)

In [None]:
# List all repositories
if lakefs_healthy:
    repos = lakefs.list_repositories()
    print("=" * 60)
    print("LAKEFS REPOSITORIES")
    print("=" * 60)
    
    if repos and 'results' in repos:
        for repo in repos['results']:
            print(f"\nRepository: {repo['id']}")
            print(f"  Storage: {repo.get('storage_namespace', 'N/A')}")
            print(f"  Default Branch: {repo.get('default_branch', 'main')}")
            print(f"  Created: {repo.get('creation_date', 'N/A')}")
    else:
        print("No repositories found")

In [None]:
# Explore a specific repository (bronze or warehouse)
REPO_TO_EXPLORE = "bronze"  # Change to explore different repos

if lakefs_healthy:
    repo_info = lakefs.get_repository(REPO_TO_EXPLORE)
    if repo_info:
        print(f"Repository: {REPO_TO_EXPLORE}")
        pprint(repo_info)
        
        # List branches
        branches = lakefs.list_branches(REPO_TO_EXPLORE)
        print(f"\nBranches:")
        if branches and 'results' in branches:
            for b in branches['results']:
                print(f"  - {b['id']} (commit: {b.get('commit_id', 'N/A')[:8]}...)")
    else:
        print(f"Repository '{REPO_TO_EXPLORE}' not found")

In [None]:
# List objects in a branch
BRANCH_TO_EXPLORE = "main"

if lakefs_healthy:
    objects = lakefs.list_objects(REPO_TO_EXPLORE, BRANCH_TO_EXPLORE)
    if objects and 'results' in objects:
        print(f"Objects in {REPO_TO_EXPLORE}/{BRANCH_TO_EXPLORE}:")
        for obj in objects['results'][:20]:  # Limit to 20
            path = obj.get('path', 'N/A')
            size = obj.get('size_bytes', 0)
            print(f"  {path} ({size:,} bytes)")
        
        if len(objects['results']) > 20:
            print(f"  ... and {len(objects['results']) - 20} more")
    else:
        print("No objects found or branch empty")

In [None]:
# View commit history
if lakefs_healthy:
    commits = lakefs.get_commit_log(REPO_TO_EXPLORE, BRANCH_TO_EXPLORE, limit=5)
    if commits and 'results' in commits:
        print(f"Recent commits in {REPO_TO_EXPLORE}/{BRANCH_TO_EXPLORE}:")
        print("-" * 60)
        for commit in commits['results']:
            commit_id = commit.get('id', 'N/A')[:8]
            message = commit.get('message', 'No message')
            committer = commit.get('committer', 'Unknown')
            print(f"{commit_id}... | {committer} | {message[:50]}")

---
## 4. Resources - Trino/Iceberg <a id='4-trino'></a>

Query Iceberg tables using Trino.
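
The `run_query` helper in this section returns column names and row tuples separately. A small sketch of pairing them into dictionaries, which is often easier to inspect or hand to pandas (the sample column names and values are made up for illustration):

```python
def rows_to_dicts(columns, rows):
    """Pair each row's values with the column names."""
    return [dict(zip(columns, row)) for row in rows]


# Hypothetical result shaped like the (columns, rows) tuple from run_query
columns = ["transaction_id", "amount"]
rows = [("tx_1", 150.0), ("tx_2", 20.5)]

records = rows_to_dicts(columns, rows)
print(records[0]["amount"])
```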

In [None]:
from trino.dbapi import connect
from trino.auth import BasicAuthentication

def get_trino_connection():
    """Create Trino connection."""
    return connect(
        host=TRINO_HOST,
        port=TRINO_PORT,
        user="trino",
        catalog="iceberg_dev",
        schema="bronze",
    )

def run_query(sql, fetch=True):
    """Execute a Trino query; return (columns, rows), or (None, None) when fetch=False."""
    conn = get_trino_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        if not fetch:
            return None, None
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()
        return columns, rows
    finally:
        # Always release the cursor and connection, even if the query fails
        cursor.close()
        conn.close()

# Test connection
try:
    cols, rows = run_query("SELECT 1 as test")
    print(f"Trino connection successful: {rows}")
    TRINO_HEALTHY = True
except Exception as e:
    print(f"Trino connection failed: {e}")
    TRINO_HEALTHY = False

In [None]:
# List all schemas in iceberg_dev catalog
if TRINO_HEALTHY:
    cols, rows = run_query("SHOW SCHEMAS FROM iceberg_dev")
    print("=" * 60)
    print("ICEBERG SCHEMAS")
    print("=" * 60)
    for row in rows:
        print(f"  {row[0]}")

In [None]:
# List tables in each schema
if TRINO_HEALTHY:
    schemas_to_check = ['bronze', 'silver', 'gold', 'evaluation', 'cv']
    
    for schema in schemas_to_check:
        try:
            cols, rows = run_query(f"SHOW TABLES FROM iceberg_dev.{schema}")
            if rows:
                print(f"\n{schema.upper()} TABLES:")
                for row in rows:
                    print(f"  - {row[0]}")
        except Exception as e:
            print(f"\n{schema}: Schema not found or empty ({e})")

In [None]:
# Sample data from fraud_transactions (if exists)
if TRINO_HEALTHY:
    try:
        cols, rows = run_query("""
            SELECT * FROM iceberg_dev.bronze.fraud_transactions 
            LIMIT 5
        """)
        
        print("SAMPLE: bronze.fraud_transactions")
        print("-" * 60)
        print(f"Columns: {cols}")
        print(f"\nRows:")
        for row in rows:
            print(row)
    except Exception as e:
        print(f"Table not found or empty: {e}")

In [None]:
# Check table row counts
if TRINO_HEALTHY:
    tables_to_count = [
        ('bronze', 'fraud_transactions'),
        ('silver', 'fraud_enriched'),
        ('gold', 'fraud_training_data'),
    ]
    
    print("TABLE ROW COUNTS")
    print("-" * 40)
    for schema, table in tables_to_count:
        try:
            cols, rows = run_query(f"SELECT COUNT(*) FROM iceberg_dev.{schema}.{table}")
            count = rows[0][0]
            print(f"{schema}.{table}: {count:,} rows")
        except Exception:
            print(f"{schema}.{table}: (not found)")

In [None]:
# Query Iceberg table history (snapshots)
if TRINO_HEALTHY:
    try:
        cols, rows = run_query("""
            SELECT snapshot_id, committed_at, operation, summary
            FROM iceberg_dev.bronze."fraud_transactions$snapshots"
            ORDER BY committed_at DESC
            LIMIT 5
        """)
        
        print("ICEBERG SNAPSHOTS (fraud_transactions)")
        print("-" * 60)
        for row in rows:
            print(f"Snapshot: {row[0]}")
            print(f"  Time: {row[1]}")
            print(f"  Operation: {row[2]}")
            print()
    except Exception as e:
        print(f"Could not query snapshots: {e}")

---
## 5. Resources - Redis <a id='5-redis'></a>

Explore Redis for streaming features and caching.
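
Redis hashes store every field as a string, so values read back with `hgetall` need converting before they can be used as numeric features. A minimal sketch of that conversion (the helper name `decode_hash` is hypothetical; the field names follow the test data written later in this section):

```python
def decode_hash(raw):
    """Convert string hash values to int or float where they parse cleanly."""
    decoded = {}
    for field, value in raw.items():
        for cast in (int, float):
            try:
                decoded[field] = cast(value)
                break
            except ValueError:
                continue
        else:
            # Neither cast worked, so keep the original string
            decoded[field] = value
    return decoded


sample = {
    "tx_count_5min": "3",
    "amount_sum_5min": "450.00",
    "timestamp": "2024-01-01T00:00:00",
}
features = decode_hash(sample)
print(features)
```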

In [None]:
import redis

# Connect to Redis
try:
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
    r.ping()
    print(f"Redis connection successful!")
    print(f"Redis info: {r.info('server')['redis_version']}")
    REDIS_HEALTHY = True
except Exception as e:
    print(f"Redis connection failed: {e}")
    REDIS_HEALTHY = False

In [None]:
# Explore Redis keys
if REDIS_HEALTHY:
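    # Collect keys with SCAN, which iterates incrementally instead of blocking the server like KEYS
    all_keys = list(r.scan_iter(match='*', count=1000))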
    print(f"Total keys in Redis: {len(all_keys)}")
    
    # Group by prefix
    prefixes = {}
    for key in all_keys:
        prefix = key.split(':')[0] if ':' in key else key
        prefixes[prefix] = prefixes.get(prefix, 0) + 1
    
    print("\nKeys by prefix:")
    for prefix, count in sorted(prefixes.items()):
        print(f"  {prefix}: {count}")

In [None]:
# Check streaming features (from Kafka consumer)
if REDIS_HEALTHY:
    streaming_keys = r.keys('feast:streaming:*')
    print(f"Streaming feature keys: {len(streaming_keys)}")
    
    if streaming_keys:
        # Sample one key
        sample_key = streaming_keys[0]
        print(f"\nSample key: {sample_key}")
        
        key_type = r.type(sample_key)
        print(f"Type: {key_type}")
        
        if key_type == 'hash':
            data = r.hgetall(sample_key)
            print(f"Data:")
            for k, v in data.items():
                print(f"  {k}: {v}")
        elif key_type == 'string':
            data = r.get(sample_key)
            print(f"Value: {data}")
        
        ttl = r.ttl(sample_key)
        print(f"TTL: {ttl} seconds")

In [None]:
# Write and read test data
if REDIS_HEALTHY:
    # Write a test hash
    test_key = "notebook:test:customer_123"
    test_data = {
        "tx_count_5min": "3",
        "amount_sum_5min": "450.00",
        "velocity_score": "0.6",
        "high_velocity_flag": "0",
        "timestamp": datetime.now().isoformat()
    }
    
    r.hset(test_key, mapping=test_data)
    r.expire(test_key, 300)  # 5 minute TTL
    
    # Read it back
    retrieved = r.hgetall(test_key)
    print("Wrote and retrieved test data:")
    pprint(retrieved)
    
    # Clean up
    r.delete(test_key)
    print(f"\nCleaned up test key")

---
## 6. Resources - MLflow <a id='6-mlflow'></a>

Explore MLflow for model registry and experiment tracking.
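
MLflow reports `run.info.start_time` as milliseconds since the Unix epoch, so the raw value printed by the run-listing cell in this section is hard to read. A small sketch of turning it into an ISO 8601 string (the helper name `format_run_time` is hypothetical):

```python
from datetime import datetime, timezone


def format_run_time(ms_since_epoch):
    """Format an MLflow millisecond timestamp as a UTC ISO 8601 string."""
    if ms_since_epoch is None:
        # Runs that have not started or ended yet may carry no timestamp
        return "N/A"
    return datetime.fromtimestamp(ms_since_epoch / 1000, tz=timezone.utc).isoformat()


print(format_run_time(0))
```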

In [None]:
import mlflow
from mlflow.tracking import MlflowClient

# Set tracking URI
mlflow.set_tracking_uri(MLFLOW_URL)

# Test connection
try:
    client = MlflowClient()
    experiments = client.search_experiments()
    print(f"MLflow connection successful!")
    print(f"Found {len(experiments)} experiments")
    MLFLOW_HEALTHY = True
except Exception as e:
    print(f"MLflow connection failed: {e}")
    MLFLOW_HEALTHY = False

In [None]:
# List all experiments
if MLFLOW_HEALTHY:
    print("=" * 60)
    print("MLFLOW EXPERIMENTS")
    print("=" * 60)
    
    for exp in experiments:
        print(f"\nExperiment: {exp.name}")
        print(f"  ID: {exp.experiment_id}")
        print(f"  Artifact Location: {exp.artifact_location}")
        print(f"  Lifecycle Stage: {exp.lifecycle_stage}")

In [None]:
# List registered models
if MLFLOW_HEALTHY:
    models = client.search_registered_models()
    
    print("=" * 60)
    print("REGISTERED MODELS")
    print("=" * 60)
    
    if models:
        for model in models:
            print(f"\nModel: {model.name}")
            print(f"  Description: {model.description or 'N/A'}")
            
            # Get latest versions
            for version in model.latest_versions:
                print(f"  Version {version.version}:")
                print(f"    Stage: {version.current_stage}")
                print(f"    Run ID: {version.run_id[:8]}...")
    else:
        print("No registered models found")

In [None]:
# Get runs from fraud-detection experiment
if MLFLOW_HEALTHY:
    try:
        exp = client.get_experiment_by_name("fraud-detection")
        if exp:
            runs = client.search_runs(
                experiment_ids=[exp.experiment_id],
                max_results=5,
                order_by=["start_time DESC"]
            )
            
            print("Recent runs from 'fraud-detection' experiment:")
            print("-" * 60)
            for run in runs:
                print(f"\nRun: {run.info.run_id[:8]}...")
                print(f"  Status: {run.info.status}")
                print(f"  Start: {run.info.start_time}")
                
                # Show key metrics
                metrics = run.data.metrics
                if 'accuracy' in metrics:
                    print(f"  Accuracy: {metrics['accuracy']:.4f}")
                if 'f1_score' in metrics:
                    print(f"  F1 Score: {metrics['f1_score']:.4f}")
        else:
            print("Experiment 'fraud-detection' not found")
    except Exception as e:
        print(f"Error: {e}")

In [None]:
# Load a model (if available)
if MLFLOW_HEALTHY:
    try:
        # Try to load production model
        model_uri = "models:/fraud-detector/Production"
        model = mlflow.pyfunc.load_model(model_uri)
        print(f"Loaded model from: {model_uri}")
        print(f"Model type: {type(model)}")
    except Exception as e:
        print(f"Could not load production model: {e}")
        print("This is expected if no model is in Production stage")

---
## 7. Feature Registry & Generator <a id='7-feature-registry'></a>

Explore the feature registry YAML and code generator.
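
The cells in this section read the registry through a handful of keys (`project`, `entities`, `features`, and optionally `streaming`). The sketch below shows a dictionary with that shape, as `yaml.safe_load` might return it; every value is hypothetical, and only the key names mirror what the cells access:

```python
# Hypothetical registry content; only the key names follow the notebook cells
registry_sketch = {
    "project": {"name": "demo", "version": "0.1", "description": "example"},
    "entities": {
        "customer": {
            "join_key": "customer_id",
            "type": "STRING",
            "description": "A customer",
        },
    },
    "features": {
        "customer_stats": {
            "entity": "customer",
            "ttl_days": 30,
            "online": True,
            "columns": [
                {"name": "tx_count_30d", "type": "INT64"},
                {"name": "avg_amount_30d", "type": "FLOAT"},
            ],
        },
    },
}


def columns_per_group(registry):
    """Map each feature group name to its number of columns."""
    return {
        group_name: len(group.get("columns", []))
        for group_name, group in registry.get("features", {}).items()
    }


print(columns_per_group(registry_sketch))
```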

In [None]:
import yaml

# Load feature registry
registry_path = os.path.join(PROJECT_ROOT, 'feature_registry', 'fraud_detection.yaml')

with open(registry_path, 'r') as f:
    registry = yaml.safe_load(f)

print("=" * 60)
print("FEATURE REGISTRY STRUCTURE")
print("=" * 60)
print(f"Project: {registry['project']['name']}")
print(f"Version: {registry['project']['version']}")
print(f"Description: {registry['project']['description']}")

In [None]:
# Explore entities
print("ENTITIES")
print("-" * 40)
for name, entity in registry['entities'].items():
    print(f"  {name}:")
    print(f"    Join Key: {entity['join_key']}")
    print(f"    Type: {entity['type']}")
    print(f"    Description: {entity['description']}")

In [None]:
# Explore feature groups
print("FEATURE GROUPS")
print("-" * 40)
for group_name, group in registry['features'].items():
    columns = group.get('columns', [])
    print(f"\n{group_name}:")
    print(f"  Entity: {group.get('entity', 'N/A')}")
    print(f"  TTL Days: {group.get('ttl_days', 'N/A')}")
    print(f"  Online: {group.get('online', False)}")
    print(f"  Columns: {len(columns)}")
    
    # Show first 3 columns as sample
    for col in columns[:3]:
        print(f"    - {col['name']}: {col['type']}")

In [None]:
# Explore streaming configuration
if 'streaming' in registry:
    streaming = registry['streaming']
    print("STREAMING CONFIGURATION")
    print("-" * 40)
    print(f"Source Topic: {streaming.get('source_topic')}")
    print(f"Output Topic: {streaming.get('output_topic')}")
    print(f"Value Format: {streaming.get('value_format')}")
    
    # Windows
    print(f"\nWindows:")
    for window in streaming.get('windows', []):
        duration = window['duration_seconds']
        agg_count = len(window.get('aggregations', []))
        print(f"  {window['name']}: {duration}s, {agg_count} aggregations")
    
    # Computed flags
    print(f"\nComputed Flags:")
    for flag in streaming.get('computed_flags', []):
        print(f"  {flag['name']}: {flag['logic']}")

In [None]:
# Use the generator
try:
    from feature_registry.generator import FeatureGenerator
    
    generator = FeatureGenerator.from_yaml(registry_path)
    
    print("Feature Generator loaded successfully!")
    print(f"Project: {generator.registry.project_name}")
    print(f"Source Table: {generator.registry.source_table}")
    print(f"Output Table: {generator.registry.output_table}")
    print(f"Feature Groups: {len(generator.registry.feature_groups)}")
    print(f"Feature Order: {len(generator.registry.feature_order)} features")
except ImportError as e:
    print(f"Could not import generator: {e}")

In [None]:
# Generate SQL sample
try:
    sql = generator.generate_sql()
    print("GENERATED SQL (first 100 lines):")
    print("=" * 60)
    for i, line in enumerate(sql.split('\n')[:100]):
        print(f"{i+1:3d} | {line}")
except Exception as e:
    print(f"Error: {e}")

In [None]:
# Generate ksqlDB sample
try:
    if generator.registry.streaming:
        ksql = generator.generate_ksqldb_streams()
        print("GENERATED ksqlDB STREAMS (first 50 lines):")
        print("=" * 60)
        for i, line in enumerate(ksql.split('\n')[:50]):
            print(f"{i+1:3d} | {line}")
    else:
        print("No streaming config in registry")
except Exception as e:
    print(f"Error: {e}")

---
## 8. Streaming - Kafka <a id='8-kafka'></a>

Explore Kafka topics and messages.
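
The topic-listing cell in this section buckets topics by name prefix using one list comprehension per prefix. A compact sketch of the same grouping as a reusable helper (the function name `group_topics` is hypothetical, and the sample topic names are only for illustration):

```python
from collections import defaultdict


def group_topics(topics, prefixes=("fraud", "cv", "debezium", "_")):
    """Group topic names under the first matching prefix, else under 'other'."""
    groups = defaultdict(list)
    for topic in sorted(topics):
        label = next((p for p in prefixes if topic.startswith(p)), "other")
        groups[label].append(topic)
    return dict(groups)


sample_topics = {
    "fraud.demo.fraud_transactions",
    "__consumer_offsets",
    "notebook.test.topic",
}
grouped = group_topics(sample_topics)
print(grouped)
```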

In [2]:
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic

# Connect to Kafka
try:
    admin = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP)
    print(f"Kafka connection successful!")
    KAFKA_HEALTHY = True
except Exception as e:
    print(f"Kafka connection failed: {e}")
    KAFKA_HEALTHY = False

ModuleNotFoundError: No module named 'kafka'

In [None]:
# List all topics
if KAFKA_HEALTHY:
    topics = admin.list_topics()
    
    print("=" * 60)
    print("KAFKA TOPICS")
    print("=" * 60)
    
    # Group by prefix
    fraud_topics = [t for t in topics if t.startswith('fraud')]
    cv_topics = [t for t in topics if t.startswith('cv')]
    debezium_topics = [t for t in topics if t.startswith('debezium')]
    internal_topics = [t for t in topics if t.startswith('_')]
    other_topics = [t for t in topics if t not in fraud_topics + cv_topics + debezium_topics + internal_topics]
    
    print(f"\nFraud Topics ({len(fraud_topics)}):")
    for t in fraud_topics:
        print(f"  - {t}")
    
    print(f"\nCV Topics ({len(cv_topics)}):")
    for t in cv_topics:
        print(f"  - {t}")
    
    print(f"\nDebezium Topics ({len(debezium_topics)}):")
    for t in debezium_topics:
        print(f"  - {t}")
    
    print(f"\nOther Topics ({len(other_topics)}):")
    for t in other_topics:
        print(f"  - {t}")

In [None]:
# Consume sample messages from a topic
def sample_topic(topic_name, max_messages=3, timeout_ms=5000):
    """Consume sample messages from a topic."""
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=KAFKA_BOOTSTRAP,
        auto_offset_reset='earliest',
        consumer_timeout_ms=timeout_ms,
        value_deserializer=lambda x: x.decode('utf-8') if x else None,
    )
    
    messages = []
    for msg in consumer:
        messages.append({
            'partition': msg.partition,
            'offset': msg.offset,
            'key': msg.key.decode('utf-8') if msg.key else None,
            'value': msg.value[:500] if msg.value and len(msg.value) > 500 else msg.value,
        })
        if len(messages) >= max_messages:
            break
    
    consumer.close()
    return messages

# Sample fraud CDC topic
if KAFKA_HEALTHY and 'fraud.demo.fraud_transactions' in topics:
    print("Sample messages from fraud.demo.fraud_transactions:")
    print("-" * 60)
    msgs = sample_topic('fraud.demo.fraud_transactions', max_messages=2)
    for msg in msgs:
        print(f"\nPartition: {msg['partition']}, Offset: {msg['offset']}")
        try:
            value = json.loads(msg['value'])
            pprint(value)
        except (TypeError, ValueError):
            print(msg['value'])

In [None]:
# Produce a test message
if KAFKA_HEALTHY:
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP,
        value_serializer=lambda x: json.dumps(x).encode('utf-8'),
    )
    
    test_topic = 'notebook.test.topic'
    test_message = {
        'source': 'exploration_notebook',
        'timestamp': datetime.now().isoformat(),
        'data': {'test_field': 'test_value'}
    }
    
    future = producer.send(test_topic, value=test_message)
    result = future.get(timeout=10)
    
    print(f"Produced message to {test_topic}")
    print(f"  Partition: {result.partition}")
    print(f"  Offset: {result.offset}")
    
    producer.close()

---
## 9. Streaming - ksqlDB <a id='9-ksqldb'></a>

Explore ksqlDB streams and tables.
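
The cells in this section index into the `/ksql` response as a list (`result[0]`). A failed statement may instead come back as a single JSON object describing the error, in which case `result[0]` would fail. The sketch below handles both shapes; the helper name `extract_entities` is hypothetical, and the presence of a `message` field in the error body is an assumption:

```python
def extract_entities(result, key):
    """Return result[0][key] for a list response; raise on an error object."""
    if isinstance(result, dict):
        # Assumed error shape: a JSON object with a "message" field
        raise RuntimeError(result.get("message", "unknown ksqlDB error"))
    if not result:
        return []
    return result[0].get(key, [])


streams = extract_entities([{"streams": [{"name": "TRANSACTIONS_ENRICHED"}]}], "streams")
print(streams)
```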

In [None]:
def ksql_query(statement):
    """Execute ksqlDB statement."""
    response = requests.post(
        f"{KSQLDB_URL}/ksql",
        json={"ksql": statement, "streamsProperties": {}},
        headers={"Content-Type": "application/vnd.ksql.v1+json"},
        timeout=30,
    )
    return response.json()

# Test connection
try:
    response = requests.get(f"{KSQLDB_URL}/healthcheck", timeout=5)
    print(f"ksqlDB health: {response.status_code}")
    KSQLDB_HEALTHY = response.status_code == 200
except Exception as e:
    print(f"ksqlDB connection failed: {e}")
    KSQLDB_HEALTHY = False

In [None]:
# List streams
if KSQLDB_HEALTHY:
    result = ksql_query("SHOW STREAMS;")
    
    print("=" * 60)
    print("KSQLDB STREAMS")
    print("=" * 60)
    
    if isinstance(result, list) and result:  # error responses are not lists
        streams = result[0].get('streams', [])
        for stream in streams:
            print(f"\n{stream.get('name', 'N/A')}")
            print(f"  Topic: {stream.get('topic', 'N/A')}")
            print(f"  Format: {stream.get('keyFormat', 'N/A')} / {stream.get('valueFormat', 'N/A')}")

In [None]:
# List tables (aggregations)
if KSQLDB_HEALTHY:
    result = ksql_query("SHOW TABLES;")
    
    print("=" * 60)
    print("KSQLDB TABLES (Aggregations)")
    print("=" * 60)
    
    if isinstance(result, list) and result:  # error responses are not lists
        tables = result[0].get('tables', [])
        for table in tables:
            print(f"\n{table.get('name', 'N/A')}")
            print(f"  Topic: {table.get('topic', 'N/A')}")
            print(f"  Windowed: {table.get('isWindowed', False)}")

In [None]:
# Describe a stream schema
if KSQLDB_HEALTHY:
    stream_name = "TRANSACTIONS_ENRICHED"  # Change as needed
    
    try:
        result = ksql_query(f"DESCRIBE {stream_name};")
        
        if result and len(result) > 0:
            info = result[0].get('sourceDescription', {})
            fields = info.get('fields', [])
            
            print(f"Schema for {stream_name}:")
            print("-" * 40)
            for field in fields:
                print(f"  {field['name']}: {field['schema']['type']}")
    except Exception as e:
        print(f"Stream not found or error: {e}")

---
## 10. API - Fraud Detection <a id='10-fraud-api'></a>

Test the Fraud Detection API endpoints.
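
Before sending a request, it can help to check the payload locally so that mistakes surface as readable messages rather than HTTP errors. A minimal sketch follows; the field names come from the example request in this section, but which fields the real API requires, and their exact types, are assumptions:

```python
# Field names follow the example request; the required set is an assumption
EXPECTED_FIELDS = {
    "transaction_id": str,
    "customer_id": str,
    "amount": (int, float),
    "country": str,
}


def find_payload_problems(payload):
    """Return a list of problems; an empty list means the payload looks fine."""
    problems = []
    for name, expected in EXPECTED_FIELDS.items():
        if name not in payload:
            problems.append(f"missing field: {name}")
        elif not isinstance(payload[name], expected):
            problems.append(f"wrong type for {name}: {type(payload[name]).__name__}")
    return problems


problems = find_payload_problems({"transaction_id": "t1", "amount": "150"})
print(problems)
```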

In [None]:
# Test API health
try:
    response = requests.get(f"{FRAUD_API_URL}/health", timeout=5)
    print(f"Fraud API Health: {response.status_code}")
    if response.status_code == 200:
        pprint(response.json())
    FRAUD_API_HEALTHY = response.status_code == 200
except Exception as e:
    print(f"Fraud API connection failed: {e}")
    FRAUD_API_HEALTHY = False

In [None]:
# Get API documentation
if FRAUD_API_HEALTHY:
    response = requests.get(f"{FRAUD_API_URL}/openapi.json", timeout=5)
    if response.status_code == 200:
        openapi = response.json()
        print("API Endpoints:")
        print("-" * 40)
        for path, methods in openapi.get('paths', {}).items():
            for method in methods.keys():
                if method in ['get', 'post', 'put', 'delete']:
                    print(f"  {method.upper():6s} {path}")

In [None]:
# Make a prediction request
if FRAUD_API_HEALTHY:
    prediction_request = {
        "transaction_id": "notebook_test_001",
        "customer_id": "cust_notebook_test",
        "amount": 150.00,
        "quantity": 2,
        "country": "US",
        "device_type": "mobile",
        "payment_method": "credit_card",
        "category": "Electronics",
        "account_age_days": 365,
        "tx_hour": 14,
        "tx_dayofweek": 2
    }
    
    try:
        response = requests.post(
            f"{FRAUD_API_URL}/predict",
            json=prediction_request,
            timeout=30
        )
        
        print("Prediction Request:")
        pprint(prediction_request)
        print("\nPrediction Response:")
        pprint(response.json())
    except Exception as e:
        print(f"Prediction failed: {e}")

In [None]:
# Batch prediction
if FRAUD_API_HEALTHY:
    batch_request = {
        "transactions": [
            {
                "transaction_id": f"batch_{i}",
                "customer_id": f"cust_{i}",
                "amount": 100.0 + i * 50,
                "quantity": 1,
                "country": "US",
                "device_type": "desktop",
                "payment_method": "credit_card",
                "category": "Electronics",
                "account_age_days": 100,
                "tx_hour": 10,
                "tx_dayofweek": 3
            }
            for i in range(5)
        ]
    }
    
    try:
        response = requests.post(
            f"{FRAUD_API_URL}/predict/batch",
            json=batch_request,
            timeout=60
        )
        
        print("Batch Prediction Results:")
        results = response.json()
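        predictions = results.get('predictions', []) if isinstance(results, dict) else results
        for pred in predictions[:5]:  # not named `r`, which holds the Redis client
            print(f"  {pred.get('transaction_id')}: score={pred.get('fraud_score', pred.get('score', 'N/A'))}")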
    except Exception as e:
        print(f"Batch prediction failed: {e}")

---
## 11. API - Serving Layer <a id='11-serving'></a>

Explore the modular serving layer components.

In [None]:
# Import serving layer modules
try:
    from api.serving.features.feature_service import FeatureService
    from api.serving.models.model_loader import ModelLoader
    from api.serving.scoring.adjustments import ScoreAdjuster
    
    print("Serving layer modules imported successfully!")
    print("\nAvailable classes:")
    print("  - FeatureService: Unified feature retrieval (Feast + Redis)")
    print("  - ModelLoader: MLflow model loading with caching")
    print("  - ScoreAdjuster: Score adjustments and business rules")
except ImportError as e:
    print(f"Could not import serving modules: {e}")

In [None]:
# Explore FeatureService
try:
    import inspect
    from api.serving.features.feature_service import FeatureService
    
    print("FeatureService Methods:")
    print("-" * 40)
    for name, method in inspect.getmembers(FeatureService, predicate=inspect.isfunction):
        if not name.startswith('_'):
            sig = inspect.signature(method)
            print(f"  {name}{sig}")
except Exception as e:
    print(f"Error: {e}")

In [None]:
# Read serving layer source code
feature_service_path = os.path.join(PROJECT_ROOT, 'api', 'serving', 'features', 'feature_service.py')

if os.path.exists(feature_service_path):
    with open(feature_service_path, 'r') as f:
        content = f.read()
    
    print("FeatureService Source (first 80 lines):")
    print("=" * 60)
    for i, line in enumerate(content.split('\n')[:80]):
        print(f"{i+1:3d} | {line}")
else:
    print(f"File not found: {feature_service_path}")

---
## 12. Monitoring & Metrics <a id='12-monitoring'></a>

Explore the monitoring infrastructure.

In [None]:
# Import monitoring utilities
try:
    from src.core.monitoring import get_or_create_counter, get_or_create_histogram, track_execution
    
    print("Monitoring utilities loaded!")
    print("\nAvailable functions:")
    print("  - get_or_create_counter: Safe Prometheus counter creation")
    print("  - get_or_create_histogram: Safe Prometheus histogram creation")
    print("  - track_execution: Decorator for automatic metrics tracking")
except ImportError as e:
    print(f"Could not import monitoring: {e}")
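
To make the idea of a tracking decorator concrete, here is a minimal standard-library sketch. It is not the project's `track_execution`, whose signature is not shown in this notebook. The names `track_execution_sketch`, `CALL_STATS`, and `score_transaction` are hypothetical, and an in-memory dict stands in for Prometheus counters and histograms.

```python
import functools
import time
from collections import defaultdict

# In-memory stand-in for a metrics backend (assumption: the real code uses Prometheus)
CALL_STATS = defaultdict(lambda: {"calls": 0, "errors": 0, "total_seconds": 0.0})


def track_execution_sketch(name):
    """Decorator that records call count, error count, and elapsed time under `name`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            stats = CALL_STATS[name]
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            except Exception:
                stats["errors"] += 1
                raise
            finally:
                # Runs on both success and failure, so every call is timed
                stats["calls"] += 1
                stats["total_seconds"] += time.perf_counter() - start
        return wrapper
    return decorator


@track_execution_sketch("score_transaction")
def score_transaction(amount):
    """Toy scoring function used only to exercise the decorator."""
    if amount < 0:
        raise ValueError("amount must be non-negative")
    return min(amount / 10_000, 1.0)


score_transaction(250.0)
try:
    score_transaction(-1)
except ValueError:
    pass

print(dict(CALL_STATS["score_transaction"]))
```

The `finally` block is the key design choice: failed calls still contribute to the call count and latency, which is usually what a dashboard needs.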

In [None]:
# Check Prometheus metrics endpoint (if API exposes it)
if FRAUD_API_HEALTHY:
    try:
        response = requests.get(f"{FRAUD_API_URL}/metrics", timeout=5)
        if response.status_code == 200:
            print("Prometheus Metrics (sample):")
            print("-" * 60)
            # Show first 30 lines
            for line in response.text.split('\n')[:30]:
                print(line)
        else:
            print(f"Metrics endpoint returned: {response.status_code}")
    except Exception as e:
        print(f"Metrics endpoint not available: {e}")
else:
    print("Fraud API is not healthy; skipping metrics check")
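
Printing the first 30 lines gives a raw view; a per-family summary is often easier to scan. The sketch below reads the `# TYPE <name> <type>` lines of the Prometheus text exposition format. The sample payload and its metric names are made up for illustration and are not taken from the Fraud API.

```python
from collections import Counter


def summarize_metrics(text):
    """Map metric family name -> declared type, from '# TYPE <name> <type>' lines."""
    families = {}
    for line in text.splitlines():
        parts = line.split()
        if len(parts) == 4 and parts[0] == '#' and parts[1] == 'TYPE':
            families[parts[2]] = parts[3]
    return families


# Hypothetical payload in the text exposition format
sample = """\
# HELP http_requests_total Total HTTP requests
# TYPE http_requests_total counter
http_requests_total{method="post"} 12
# HELP request_latency_seconds Request latency
# TYPE request_latency_seconds histogram
request_latency_seconds_bucket{le="0.5"} 9
request_latency_seconds_sum 3.2
request_latency_seconds_count 12
"""

families = summarize_metrics(sample)
print(Counter(families.values()))
for name, kind in sorted(families.items()):
    print(f"  {kind:10s} {name}")
```

Against a live endpoint, pass `response.text` in place of `sample`.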

In [None]:
# Read monitoring configuration
import yaml  # PyYAML; imported here so this cell runs on its own

prometheus_config = os.path.join(PROJECT_ROOT, 'monitoring', 'prometheus', 'prometheus.yml')

if os.path.exists(prometheus_config):
    with open(prometheus_config, 'r') as f:
        config = yaml.safe_load(f) or {}  # an empty file loads as None
    
    print("Prometheus Scrape Configs:")
    print("-" * 40)
    for job in config.get('scrape_configs') or []:
        print(f"\nJob: {job.get('job_name')}")
        # A job may define several static_configs blocks (or none), so walk them all
        for static_config in job.get('static_configs') or []:
            for target in static_config.get('targets', []):
                print(f"  - {target}")
else:
    print(f"Prometheus config not found: {prometheus_config}")

---
## 13. Dagster Pipelines <a id='13-dagster'></a>

Explore Dagster assets and jobs.

In [None]:
# List pipeline files
pipelines_dir = os.path.join(PROJECT_ROOT, 'pipelines')

print("Pipeline Files:")
print("=" * 60)

if os.path.isdir(pipelines_dir):
    for f in sorted(os.listdir(pipelines_dir)):
        if f.endswith('.py') and not f.startswith('__'):
            filepath = os.path.join(pipelines_dir, f)
            size = os.path.getsize(filepath)

            # Count lines without loading the whole file into memory
            with open(filepath, 'r') as file:
                lines = sum(1 for _ in file)

            print(f"  {f:40s} {lines:5d} lines {size:8d} bytes")
else:
    print(f"Pipelines directory not found: {pipelines_dir}")

In [None]:
# Explore MLOps pipeline structure
mlops_path = os.path.join(PROJECT_ROOT, 'pipelines', 'mlops.py')

if os.path.exists(mlops_path):
    with open(mlops_path, 'r') as f:
        content = f.read()
    
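    # Find @asset-decorated functions. This regex only sees decorators written on a
    # single line directly above `def`; \b keeps it from matching @asset_check.
    import re
    assets = re.findall(r'@asset\b[^\n]*\ndef (\w+)', content)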
    
    print("Assets in mlops.py:")
    print("-" * 40)
    for asset in assets:
        print(f"  - {asset}")
    
    # Find all sensors
    sensors = re.findall(r'@sensor[^\n]*\ndef (\w+)', content)
    if sensors:
        print("\nSensors:")
        for sensor in sensors:
            print(f"  - {sensor}")
    
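    # Find all jobs (name passed positionally or as name=...)
    jobs = re.findall(r'define_asset_job\(\s*(?:name\s*=\s*)?["\'](\w+)', content)
    if jobs:
        print("\nJobs:")
        for job in jobs:
            print(f"  - {job}")
else:
    print(f"File not found: {mlops_path}")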
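
Regex matching on decorators is quick but misses decorators that span several lines, which is a common way to write `@asset(...)` with many arguments. A sketch of a sturdier alternative parses the file with the standard `ast` module. The helper name `decorated_functions` and the sample source are hypothetical; the sample is only parsed, never executed, so Dagster does not need to be installed.

```python
import ast


def decorated_functions(source, decorator_name):
    """Return names of functions decorated with `decorator_name`, in source order."""
    found = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        for dec in node.decorator_list:
            # Unwrap calls so @asset(...) is treated like bare @asset
            target = dec.func if isinstance(dec, ast.Call) else dec
            # Handle both `asset` (ast.Name) and `dagster.asset` (ast.Attribute)
            if isinstance(target, ast.Attribute):
                name = target.attr
            else:
                name = getattr(target, "id", None)
            if name == decorator_name:
                found.append((node.lineno, node.name))
                break
    return [name for _, name in sorted(found)]


# Hypothetical pipeline source, for illustration only
sample = '''
from dagster import asset, sensor

@asset(
    group_name="bronze",
)
def raw_transactions():
    ...

@asset
def training_data(raw_transactions):
    ...

@sensor(job_name="refresh")
def new_data_sensor(context):
    ...
'''

print("Assets: ", decorated_functions(sample, "asset"))
print("Sensors:", decorated_functions(sample, "sensor"))
```

To run it on the real file, pass the `content` string read from `mlops.py` instead of `sample`.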

In [None]:
# View Dagster definitions file
definitions_path = os.path.join(PROJECT_ROOT, 'pipelines', 'definitions.py')

if os.path.exists(definitions_path):
    with open(definitions_path, 'r') as f:
        content = f.read()
    
    print("Dagster Definitions (first 100 lines):")
    print("=" * 60)
    for i, line in enumerate(content.split('\n')[:100]):
        print(f"{i+1:3d} | {line}")
else:
    print("definitions.py not found")

---
## 14. End-to-End Flow Test <a id='14-e2e'></a>

Summarize the health of each service, then review the complete data flow from ingestion to inference.

In [None]:
# Summary of all connections
print("=" * 60)
print("PLATFORM HEALTH SUMMARY")
print("=" * 60)

# Health flags are set by earlier cells; a flag that was never set counts as unhealthy
flags = globals()
services = [
    ("LakeFS", LAKEFS_URL, flags.get('lakefs_healthy', False)),
    ("Trino", f"{TRINO_HOST}:{TRINO_PORT}", flags.get('TRINO_HEALTHY', False)),
    ("Redis", f"{REDIS_HOST}:{REDIS_PORT}", flags.get('REDIS_HEALTHY', False)),
    ("MLflow", MLFLOW_URL, flags.get('MLFLOW_HEALTHY', False)),
    ("Kafka", KAFKA_BOOTSTRAP, flags.get('KAFKA_HEALTHY', False)),
    ("ksqlDB", KSQLDB_URL, flags.get('KSQLDB_HEALTHY', False)),
    ("Fraud API", FRAUD_API_URL, flags.get('FRAUD_API_HEALTHY', False)),
]

for name, url, healthy in services:
    status = "✅" if healthy else "❌"
    print(f"{status} {name:12s} | {url}")
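
The summary relies on flags set by earlier cells, so a service shows ❌ whenever its cell was skipped. One possible fallback is a plain TCP probe, sketched below with the standard `socket` module. `tcp_reachable` is a hypothetical helper; an open port only shows that something is listening, not that the service is healthy.

```python
import socket


def tcp_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures
        return False


# Demo against a throwaway local listener so the example runs without the platform
server = socket.socket()
server.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
server.listen(1)
host, port = server.getsockname()

print("listener up:  ", tcp_reachable(host, port))
server.close()
print("listener down:", tcp_reachable(host, port, timeout=0.5))
```

In the notebook this could be called as `tcp_reachable(REDIS_HOST, REDIS_PORT)` for any service whose flag is missing.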

In [None]:
# Data flow diagram
print("""
=============================================================================
                        END-TO-END DATA FLOW
=============================================================================

1. DATA INGESTION
   MySQL → Debezium CDC → Kafka (fraud.demo.fraud_transactions)

2. BATCH PROCESSING  
   Kafka → Dagster (mlops_bronze) → Trino/Iceberg (bronze.fraud_transactions)
         → Dagster (mlops_gold) → Trino/Iceberg (gold.fraud_training_data)

3. STREAMING PROCESSING
   Kafka → ksqlDB (enrichment + windowing) → Kafka (fraud.streaming.features)
         → Dagster Sensor → Redis (feast:streaming:*)

4. FEATURE STORE
   Trino/Iceberg → Feast (offline store)
   Redis → Feast (online store - streaming features)

5. MODEL TRAINING
   Feast (offline) → Training → MLflow (model registry)

6. INFERENCE
   API Request → Feature Service (Feast + Redis) → Model → Prediction

7. MONITORING
   All services → Prometheus → Grafana

=============================================================================
""")

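Step 6 of the flow (request → features → model → prediction) can be sketched with plain Python objects. Everything here is a hypothetical stand-in: dicts replace Feast and Redis, `toy_model` replaces the MLflow model, and the feature names are invented. The rule that streaming values override batch values is an assumption, not something this notebook confirms about the real Feature Service.

```python
def predict(transaction_id, batch_store, streaming_store, model):
    """Look up features from both stores, merge them, and score the result."""
    features = dict(batch_store.get(transaction_id, {}))
    # Assumption: streaming values are fresher, so they win on key clashes
    features.update(streaming_store.get(transaction_id, {}))
    return {"transaction_id": transaction_id, "fraud_score": model(features)}


# Hypothetical stand-ins for the offline/online feature stores
batch_store = {"tx-1": {"avg_amount_30d": 120.0, "txn_count_1h": 1}}
streaming_store = {"tx-1": {"txn_count_1h": 7}}


def toy_model(features):
    """Toy rule in place of a trained model: many recent transactions look risky."""
    return 0.9 if features.get("txn_count_1h", 0) > 5 else 0.1


print(predict("tx-1", batch_store, streaming_store, toy_model))
print(predict("tx-unknown", batch_store, streaming_store, toy_model))
```

The second call shows the fallback path: with no features in either store, the model scores an empty feature dict.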
In [None]:
print("""
=============================================================================
                     EXPLORATION COMPLETE!
=============================================================================

You've explored:

✓ Settings & Configuration (Pydantic)
✓ LakeFS (Data Versioning)
✓ Trino/Iceberg (Data Warehouse)
✓ Redis (Caching & Streaming Features)
✓ MLflow (Model Registry)
✓ Feature Registry & Generator
✓ Kafka (Event Streaming)
✓ ksqlDB (Stream Processing)
✓ Fraud Detection API
✓ Serving Layer Architecture
✓ Monitoring Infrastructure
✓ Dagster Pipelines

Next Steps:
1. Modify cells to explore specific areas in more depth
2. Try the Quick Test commands in STREAMING_ARCHITECTURE.md
3. Run the full pipeline using Dagster UI (http://localhost:13000)
4. Check Grafana dashboards (http://localhost:3002)

=============================================================================
""")