In [1]:
import mlflow
import mlflow.sklearn
import mlflow.pyfunc
from mlflow.tracking import MlflowClient
from mlflow.models import infer_signature
from datetime import datetime
import os
import json
from typing import Dict, List, Optional, Any

from sqlalchemy import create_engine, text, Column, Integer, String, Float, DateTime, Text, Boolean, BigInteger, PrimaryKeyConstraint, Index, func
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from sqlalchemy.dialects.postgresql import JSON

import pandas as pd
from google.cloud import bigquery
import logging
from pydantic import BaseModel

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MLFLOW_TRACKING_URI = "https://mlflow-server-555196125082.us-west1.run.app"  # Replace with your MLflow server

model_name = "HousingModel"
stage = "Production"

# Set up MLflow
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

model_uri = f"models:/{model_name}/{stage}"

try:
    # Load model
    loaded_model = mlflow.pyfunc.load_model(model_uri)
    print("✅ Model loaded successfully!")
except Exception as e:
    print(f"❌ Model loading/prediction failed: {e}")

# get data from last two weeks via connecting to BigQuery
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+psycopg2://mlflow:GGDVQDZsp_$57xPL@34.83.81.99:5432/feast")

db_engine = create_engine(
    DATABASE_URL,
    pool_size=1,
    max_overflow=2,
    pool_timeout=30,
    pool_recycle=300,
    pool_pre_ping=True,
    echo=False,
    pool_reset_on_return='commit'
)

column_names = []

with db_engine.connect() as conn:
    result = conn.execute(text("SELECT distinct feature_name FROM public.housing_deployml2025_housing_features"))
    rows = result.fetchall()
    for row in rows:
        print(row)
        column_names.append(row[0])



PROJECT_ID = "mldeploy-468919"
DATASET_ID = "feast_housing"

client = bigquery.Client(project="mldeploy-468919", location="US")
QUERY = f"""
SELECT {','.join(column_names)}, mls_id
FROM {PROJECT_ID}.{DATASET_ID}.house_data
WHERE (DATE(event_timestamp) < DATE("2001-01-01"))
LIMIT 10
"""

df = client.query(QUERY).to_dataframe()

df.head()

  from .autonotebook import tqdm as notebook_tqdm
Downloading artifacts: 100%|██████████| 7/7 [00:00<00:00, 10.71it/s]


✅ Model loaded successfully!
('bedrooms',)
('city',)
('area_sqft',)
('status',)
('bathrooms',)
('days_on_market',)
('price',)
('year_built',)
('listing_agent',)
('property_type',)
('zipcode_encoded',)
('state',)
('lot_size',)




Unnamed: 0,bedrooms,city,area_sqft,status,bathrooms,days_on_market,price,year_built,listing_agent,property_type,zipcode_encoded,state,lot_size,mls_id
0,5,2,1628,1,3,76,1388656,2002,4,0,1388656.0,3,2454,463975
1,6,2,3814,2,3,71,898597,1957,2,1,898597.0,1,2093,574837
2,2,1,3442,1,4,108,1353530,1952,0,2,1353530.0,1,3651,669827
3,4,4,2789,2,3,48,773067,2001,2,2,773067.0,2,7794,294707
4,1,3,3165,1,2,2,579380,1995,3,2,579380.0,4,7690,731935


In [2]:
Base = declarative_base()

class PredictionMetrics(Base):
    __tablename__ = 'prediction_metrics'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    request_id = Column(String, unique=True, index=True)
    mls_ids = Column(JSON)  # Array of MLS IDs
    predictions = Column(JSON)  # Array of prediction results
    processing_time_ms = Column(Float)
    feature_source = Column(String)
    model_name = Column(String)
    model_stage = Column(String)
    model_version = Column(String)
    scoring_timestamp = Column(DateTime, default=datetime.utcnow, index=True)  # When data was scored
    created_at = Column(DateTime, default=datetime.utcnow)  # When record was created
    success = Column(Boolean, default=True)
    error_message = Column(Text, nullable=True)

    __table_args__ = (
        Index('idx_prediction_metrics_scoring_ts', 'scoring_timestamp'),
    )
    
