In [21]:
# Colab / General setup
!pip -q install lightgbm shap psycopg2-binary transformers==4.41.2 huggingface_hub==0.33.5

import os, re, json, glob, joblib, math, textwrap, datetime as dt
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Callable, Tuple

import numpy as np
import pandas as pd
import lightgbm as lgb
import shap
from transformers import pipeline


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/515.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m515.7/515.7 kB[0m [31m25.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [22]:
login(token="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")

# --- Search for model in Drive ---
model_name = "loan_default_predictor.pkl"
search_root = "/content/drive/MyDrive"
model_path = None

for root, dirs, files in os.walk(search_root):
    if model_name in files:
        model_path = os.path.join(root, model_name)
        break

if model_path:
    print(f"✅ Found model at: {model_path}")
else:
    raise FileNotFoundError(f"❌ Could not find {model_name} in {search_root}")

✅ Found model at: /content/drive/MyDrive/loan_default_predictor.pkl


In [24]:
sample_features = [[35, 500000, 250000, 60, 12.0, 2, 30, 1, 3, 12.5, 40, 5, 0, 0.2]]

try:
    if hasattr(lgb_model, "predict_proba"):
        risk = float(lgb_model.predict_proba(sample_features)[0][1])
    else:
        risk = float(lgb_model.predict(sample_features)[0])
    print(f"📊 Sample Risk Prediction: {risk:.3f}")
except Exception as e:
    print(f"⚠️ Prediction error: {e}")

⚠️ Prediction error: 'dict' object has no attribute 'predict'


In [25]:
# --- Mount Google Drive ---
from google.colab import drive
drive.mount('/content/drive')

# --- Model path ---
model_path = "/content/drive/MyDrive/Colab Notebooks/loan_default_predictor.pkl"



# --- Check if model file exists ---
if os.path.exists(model_path):
    print(f"✅ Found model at: {model_path}")
else:
    print(f"❌ Model file not found. Please check path: {model_path}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Found model at: /content/drive/MyDrive/Colab Notebooks/loan_default_predictor.pkl


In [27]:
def int_in_range(min_v: int, max_v: int):
    def _v(x: str):
        x = x.strip()
        if not re.fullmatch(r"-?\d+", x):
            return False, None, f"Please enter a whole number between {min_v} and {max_v}."
        val = int(x)
        if not (min_v <= val <= max_v):
            return False, None, f"Value must be between {min_v} and {max_v}."
        return True, val, ""
    return _v

def float_in_range(min_v: float, max_v: float):
    def _v(x: str):
        x = x.strip().replace(",", "")
        try:
            val = float(x)
        except ValueError:
            return False, None, f"Please enter a number between {min_v} and {max_v}."
        if not (min_v <= val <= max_v):
            return False, None, f"Value must be between {min_v} and {max_v}."
        return True, val, ""
    return _v

def positive_float():
    def _v(x: str):
        x = x.strip().replace(",", "")
        try:
            val = float(x)
        except ValueError:
            return False, None, "Please enter a positive number."
        if val <= 0:
            return False, None, "Value must be > 0."
        return True, val, ""
    return _v

def one_of(options: List[str]):
    lower_opts = [o.lower() for o in options]
    def _v(x: str):
        s = x.strip()
        if s.lower() not in lower_opts:
            return False, None, f"Please choose one of: {', '.join(options)}."
        return True, options[lower_opts.index(s.lower())], ""
    return _v


In [28]:
@dataclass
class FieldSpec:
    name: str
    prompt: str
    validator: Callable[[str], Tuple[bool, Optional[Any], str]]
    required: bool = True
    depends_on: Optional[Callable[[Dict[str, Any]], bool]] = None

def income_optional_if_unemployed_or_student(state: Dict[str, Any]) -> bool:
    status = state.get("EmploymentStatus", "").lower()
    return status not in {"unemployed", "student"}

def partial_payments_if_missed(state: Dict[str, Any]) -> bool:
    return state.get("MissedPayments", 0) > 0
SCHEMA: List[FieldSpec] = [
    FieldSpec(
        "CustomerID",
        "Enter a unique Customer ID (format: CUST0001):",
        lambda s: (
            bool(re.fullmatch(r"CUST\d{4}", s.strip())),
            s.strip(),
            "Customer ID must be in the format 'CUST' followed by 4 digits, e.g., CUST0001."
        )
    ),
    FieldSpec("Age", "Please enter your age (18–75):", int_in_range(18, 75)),
    FieldSpec("Income", "Annual income in INR (e.g., 450000):", positive_float(),
              depends_on=income_optional_if_unemployed_or_student),
    FieldSpec("Location", "Your location (Urban/Suburban/Rural):", one_of(["Urban", "Suburban", "Rural"])),
    FieldSpec("EmploymentStatus", "Employment status (Self-Employed/Salaried/Student/Unemployed):",
              one_of(["Self-Employed", "Salaried", "Student", "Unemployed"])),
    FieldSpec("LoanAmount", "Requested loan amount in INR (must be > 0):", positive_float()),
    FieldSpec("TenureMonths", "Loan tenure in months (6–360):", int_in_range(6, 360)),
    FieldSpec("InterestRate", "Annual interest rate in % (1–30):", float_in_range(1.0, 30.0)),
    FieldSpec("LoanType", "Type of loan (Personal/Auto/Home/Education/Business):",
              one_of(["Personal", "Auto", "Home", "Education", "Business"])),
    FieldSpec("MissedPayments", "Number of missed payments (0–24):", int_in_range(0, 24)),
    FieldSpec("DelaysDays", "Total delay in days (0–365):", int_in_range(0, 365)),
    FieldSpec("PartialPayments", "Number of partial payments (0–24):", int_in_range(0, 24),
              depends_on=partial_payments_if_missed),
    FieldSpec("InteractionAttempts", "Number of contact attempts made (0–50):", int_in_range(0, 50)),
    FieldSpec("SentimentScore", "Sentiment score from -1 to 1 (e.g., -0.3, 0.7):", float_in_range(-1.0, 1.0)),
    FieldSpec("ResponseTimeHours", "Average response time in hours (0–240):", float_in_range(0.0, 240.0)),
    FieldSpec("AppUsageFrequency", "App usage frequency score (0–100):", float_in_range(0.0, 100.0)),
    FieldSpec("WebsiteVisits", "Number of visits to the loan portal (0–500):", int_in_range(0, 500)),
    FieldSpec("Complaints", "Number of complaints registered (0–50):", int_in_range(0, 50)),
]


In [34]:
EMO_TO_PERSONA = {
    "joy": "cooperative",
    "neutral": "evasive",     # neutral silence can look avoidant
    "surprise": "confused",
    "fear": "confused",
    "sadness": "confused",
    "anger": "aggressive",
    "disgust": "aggressive",
}
def recommend_strategy(persona: str, risk: float, missed: int):
    if risk >= 0.7:
        return "offer_plan_high" if persona == "cooperative" else \
               "senior_agent" if persona == "aggressive" else \
               "educational_call" if persona == "confused" else "escalate_call"
    elif 0.5 <= risk < 0.7:
        return "reminder_payment" if persona == "cooperative" else \
               "structured_negotiation" if persona == "aggressive" else \
               "clarification_message" if persona == "confused" else "follow_up_calls"
    else:
        return "soft_reminder" if missed > 0 else "no_contact"

def analyze_and_recommend(lgb, answers: Dict[str, Any]):
    feature_cols = [
        "Age","Income","LoanAmount","TenureMonths","InterestRate",
        "MissedPayments","DelaysDays","PartialPayments","InteractionAttempts",
        "ResponseTimeHours","AppUsageFrequency","WebsiteVisits","Complaints",
        "SentimentScore"
    ]
    X = [[float(answers.get(col, 0)) for col in feature_cols]]

    risk = float(lgb.predict_proba(X)[0][1]) if hasattr(lgb, "predict_proba") else float(lgb.predict(X)[0])
    target = "Yes" if risk >= 0.5 else "No"

    persona = EMO_TO_PERSONA.get(answers.get("SentimentLabel", "neutral").lower(), "cooperative")
    strategy = recommend_strategy(persona, risk, int(answers.get("MissedPayments", 0)))

    answers.update({"RiskScore": risk, "Target": target, "Persona": persona, "RecommendedStrategy": strategy})

    print("\n--- Analysis Results ---")
    print(f"📊 Will Miss Next Payment: {target}")
    print(f"🔢 Risk Score: {risk:.3f}")
    print(f"🧩 Persona Detected: {persona}")
    print(f"🎯 Recommended Strategy: {strategy}")
    print("-------------------------\n")

    return answers

In [35]:
from huggingface_hub import login

login(token="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")


In [36]:
# --- Mount Google Drive ---
from google.colab import drive
drive.mount('/content/drive')

# --- Model path ---
model_path = "/content/drive/MyDrive/Colab Notebooks/loan_default_predictor.pkl"



# --- Check if model file exists ---
if os.path.exists(model_path):
    print(f"✅ Found model at: {model_path}")
else:
    print(f"❌ Model file not found. Please check path: {model_path}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Found model at: /content/drive/MyDrive/Colab Notebooks/loan_default_predictor.pkl


In [37]:


def load_lgb_model(path: str = model_path):
    """Load LightGBM model from pkl file in Google Drive"""
    if not os.path.exists(path):
        raise FileNotFoundError(f"❌ Model not found at {path}")
    print(f"✅ Model found at {path}")
    return joblib.load(path)

def load_emotion_model():
    """Load HuggingFace emotion classification model"""
    print("⏳ Loading emotion model...")
    emo_pipe = pipeline(
        "text-classification",
        model="j-hartmann/emotion-english-distilroberta-base",
        top_k=1
    )
    print("✅ Emotion model loaded successfully!")
    return emo_pipe

# --- Load models once ---
lgb_model = load_lgb_model()
emotion_pipe = load_emotion_model()


✅ Model found at /content/drive/MyDrive/Colab Notebooks/loan_default_predictor.pkl
⏳ Loading emotion model...


Device set to use cuda:0


✅ Emotion model loaded successfully!


In [None]:
# ----------------------------
# Main Loop
# ----------------------------
def main():
    print("🤖 Welcome to the Loan Collection Persona Chatbot (PostgreSQL Edition)\n")

    #init_db()
    lgb = load_lgb_model
    emo_pipe = load_emotion_model()

    answers: Dict[str, Any] = {}
    persona = "cooperative"

    for field in SCHEMA:
        if field.depends_on and not field.depends_on(answers):
            continue
        while True:
            prompt = style_prompt(field.prompt, persona)
            raw = input(prompt + "\n> ")
            valid, val, err = field.validator(raw)
            if valid:
                answers[field.name] = val
                break
            else:
                print(style_error(err, persona))

    while True:
        msg = input(style_prompt("Lastly, how do you feel about repayment? (free text)", persona) + "\n> ")
        if msg.strip():
            emo_result = emo_pipe(msg)[0][0]
            answers["UserMessage"] = msg
            answers["SentimentLabel"] = emo_result["label"]
            answers["SentimentScore"] = normalize_sentiment_score(emo_result["label"], emo_result["score"])
            persona = EMO_TO_PERSONA.get(emo_result["label"].lower(), "cooperative")
            break
        else:
            print(style_error("Message cannot be empty.", persona))

    results = analyze_and_recommend(lgb, answers)
    save_to_db(results)

if __name__ == "__main__":
    main()

🤖 Welcome to the Loan Collection Persona Chatbot (PostgreSQL Edition)

⏳ Loading emotion model...


Device set to use cuda:0


✅ Emotion model loaded successfully!



💬 Loan Assistant Chatbot (CLI)

❌ Database connection failed: connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
	Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (::1), port 5432 failed: Cannot assign requested address
	Is the server running on that host and accepting TCP/IP connections?

⚠️ Skipping DB init — connection unavailable.
ℹ️ [DEBUG] Session already initialized — skipping reset.

🤖 Bot: 🙂 What is your age? (18–75)
👤 You: 10


ValueError: too many values to unpack (expected 3)