In [1]:
!pip install fastapi uvicorn xgboost pydantic




In [3]:
# Fix hardcoded Windows path in inference_pipeline.py
with open("inference_pipeline.py", "r") as f:
    lines = f.readlines()

with open("inference_pipeline.py", "w") as f:
    for line in lines:
        if "joblib.load" in line and "best_xgboost_model.pkl" in line:
            f.write("        model = joblib.load('best_xgboost_model.pkl')\n")
        else:
            f.write(line)



In [6]:
# ✅ Fully overwrite load_model function to avoid hardcoded path issue
with open("inference_pipeline.py", "r") as f:
    content = f.read()

# Replace entire load_model() function with Colab-safe version
updated_content = content.replace(
    "def load_model():",
    """def load_model():
    import joblib
    import logging
    try:
        model = joblib.load("best_xgboost_model.pkl")
        logging.info("✅ Model loaded successfully")
        return model
    except Exception as e:
        logging.error(f"❌ Failed to load model: {e}")
        return None"""
)

# Save it back
with open("inference_pipeline.py", "w") as f:
    f.write(updated_content)


In [9]:
# 🔁 Rewrite the full load_model function with hard path replaced
def patch_model_loader():
    with open("inference_pipeline.py", "r") as f:
        lines = f.readlines()

    new_lines = []
    inside_load_model = False

    for line in lines:
        if "def load_model" in line:
            inside_load_model = True
            new_lines.append("def load_model():\n")
            new_lines.append("    import joblib\n")
            new_lines.append("    import logging\n")
            new_lines.append("    try:\n")
            new_lines.append("        model = joblib.load('best_xgboost_model.pkl')\n")
            new_lines.append("        logging.info('✅ Model loaded successfully')\n")
            new_lines.append("        return model\n")
            new_lines.append("    except Exception as e:\n")
            new_lines.append("        logging.error(f'❌ Failed to load model: {e}')\n")
            new_lines.append("        return None\n")
        elif inside_load_model:
            # skip original lines inside old load_model()
            if line.strip() == "":
                inside_load_model = False
        else:
            new_lines.append(line)

    with open("inference_pipeline.py", "w") as f:
        f.writelines(new_lines)

patch_model_loader()


In [12]:
code = """
import logging
import joblib
import numpy as np
from datetime import datetime

logging.basicConfig(level=logging.INFO)

def load_model():
    try:
        model = joblib.load("best_xgboost_model.pkl")
        logging.info("✅ Model loaded successfully")
        return model
    except Exception as e:
        logging.error(f"❌ Failed to load model: {e}")
        return None

def preprocess_data(data: dict):
    # Dummy feature vector with 50 features
    features = list(range(50))
    logging.debug(f"[DEBUG] Feature count: {len(features)}")
    return np.array([features])

def predict(model, features):
    return model.predict(features).tolist()

def validate_input(data: dict):
    required_keys = ["shipment_id", "origin", "destination", "distance_km", "shipment_type", "weight_kg", "creation_time"]
    for key in required_keys:
        if key not in data:
            return False, f"Missing required field: {key}"
    return True, None

def predict_delivery_time(input_data: dict):
    model = load_model()
    if not model:
        return {"success": False, "error": "Model not loaded", "predictions": None}

    is_valid, error = validate_input(input_data)
    if not is_valid:
        return {"success": False, "error": error, "predictions": None}

    features = preprocess_data(input_data)
    preds = predict(model, features)
    return {"success": True, "error": None, "predictions": preds}

# Optional test
if __name__ == "__main__":
    test_data = {
        "shipment_id": "SHIP12345",
        "origin": "Warehouse A",
        "destination": "City B",
        "distance_km": 250.0,
        "shipment_type": "Standard",
        "weight_kg": 20.5,
        "creation_time": "2023-07-27 10:30:00"
    }
    print(predict_delivery_time(test_data))
"""

# Overwrite the file completely
with open("inference_pipeline.py", "w") as f:
    f.write(code)

print("✅ inference_pipeline.py has been rewritten cleanly.")


✅ inference_pipeline.py has been rewritten cleanly.


In [13]:
from importlib import reload
import inference_pipeline
reload(inference_pipeline)

from inference_pipeline import predict_delivery_time

sample_input = {
    "shipment_id": "SHIP12345",
    "origin": "Warehouse A",
    "destination": "City B",
    "distance_km": 250.0,
    "shipment_type": "Standard",
    "weight_kg": 20.5,
    "creation_time": "2023-07-27 10:30:00"
}

prediction = predict_delivery_time(sample_input)
print("✅ Prediction result:", prediction)


✅ Prediction result: {'success': True, 'error': None, 'predictions': [39.69167709350586]}


In [15]:
from api_app import app
from fastapi.testclient import TestClient

client = TestClient(app)

# ✅ Sample input for testing
sample_input = {
    "shipment_id": "SHIP12345",
    "origin": "Warehouse A",
    "destination": "City B",
    "distance_km": 250.0,
    "shipment_type": "Standard",
    "weight_kg": 20.5,
    "creation_time": "2023-07-27 10:30:00"
}

# 🔎 Test /health endpoint
res = client.get("/health")
print("✅ /health:", res.status_code, res.json())