start_time = datetime.now()
request_id = f"req_{start_time.strftime('%Y%m%d_%H%M%S_%f')}"       
feature_source = 'feast_api'
model_stage = stage
MODEL_STAGE = model_stage
model_version="1"
mls_ids = df['mls_id'].to_list()
MODEL_NAME = model_name

In [None]:
predictions = loaded_model.predict(df.drop(columns=['price']))

class PredictionResult(BaseModel):
    mls_id: int
    predicted_price: float
    formatted_price: str

def log_prediction_metrics(request_id: str, mls_ids: List[int], predictions: List[dict], 
                          processing_time: float, feature_source: str, success: bool, 
                          features_df: pd.DataFrame = None, model_version: str = "1",
                          error_message: str = None, db_session=None):
    """Log comprehensive prediction metrics to database - now optional"""
    if db_session is None:
        logger.debug("Database session not available - skipping metrics logging")
        return
    
    try:
        scoring_time = datetime.utcnow()
        
        # Log main prediction metrics
        metric = PredictionMetrics(
            request_id=request_id,
            mls_ids=mls_ids,
            predictions=predictions,
            processing_time_ms=processing_time,
            feature_source=feature_source,
            model_name=MODEL_NAME,
            model_stage=MODEL_STAGE,
            model_version=model_version,
            scoring_timestamp=scoring_time,
            success=success,
            error_message=error_message
        )
        db_session.add(metric)        
        db_session.commit()
        logger.info(f"📊 Comprehensive metrics logged for request {request_id} ({len(mls_ids)} predictions)")
        
    except Exception as e:
        logger.warning(f"Failed to log metrics (non-critical): {e}")
        try:
            db_session.rollback()
        except:
            pass


results = []
for mls_id, prediction in zip(df['mls_id'].to_list(), predictions):
    results.append({
        'mls_id':mls_id,
        'predicted_price':float(prediction),
        'formatted_price':f"${prediction:,.2f}"}
    )

end_time = datetime.now()
processing_time = (end_time - start_time).total_seconds() * 1000 



In [5]:
MODEL_FEATURE_ORDER = [
    'city', 'state', 'bedrooms', 'bathrooms', 'area_sqft', 
    'lot_size', 'year_built', 'days_on_market', 'property_type', 
    'listing_agent', 'status', 'zipcode_encoded'
]

feature_columns = [col for col in MODEL_FEATURE_ORDER if col in df.columns]
features_df = df[feature_columns]
DATABASE_URL = "postgresql+psycopg2://mlflow:GGDVQDZsp_$57xPL@34.83.81.99:5432/metrics"

# Create engine
engine = create_engine(DATABASE_URL, echo=True, future=True)

# Create session factory
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

# Open a session
db = SessionLocal()

In [6]:
log_prediction_metrics(
    request_id=request_id,
    mls_ids=mls_ids,
    predictions=results,
    processing_time=processing_time,
    feature_source=feature_source,
    success=True,
    features_df=features_df,
    model_version="1",
    db_session=db
)

2025-08-18 17:26:04,897 INFO sqlalchemy.engine.Engine select pg_catalog.version()


INFO:sqlalchemy.engine.Engine:select pg_catalog.version()


2025-08-18 17:26:04,900 INFO sqlalchemy.engine.Engine [raw sql] {}


INFO:sqlalchemy.engine.Engine:[raw sql] {}


2025-08-18 17:26:04,960 INFO sqlalchemy.engine.Engine select current_schema()


INFO:sqlalchemy.engine.Engine:select current_schema()


2025-08-18 17:26:04,961 INFO sqlalchemy.engine.Engine [raw sql] {}


INFO:sqlalchemy.engine.Engine:[raw sql] {}


2025-08-18 17:26:05,021 INFO sqlalchemy.engine.Engine show standard_conforming_strings


INFO:sqlalchemy.engine.Engine:show standard_conforming_strings


