In [1]:
import pandas as pd
import numpy as np

# suppress warnings
import warnings
warnings.filterwarnings("ignore")

In [3]:
# Load dataset
df = pd.read_csv("D:/WORKSPACE_CS/LJMU_Research/other/dash/health_policy_recommendation/src/health_policy_recommendation/data/ushealthinsurance_with_company3.csv")
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1338 entries, 0 to 1337
Data columns (total 8 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   age                1338 non-null   int64  
 1   sex                1338 non-null   object 
 2   bmi                1338 non-null   float64
 3   children           1338 non-null   int64  
 4   smoker             1338 non-null   object 
 5   region             1338 non-null   object 
 6   charges            1338 non-null   float64
 7   insurance_company  1338 non-null   object 
dtypes: float64(2), int64(2), object(4)
memory usage: 83.8+ KB


In [6]:
# approach 1

from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics.pairwise import cosine_similarity
import joblib

region_budget_encoder = joblib.load(r'D:\WORKSPACE_CS\LJMU_Research\other\dash\health_policy_recommendation\src\health_policy_recommendation\models\content_based\region_budget_encoder.pkl')
company_vectors = pd.read_csv(r'D:\WORKSPACE_CS\LJMU_Research\other\dash\health_policy_recommendation\src\health_policy_recommendation\models\content_based\company_vectors.csv', index_col=0)


def feedback_score(row):
    budget_score = {"low": 5, "medium": 3, "high": 1}.get(row["user_budget"], 0)
    smoker_score = {"yes": -1, "no": 1}.get(row["smoker"], 0)
    bmi_score = 1 if row["bmi"] < 25 else -1

    score = budget_score + smoker_score + bmi_score

    # Clip the score between 0 and 5
    return max(0, min(5, score))


def health_score(row):
    score = 2
    if row["bmi"] < 25:
        score += 1
    elif row["bmi"] >= 30:
        score -= 1

    score += 1 if row["smoker"] == "no" else -1
    return score


def create_user_vector(new_user_input: dict, encoder: OneHotEncoder) -> np.ndarray:
    user_input_df = pd.DataFrame([new_user_input])
    user_input_df["user_health_score"] = user_input_df.apply(health_score, axis=1)

    if "user_feedback" not in user_input_df.columns:
        user_input_df["user_feedback"] = user_input_df.apply(feedback_score, axis=1)

    # Encode categorical features
    encoded_cats = encoder.transform(user_input_df[["user_budget", "user_region"]])

    # Scale numerical features
    scaled_health = user_input_df["user_health_score"] / 4.0
    scaled_feedback = user_input_df["user_feedback"] / 5.0

    # Concatenate encoded and scaled features
    user_vector = np.hstack(
        (
            encoded_cats,
            scaled_health.values.reshape(-1, 1),
            scaled_feedback.values.reshape(-1, 1),
        )
    )

    return user_vector


def recomend(new_user_input, company_vectors, label_encoder):
    # create user vector
    user_vector = create_user_vector(new_user_input, label_encoder)

    # Calculate cosine similarity
    similarities = cosine_similarity(user_vector, company_vectors.values)

    # Map scores back to company names for readability
    similarity_scores = pd.Series(similarities[0], index=company_vectors.index)

    # # Get the top recommendation
    # recommendation = similarity_scores.idxmax() # Get the index (company name) of the max score
    # highest_score = similarity_scores.max()

    # return recommendation, highest_score

    # Display ranked top 3 recommendations
    recommendation = similarity_scores.sort_values(ascending=False).iloc[:3]
    return recommendation.to_dict()

In [9]:
model = joblib.load(
    r"D:\WORKSPACE_CS\LJMU_Research\other\dash\health_policy_recommendation\src\health_policy_recommendation\models\insurance_model.pkl"
)

model_features = joblib.load(
    r"D:\WORKSPACE_CS\LJMU_Research\other\dash\health_policy_recommendation\src\health_policy_recommendation\models\model_features.pkl"
)

label_enconder = joblib.load(
    r"D:\WORKSPACE_CS\LJMU_Research\other\dash\health_policy_recommendation\src\health_policy_recommendation\models\label_encoder.pkl"
)


# Basic (Static Preferences)


def get_utility_scores_for_user(user_input_df):
    # Example static mapping (normally you'd fetch this from user profile/preferences)

    preferred_region = (
        user_input_df.iloc[0]["preferred_region"] or user_input_df.iloc[0]["region"]
    )

    budget_level = user_input_df.iloc[0]["budget"]  # low / medium / high

    # Define utility rules for companies (this could come from a config or DB)
    company_meta = {
        "Company A": {"region": "northwest", "price": "medium"},
        "Company B": {"region": "southeast", "price": "low"},
        "Company C": {"region": "southwest", "price": "high"},
        "Company D": {"region": "northeast", "price": "medium"},
    }

    utility_scores = {}

    for company, meta in company_meta.items():
        score = 1.0  # base utility

        # Increase utility if region matches
        if meta["region"] == preferred_region:
            score += 0.3

        # Modify based on budget match
        if meta["price"] == budget_level:
            score += 0.2

        utility_scores[company] = score

    return utility_scores


def preprocess_user_input(user_input_df, model_features):
    # One-hot encode like training
    user_input_encoded = pd.get_dummies(
        user_input_df, columns=["sex", "smoker", "region", "budget"], drop_first=True
    )

    # Add missing columns that existed during training
    for col in model_features:
        if col not in user_input_encoded.columns:
            user_input_encoded[col] = 0

    # Reorder to match model input exactly
    user_input_encoded = user_input_encoded[model_features]

    return user_input_encoded


def health_score(row):
    score = 2
    if row["bmi"] < 25:
        score += 1
    elif row["bmi"] >= 30:
        score -= 1

    score += 1 if row["smoker"] == "no" else -1
    return score


def predict(user_input_df, top_n=3):

    user_input_df = pd.DataFrame([user_input_df])

    # Map budget to score
    budget_score = user_input_df["budget"].map({"low": 5, "medium": 3, "high": 1})

    # Compute feedback
    user_input_df["feedback"] = (
        budget_score
        + user_input_df["smoker"].map({"yes": -1, "no": 1})
        + user_input_df["bmi"].apply(lambda x: 1 if x < 25 else -1)
    )

    # Clip to range 0–5
    user_input_df["feedback"] = user_input_df["feedback"].clip(0, 5)

    user_input_df["health_score"] = user_input_df.apply(health_score, axis=1)

    # Preprocess user input to match training features
    user_input_encoded = preprocess_user_input(user_input_df, model_features)

    # Step 1: Predict class probabilities
    probs = model.predict_proba(user_input_encoded)
    class_names = label_enconder.inverse_transform(
        model.classes_
    )  # get original labels

    # Step 2: Get top-N most probable recommendations
    # top_n_probs = sorted(
    #     list(zip(class_names, probs[0])),
    #     key=lambda x: x[1],
    #     reverse=True
    # )[:top_n]

    prob_dict = dict(zip(class_names, probs[0]))  # {'CompanyA': 0.45, ...}

    # Step 3: Sort and get top-N as a dictionary
    top_n_probs = dict(
        sorted(prob_dict.items(), key=lambda x: x[1], reverse=True)[:top_n]
    )

    # # Step 3: Fetch user-specific utility scores
    # utility_scores = get_utility_scores_for_user(user_input_df)

    # # Step 4: Multiply probabilities with utility scores
    # recommendations = {
    #     company: prob * utility_scores.get(company, 1.0)  # Default utility = 1.0 if not found
    #     for company, prob in top_n_probs.items()
    # }

    # Step 5: Re-rank based on adjusted utility score
    # recommendations = sorted(recommendations, key=lambda x: x[1], reverse=True)

    # return recommendations

    return top_n_probs


In [8]:
def fuse_recommendations(similarity_scores, feedback_scores, alpha=0.5):
    """
    Combine content-based similarity scores with utility-based ML predictions.

    Parameters:
        similarity_scores (dict): item_id -> content-based similarity score.
        feedback_scores (dict): item_id -> utility/feedback score from ML model.
        alpha (float): Balance between content-based and ML-based scores.
                       alpha=1: only content-based; alpha=0: only ML-based.

    Returns:
        fused_scores (dict): item_id -> combined fused score.
    """

    # Align the items present in both systems
    item_ids = list(set(similarity_scores) | set(feedback_scores))

    df = pd.DataFrame(
        {
            "item_id": item_ids,
            "similarity_score": [similarity_scores.get(i, 0) for i in item_ids],
            "feedback_score": [feedback_scores.get(i, 0) for i in item_ids],
        }
    )

    # Fuse scores using weighted average
    df["fused_score"] = (
        alpha * df["similarity_score"] + (1 - alpha) * df["feedback_score"]
    )

    # Sort or filter if needed:
    df = df.sort_values(by="fused_score", ascending=False)[:3]

    return dict(zip(df["item_id"], df["fused_score"]))

In [10]:
new_user_input = {
    'age': 19,
    'sex': 'female',
    'bmi': 27.9,
    'children': 0,
    'smoker': 'yes',
    'region': 'southeast',
    'budget': 'high',
    'user_feedback': 4.0,
}

# Derive and restructure into UserPreferences3 format
user_preferences_payload = {
    "user_preferences_1": {
        "user_budget": new_user_input["budget"],
        "user_region": new_user_input["region"],
        "bmi": new_user_input["bmi"],
        "smoker": new_user_input["smoker"],
        "user_feedback": new_user_input.get("user_feedback"),
    },
    "user_preferences_2": {
        "age": new_user_input["age"],
        "sex": new_user_input["sex"],
        "bmi": new_user_input["bmi"],
        "children": new_user_input["children"],
        "smoker": new_user_input["smoker"],
        "region": new_user_input["region"],
        "budget": new_user_input["budget"],
        "preferred_region": new_user_input.get("region"),
    }
}



In [11]:
def content_based_recommendation(user_preferences_1: dict):
    return recomend(user_preferences_1, company_vectors, region_budget_encoder)

def model_based_recommendation(user_preferences_2: dict):
    return predict(user_preferences_2, top_n=3)


In [12]:
content_based_result = content_based_recommendation(user_preferences_payload['user_preferences_1'])
model_based_result = model_based_recommendation(user_preferences_payload['user_preferences_2'])

In [13]:
combined_recomendation = fuse_recommendations(content_based_result, model_based_result, alpha=0.6)

In [14]:
combined_recomendation

{'Company C': 0.9388967556074301,
 'Company B': 0.5244893467738976,
 'Company D': 0.3794639181751614}

## Evaluation

In [15]:
import numpy as np
import pandas as pd
import heapq

def precision_at_k(recommended_items, relevant_items, k):
    recommended_k = recommended_items[:k]
    return len(set(recommended_k) & set(relevant_items)) / k

def recall_at_k(recommended_items, relevant_items, k):
    recommended_k = recommended_items[:k]
    return len(set(recommended_k) & set(relevant_items)) / len(relevant_items) if relevant_items else 0

def hit_at_k(recommended_items, relevant_items, k):
    recommended_k = recommended_items[:k]
    return 1 if set(recommended_k) & set(relevant_items) else 0

def ndcg_at_k(recommended_items, relevant_items, k):
    dcg = 0.0
    for i, item in enumerate(recommended_items[:k]):
        if item in relevant_items:
            dcg += 1 / np.log2(i + 2)
    idcg = sum([1 / np.log2(i + 2) for i in range(min(len(relevant_items), k))])
    return dcg / idcg if idcg > 0 else 0

In [None]:
def evaluate_content_based(similarity_scores, ground_truth, k=5):
    recommended_items = [item for item, _ in sorted(similarity_scores.items(), key=lambda x: x[1], reverse=True)]
    return {
        'Precision@K': precision_at_k(recommended_items, ground_truth, k),
        'Recall@K': recall_at_k(recommended_items, ground_truth, k),
        'Hit@K': hit_at_k(recommended_items, ground_truth, k),
        'NDCG@K': ndcg_at_k(recommended_items, ground_truth, k)
    }


In [None]:
eval = evaluate_content_based(combined_recomendation, )   # incomplete