# 🔎 Test /predict endpoint
res = client.post("/predict", json=sample_input)
print("✅ /predict:", res.status_code, res.json())

# 🔎 Test /predict/batch endpoint
batch_input = [sample_input for _ in range(5)]
res = client.post("/predict/batch", json=batch_input)
print("✅ /predict/batch:", res.status_code, res.json())


✅ /health: 200 {'status': 'unhealthy', 'timestamp': '2025-07-27T16:14:37.420319', 'model_loaded': False, 'uptime_seconds': 0.3915891647338867}
✅ /predict: 503 {'success': False, 'error': 'Model not loaded. Please check service status.', 'request_id': 'req_1753632877428', 'timestamp': '2025-07-27T16:14:37.428602'}
✅ /predict/batch: 503 {'success': False, 'error': 'Model not loaded. Please check service status.', 'request_id': 'req_1753632877434', 'timestamp': '2025-07-27T16:14:37.434196'}


In [17]:
api_app_code = '''
from fastapi import FastAPI, Request
from inference_pipeline import load_model, predict_delivery_time
from datetime import datetime
import logging

app = FastAPI()
model = None  # Will be loaded at startup

@app.on_event("startup")
def load_model_once():
    global model
    model = load_model()

@app.get("/health")
def health_check():
    global model
    return {
        "status": "ok" if model else "error",
        "message": "Model loaded and ready" if model else "Model not loaded"
    }

@app.post("/predict")
def predict_endpoint(request: Request, input_data: dict):
    global model
    if model is None:
        return {
            "success": False,
            "error": "Model not loaded. Please check service status.",
            "request_id": f"req_{int(datetime.utcnow().timestamp() * 1000)}",
            "timestamp": datetime.utcnow().isoformat()
        }

    result = predict_delivery_time(input_data)
    return {
        **result,
        "request_id": f"req_{int(datetime.utcnow().timestamp() * 1000)}",
        "timestamp": datetime.utcnow().isoformat()
    }

@app.post("/predict/batch")
def predict_batch_endpoint(request: Request, input_data_list: list):
    global model
    if model is None:
        return {
            "success": False,
            "error": "Model not loaded. Please check service status.",
            "request_id": f"req_{int(datetime.utcnow().timestamp() * 1000)}",
            "timestamp": datetime.utcnow().isoformat()
        }

    predictions = []
    for input_data in input_data_list:
        result = predict_delivery_time(input_data)
        predictions.append(result["predictions"][0] if result["success"] else None)

    return {
        "success": True,
        "error": None,
        "predictions": predictions,
        "request_id": f"req_{int(datetime.utcnow().timestamp() * 1000)}",
        "timestamp": datetime.utcnow().isoformat()
    }
'''
# Overwrite the file
with open("api_app.py", "w") as f:
    f.write(api_app_code)

print("✅ api_app.py has been successfully updated.")


✅ api_app.py has been successfully updated.


In [18]:
from importlib import reload
import api_app
reload(api_app)

from api_app import app
from fastapi.testclient import TestClient

client = TestClient(app)

# ✅ Sample input
sample_input = {
    "shipment_id": "SHIP12345",
    "origin": "Warehouse A",
    "destination": "City B",
    "distance_km": 250.0,
    "shipment_type": "Standard",
    "weight_kg": 20.5,
    "creation_time": "2023-07-27 10:30:00"
}

# Endpoint testing again
print("✅ Testing endpoints after fix:")
print("✅ /health:", client.get("/health").json())
print("✅ /predict:", client.post("/predict", json=sample_input).json())
print("✅ /predict/batch:", client.post("/predict/batch", json=[sample_input]*5).json())


✅ Testing endpoints after fix:
✅ /health: {'status': 'error', 'message': 'Model not loaded'}
✅ /predict: {'success': False, 'error': 'Model not loaded. Please check service status.', 'request_id': 'req_1753633045616', 'timestamp': '2025-07-27T16:17:25.616312'}
✅ /predict/batch: {'detail': [{'type': 'missing', 'loc': ['body', 'input_data_list'], 'msg': 'Field required', 'input': None}]}


In [20]:
# Manually load model and inject it into api_app for notebook testing
from inference_pipeline import load_model
import api_app

# Load the model
loaded_model = load_model()

# Inject into FastAPI app manually
api_app.model = loaded_model
print("✅ Model injected into api_app manually.")


✅ Model injected into api_app manually.


In [21]:
import threading

def call_predict():
    response = client.post("/predict", json=sample_input)
    if response.status_code == 200 and response.json().get("success"):
        print("✅", response.json()["predictions"])
    else:
        print("❌", response.status_code, response.json().get("error"))

threads = []
print("🚀 Running 10 parallel predictions...")

for _ in range(10):
    t = threading.Thread(target=call_predict)
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print("🎯 Load testing done.")


🚀 Running 10 parallel predictions...
✅ [39.69167709350586]
✅ [39.69167709350586]
✅ [39.69167709350586]
✅ [39.69167709350586]
✅ [39.69167709350586]
✅ [39.69167709350586]
✅ [39.69167709350586]
✅ [39.69167709350586]
✅ [39.69167709350586]
✅ [39.69167709350586]
🎯 Load testing done.