2025-08-18 17:26:05,021 INFO sqlalchemy.engine.Engine [raw sql] {}


INFO:sqlalchemy.engine.Engine:[raw sql] {}


2025-08-18 17:26:05,082 INFO sqlalchemy.engine.Engine BEGIN (implicit)


INFO:sqlalchemy.engine.Engine:BEGIN (implicit)


2025-08-18 17:26:05,085 INFO sqlalchemy.engine.Engine INSERT INTO prediction_metrics (request_id, mls_ids, predictions, processing_time_ms, feature_source, model_name, model_stage, model_version, scoring_timestamp, created_at, success, error_message) VALUES (%(request_id)s, %(mls_ids)s, %(predictions)s, %(processing_time_ms)s, %(feature_source)s, %(model_name)s, %(model_stage)s, %(model_version)s, %(scoring_timestamp)s, %(created_at)s, %(success)s, %(error_message)s) RETURNING prediction_metrics.id


INFO:sqlalchemy.engine.Engine:INSERT INTO prediction_metrics (request_id, mls_ids, predictions, processing_time_ms, feature_source, model_name, model_stage, model_version, scoring_timestamp, created_at, success, error_message) VALUES (%(request_id)s, %(mls_ids)s, %(predictions)s, %(processing_time_ms)s, %(feature_source)s, %(model_name)s, %(model_stage)s, %(model_version)s, %(scoring_timestamp)s, %(created_at)s, %(success)s, %(error_message)s) RETURNING prediction_metrics.id


2025-08-18 17:26:05,085 INFO sqlalchemy.engine.Engine [generated in 0.00066s] {'request_id': 'req_20250818_172604_543290', 'mls_ids': '[463975, 574837, 669827, 294707, 731935, 651375, 128562, 351405, 402448, 275900]', 'predictions': '[{"mls_id": 463975, "predicted_price": 1387969.0840496037, "formatted_price": "$1,387,969.08"}, {"mls_id": 574837, "predicted_price": 895346.83870238 ... (634 characters truncated) ... e": 576484.8090211035, "formatted_price": "$576,484.81"}, {"mls_id": 275900, "predicted_price": 833303.1043586832, "formatted_price": "$833,303.10"}]', 'processing_time_ms': 29.27, 'feature_source': 'feast_api', 'model_name': 'HousingModel', 'model_stage': 'Production', 'model_version': '1', 'scoring_timestamp': datetime.datetime(2025, 8, 19, 0, 26, 4, 586355), 'created_at': datetime.datetime(2025, 8, 19, 0, 26, 5, 85092), 'success': True, 'error_message': None}


INFO:sqlalchemy.engine.Engine:[generated in 0.00066s] {'request_id': 'req_20250818_172604_543290', 'mls_ids': '[463975, 574837, 669827, 294707, 731935, 651375, 128562, 351405, 402448, 275900]', 'predictions': '[{"mls_id": 463975, "predicted_price": 1387969.0840496037, "formatted_price": "$1,387,969.08"}, {"mls_id": 574837, "predicted_price": 895346.83870238 ... (634 characters truncated) ... e": 576484.8090211035, "formatted_price": "$576,484.81"}, {"mls_id": 275900, "predicted_price": 833303.1043586832, "formatted_price": "$833,303.10"}]', 'processing_time_ms': 29.27, 'feature_source': 'feast_api', 'model_name': 'HousingModel', 'model_stage': 'Production', 'model_version': '1', 'scoring_timestamp': datetime.datetime(2025, 8, 19, 0, 26, 4, 586355), 'created_at': datetime.datetime(2025, 8, 19, 0, 26, 5, 85092), 'success': True, 'error_message': None}


2025-08-18 17:26:05,148 INFO sqlalchemy.engine.Engine COMMIT


INFO:sqlalchemy.engine.Engine:COMMIT
INFO:__main__:📊 Comprehensive metrics logged for request req_20250818_172604_543290 (10 predictions)


In [7]:
db.close()