In [None]:
!pip install gradio pandas numpy faiss-cpu torch sentence-transformers transformers optuna scikit-learn python-dotenv openpyxl typing-extensions tqdm

Collecting gradio
  Downloading gradio-5.9.1-py3-none-any.whl.metadata (16 kB)
Collecting faiss-cpu
  Downloading faiss_cpu-1.9.0.post1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.4 kB)
Collecting optuna
  Downloading optuna-4.1.0-py3-none-any.whl.metadata (16 kB)
Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Collecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl.metadata (9.7 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.6-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.5.0-py3-none-any.whl.metadata (3.0 kB)
Collecting gradio-client==1.5.2 (from gradio)
  Downloading gradio_client-1.5.2-py3-none-any.whl.metadata (7.1 kB)
Collecting markupsafe~=2.0 (from gradio)
  Downloading MarkupSafe-2.1.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.0 kB)
Collecting pydub (from gradio)
  Down

            ### --- Intent Matching using FAISS or Keywords --- ###


In [None]:
import gradio as gr
import pandas as pd
import numpy as np
import faiss
import torch
import logging
import re
from sentence_transformers import SentenceTransformer
from transformers import T5Tokenizer, T5ForConditionalGeneration, AdamW, get_scheduler
from typing import List, Optional, Dict, Any, Union
from torch.utils.data import DataLoader, Dataset

# Show INFO-level records (model loading, intent detection, retrieval) in the cell output.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the InsuranceBot methods below.
logger = logging.getLogger(__name__)

class InsuranceBot:
    def __init__(self):
        self.data = None
        self.sentence_model = None
        self.t5_model = None
        self.t5_tokenizer = None
        self.embeddings = None
        self.faiss_index = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.initialise_models()

    def load_data(self, file_path: str) -> str:
        try:
            self.data = pd.read_excel(file_path)

            # validating required columns
            required_columns = [
                'Insurance Provider', 'Plan Name', 'Premium Price (S$)',
                'Overseas Medical Expenses (S$)', 'Trip Cancellation (S$)'
            ]
            missing_columns = [col for col in required_columns if col not in self.data.columns]
            if missing_columns:
                return f"Missing required columns: {', '.join(missing_columns)}"

            # cleaning numeric values
            for col in ['Premium Price (S$)', 'Overseas Medical Expenses (S$)', 'Trip Cancellation (S$)']:
                self.data[col] = self.data[col].apply(self.clean_numeric_value)

            # generating embeddings for FAISS
            texts = [f"{row['Insurance Provider']} {row['Plan Name']}" for _, row in self.data.iterrows()]
            self.embeddings = self.sentence_model.encode(texts, convert_to_tensor=True).cpu().numpy()
            self.embeddings = self.embeddings / np.linalg.norm(self.embeddings, axis=1, keepdims=True)

            # FAISS indexing - inverted file index w clustering
            nlist = min(5, len(self.embeddings) // 3)  #cluster==1/3rd of data points
            self.faiss_index = faiss.IndexIVFFlat(
                faiss.IndexFlatL2(self.embeddings.shape[1]),  #inner flat index
                self.embeddings.shape[1],
                nlist,  #dynamic clustering based on data size
                faiss.METRIC_L2
            )

            # training only if enough points exist for clustering
            if len(self.embeddings) >= nlist:
                self.faiss_index.train(self.embeddings)
                self.faiss_index.add(self.embeddings)
            else:
                logger.warning("Not enough data points to train FAISS clusters. Using Flat index.")
                self.faiss_index = faiss.IndexFlatL2(self.embeddings.shape[1])  #fallback to flat index
                self.faiss_index.add(self.embeddings)

            # fine-tuning search parameters
            self.faiss_index.nprobe = 10  #search 10 clusters to balance between speed and recall

            logger.info(f"FAISS index size: {self.faiss_index.ntotal}")
            logger.info(f"Sample Data: {self.data.head()}")

            return "Data loaded successfully!"

        except Exception as e:
            logger.error(f"Error loading data: {e}")
            return f"Error loading data: {str(e)}"

    def fine_tune_t5(self, training_data: List[Dict[str, str]], epochs: int = 3, batch_size: int = 4, lr: float = 5e-5):
        """Fine-tune T5 model."""
        logger.info("Starting T5 fine-tuning...")

        dataset = InsuranceDataset(self.t5_tokenizer, training_data)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        # optimizer and scheduler
        optimizer = AdamW(self.t5_model.parameters(), lr=lr)
        scheduler = get_scheduler("linear", optimizer=optimizer, num_warmup_steps=0, num_training_steps=len(dataloader) * epochs)

        # Move model to device
        self.t5_model.train()

        for epoch in range(epochs):
            for batch in dataloader:
                optimizer.zero_grad()

                # Move data to device
                inputs = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                labels = batch['labels'].to(self.device)

                # Forward pass
                outputs = self.t5_model(input_ids=inputs, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss

                # Backward pass
                loss.backward()
                optimizer.step()
                scheduler.step()

            logger.info(f"Epoch {epoch + 1}/{epochs} Loss: {loss.item()}")

        # Save fine-tuned model
        self.t5_model.save_pretrained('fine_tuned_t5')
        self.t5_tokenizer.save_pretrained('fine_tuned_t5')
        logger.info("Fine-tuning completed and model saved.")

    def format_filtered_response(self, filtered_data: pd.DataFrame) -> str:
        if filtered_data.empty:
            return "⚠️ No matching plans found based on your query."

        response = "📋 Matching Plans:\n\n"
        for _, row in filtered_data.iterrows():
            response += (
                f"📋 {row['Insurance Provider']} - {row['Plan Name']}\n"
                f"💰 Premium: {self.format_currency(row['Premium Price (S$)'])}\n"
                f"🛡️ Coverage: {self.format_currency(row['Overseas Medical Expenses (S$)'])}\n"
                f"❌ Cancellation: {self.format_currency(row['Trip Cancellation (S$)'])}\n\n"
            )
        return response

    def format_faiss_response(self, results: List[Dict[str, Any]]) -> str:
        if not results:
            return "⚠️ No relevant results found. Please refine your query."

        response = "📋 Similar Plans:\n\n"
        for r in results:
            response += (
                f"📋 {r['Provider']} - {r['Plan']}\n"
                f"💰 Premium: {r['Premium']}\n"
                f"🛡️ Coverage: {r['Coverage']}\n"
                f"❌ Cancellation: {r['Cancellation']}\n\n"
            )
        return response

    # filtering data by a specific provider mentioned in the query
    def filter_by_provider(self, provider: str) -> pd.DataFrame:
        filtered = self.data[self.data['Insurance Provider'].str.contains(provider, case=False, na=False)]
        return filtered

    # processing the query and routing it to appropriate functions based on intent
    def process_query(self, query: str) -> str:
        try:
            logger.info(f"Processing query: {query}")
            if self.data is None:
                return "⚠️ Please upload the insurance data file before asking questions."

            # Detect intent
            intent = self.detect_intent(query)
            logger.info(f"Intent identified: {intent}")

            # Route query based on detected intent
            if intent == "affordable_plans":
                return self.most_affordable_plans()
            elif intent == "medical_coverage":
                return self.highest_coverage()
            elif intent == "compare_providers":
                return self.compare_providers(query)
            elif intent == "travel_recommendation":
                return self.recommend_travel_plan(query)

            # Price range filters
            price_match = re.findall(r'\$?(\d+)', query)
            if len(price_match) == 2:  # Specific range
                low, high = float(price_match[0]), float(price_match[1])
                filtered = self.data[
                    (self.data['Premium Price (S$)'] >= low) & (self.data['Premium Price (S$)'] <= high)
                ]
                return self.format_filtered_response(filtered)

            elif len(price_match) == 1 and ('below' in query or 'under' in query):
                limit = float(price_match[0])
                filtered = self.data[self.data['Premium Price (S$)'] < limit]
                return self.format_filtered_response(filtered)

            elif len(price_match) == 1 and ('above' in query or 'over' in query):
                limit = float(price_match[0])
                filtered = self.data[self.data['Premium Price (S$)'] > limit]
                return self.format_filtered_response(filtered)

            # Fallback to FAISS similarity search
            results = self.retrieve_similar(query, top_k=5)
            return self.format_faiss_response(results)

        except Exception as e:
            logger.error(f"Error processing query: {e}")
            return "⚠️ Unable to process your query. Please try again."

    # detects intent dynamically using FAISS or keyword-based matching
    def detect_intent(self, query: str) -> str:
        try:
            logger.info(f"Detecting intent for query: {query}")

            # Expanded predefined intents and examples
            intent_examples = {
                'cancellation_coverage': ["compare trip cancellation coverage", "cancellation protection"],
                'baggage_protection': ["baggage loss protection", "compare baggage coverage"],
                'affordable_plans': ["affordable plans", "cheap travel insurance", "plans under $100", "cheapest plans", "budget plans", "lowest price"],
                'value_for_money': ["best value for money plans", "cost-effective plans"],
                'compare_providers': ["compare providers", "compare plans", "difference between providers"],
                'medical_coverage': ["highest medical coverage", "compare medical coverage"],
                'travel_recommendation': ["recommend a plan for travel", "best travel plan for vacation"],
                'top_plans': ["top 3 plans", "best plans", "ranked plans"],
            }

            # Prepare embeddings for examples
            example_texts = []
            intent_labels = []
            for intent, examples in intent_examples.items():
                example_texts.extend(examples)
                intent_labels.extend([intent] * len(examples))

            # Encode query and example embeddings
            query_embedding = self.sentence_model.encode([query])[0]
            example_embeddings = self.sentence_model.encode(example_texts)
            similarities = np.dot(example_embeddings, query_embedding) / (
                np.linalg.norm(example_embeddings, axis=1) * np.linalg.norm(query_embedding)
            )

            # Get best match and confidence
            best_match_idx = np.argmax(similarities)
            best_match_intent = intent_labels[best_match_idx]
            confidence = similarities[best_match_idx]

            logger.info(f"Detected Intent: {best_match_intent}, Confidence: {confidence:.2f}")

            # Confidence threshold
            if confidence > 0.6:
                return best_match_intent
            else:
                return 'general'

        except Exception as e:
            logger.error(f"Error detecting intent: {e}")
            return 'general'

    def initialise_models(self):
        """Load the sentence encoder and the T5 tokenizer/model onto ``self.device``."""
        logger.info("Initializing models...")

        # Sentence encoder: used for plan embeddings and intent matching.
        self.sentence_model = encoder = SentenceTransformer('all-MiniLM-L6-v2')
        encoder.to(self.device)

        # Tokenizer and weights are loaded from the same T5 checkpoint id.
        checkpoint = 't5-small'
        self.t5_tokenizer = T5Tokenizer.from_pretrained(checkpoint)
        self.t5_model = generator = T5ForConditionalGeneration.from_pretrained(checkpoint)
        generator.to(self.device)

        logger.info("Models initialized successfully.")

    def clean_numeric_value(self, value: Any) -> Union[float, str]:
        try:
            if pd.isna(value):
                return 0.0
            if isinstance(value, str) and 'unlimited' in value.lower():  #treat 'Unlimited' as high value
                return 1e12
            cleaned = re.sub(r'[^\d.]', '', str(value))
            return float(cleaned) if cleaned else 0.0
        except Exception as e:
            logger.warning(f"Error cleaning value {value}: {e}")
            return 0.0

    def format_currency(self, value: Union[float, str]) -> str:
        if isinstance(value, str) and value.lower() == 'unlimited':
            return 'Unlimited'
        if isinstance(value, (int, float)):
            return f"S${value:,.2f}"
        return 'S$0.00'

    # get insurance plans based on query, price filters and contextual notes
    def retrieve_similar(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        try:
            logger.info(f"Retrieving similar plans for query: {query}")

            # Generate query embedding
            query_embedding = self.sentence_model.encode([query], convert_to_tensor=True).cpu().numpy()
            query_embedding = query_embedding / np.linalg.norm(query_embedding, axis=1, keepdims=True)
            query_embedding = query_embedding.reshape(1, -1)  # Ensure correct shape

            # Normalize embeddings for FAISS
            faiss.normalize_L2(self.embeddings)
            faiss.normalize_L2(query_embedding)

            # Search FAISS index
            distances, indices = self.faiss_index.search(query_embedding, top_k)

            # Collect matching results
            results = []
            for idx in indices[0]:
                if idx < len(self.data):
                    row = self.data.iloc[idx]
                    results.append({
                        'Provider': row['Insurance Provider'],
                        'Plan': row['Plan Name'],
                        'Premium': self.format_currency(row['Premium Price (S$)']),
                        'Coverage': self.format_currency(row['Overseas Medical Expenses (S$)']),
                        'Cancellation': self.format_currency(row['Trip Cancellation (S$)']),
                        'Notes': row.get('Special Notes', "No additional notes.")
                    })

            # Sort results by price for affordability-related queries
            if "cheap" in query.lower() or "affordable" in query.lower():
                results = sorted(results, key=lambda x: float(re.sub(r'[^\d.]', '', x['Premium'])))

            return results
        except Exception as e:
            logger.error(f"Error retrieving similar plans: {e}")
            return []

    ### --- Coverage Analysis --- ###

    def cancellation_coverage(self) -> str:
        """Compares trip cancellation coverage across providers."""
        try:
            sorted_plans = self.data.sort_values('Trip Cancellation (S$)', ascending=False).head(5)
            response = "✈️ Top Plans for Cancellation Coverage:\n\n"
            for _, plan in sorted_plans.iterrows():
                response += (f"📋 {plan['Plan Name']} ({plan['Insurance Provider']})\n"
                            f"💰 Premium: {self.format_currency(plan['Premium Price (S$)'])}\n"
                            f"❌ Cancellation: {self.format_currency(plan['Trip Cancellation (S$)'])}\n\n")
            return response
        except Exception as e:
            logger.error(f"Error in cancellation coverage comparison: {e}")
            return "⚠️ Unable to retrieve cancellation coverage details."

    def baggage_protection(self) -> str:
        """Compares plans for baggage loss protection."""
        try:
            if 'Baggage Loss (S$)' in self.data.columns:
                sorted_plans = self.data.sort_values('Baggage Loss (S$)', ascending=False).head(5)
                response = "🛄 Top Plans for Baggage Loss Protection:\n\n"
                for _, plan in sorted_plans.iterrows():
                    response += (f"📋 {plan['Plan Name']} ({plan['Insurance Provider']})\n"
                                f"💰 Premium: {self.format_currency(plan['Premium Price (S$)'])}\n"
                                f"🛄 Baggage Loss: {self.format_currency(plan['Baggage Loss (S$)'])}\n\n")
                return response
            return "⚠️ Baggage loss coverage information is not available in the data."
        except Exception as e:
            logger.error(f"Error in baggage protection comparison: {e}")
            return "⚠️ Unable to retrieve baggage protection details."


    ### --- Price Analysis --- ###

    def most_affordable_plans(self) -> str:
        """Lists the most affordable plans."""
        try:
            plans = self.data.nsmallest(5, 'Premium Price (S$)')
            response = "💰 Most Affordable Plans:\n\n"
            for _, plan in plans.iterrows():
                response += (f"📋 {plan['Plan Name']} ({plan['Insurance Provider']})\n"
                            f"💰 Premium: {self.format_currency(plan['Premium Price (S$)'])}\n\n")
            return response
        except Exception as e:
            logger.error(f"Error in affordable plans retrieval: {e}")
            return "⚠️ Unable to retrieve affordable plans."

    def value_for_money(self) -> str:
        self.data['Value Ratio'] = self.data['Overseas Medical Expenses (S$)'] / self.data['Premium Price (S$)']
        best_value = self.data.nlargest(3, 'Value Ratio')

        response = "💎 Best Value-for-Money Plans:\n\n"
        for _, plan in best_value.iterrows():
            response += (
                f"📋 {plan['Plan Name']} - Value Ratio: {plan['Value Ratio']:.2f}\n"
                f"💰 Premium: {self.format_currency(plan['Premium Price (S$)'])}\n"
                f"🛡️ Coverage: {self.format_currency(plan['Overseas Medical Expenses (S$)'])}\n"
            )
        return response


    ### --- Plan Comparisons --- ###

    def compare_providers(self, query: str) -> str:
        """Compare two or more providers based on key metrics."""
        try:
            providers = [p for p in self.data['Insurance Provider'].unique() if p.lower() in query.lower()]
            if len(providers) < 2:
                return "⚠️ Please specify at least two providers to compare."

            comparisons = []
            for provider in providers:
                provider_data = self.data[self.data['Insurance Provider'] == provider]
                avg_premium = provider_data['Premium Price (S$)'].mean()
                max_coverage = provider_data['Overseas Medical Expenses (S$)'].max()
                max_cancellation = provider_data['Trip Cancellation (S$)'].max()

                comparisons.append(
                    f"📊 {provider}\n💰 Avg Premium: {self.format_currency(avg_premium)}\n"
                    f"🛡️ Max Coverage: {self.format_currency(max_coverage)}\n"
                    f"❌ Max Cancellation: {self.format_currency(max_cancellation)}\n\n"
                )
            return "\n".join(comparisons)
        except Exception as e:
            logger.error(f"Error comparing providers: {e}")
            return "⚠️ Unable to process your comparison query. Please try again."


    ### --- Travel Recommendations --- ###

    def recommend_travel_plan(self, query: str) -> str:
        try:
            duration_match = re.search(r'(\d+)\s*(weeks?|days?)', query.lower())
            duration = int(duration_match.group(1)) if duration_match else 1

            if duration >= 3:
                plans = self.data.nlargest(3, 'Overseas Medical Expenses (S$)')
            else:
                plans = self.data.nsmallest(3, 'Premium Price (S$)')


            response = "🌍 Recommended Plans for Travel:\n"
            for _, plan in plans.iterrows():
                response += (f"📋 {plan['Plan Name']} - Premium: {self.format_currency(plan['Premium Price (S$)'])}, "
                            f"Coverage: {self.format_currency(plan['Overseas Medical Expenses (S$)'])}\n")

            return response

        except Exception as e:
            logger.error(f"Error recommending travel plans: {e}")
            return "⚠️ Unable to recommend travel plans. Please try again."

    def compare_providers(self, query: str) -> str:
        try:
            # search providers mentioned in the query
            providers = [p for p in self.data['Insurance Provider'].unique() if p.lower() in query.lower()]
            if len(providers) < 2:
                return "⚠️ Please specify at least two providers to compare."

            # get comparison data
            comparisons = []
            for provider in providers:
                provider_data = self.data[self.data['Insurance Provider'] == provider]

                avg_premium = provider_data['Premium Price (S$)'].mean()
                max_coverage = provider_data['Overseas Medical Expenses (S$)'].max()
                max_cancellation = provider_data['Trip Cancellation (S$)'].max()

                # add provider summary
                comparisons.append(
                    f"📊 {provider}\n💰 Avg Premium: {self.format_currency(avg_premium)}\n"
                    f"🛡️ Max Coverage: {self.format_currency(max_coverage)}\n"
                    f"❌ Max Cancellation: {self.format_currency(max_cancellation)}"
                )
            return "\n\n".join(comparisons)

        except Exception as e:
            logger.error(f"Error comparing providers: {e}", exc_info=True)
            return "⚠️ Unable to process your comparison query. Please try again."

    # highest medical coverage
    def highest_coverage(self) -> str:
        plans = self.data.nlargest(3, 'Overseas Medical Expenses (S$)')
        response = "🏥 Plans with Highest Medical Coverage:\n\n"
        for _, plan in plans.iterrows():
            response += f"📋 {plan['Plan Name']} - Coverage: {self.format_currency(plan['Overseas Medical Expenses (S$)'])}\n"
        return response

class InsuranceDataset(Dataset):
    """Torch dataset that tokenizes query/response pairs for T5 fine-tuning.

    Args:
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors (called with ``return_tensors="pt"``).
        data: Sequence of dicts with a ``'query'`` and a ``'response'`` string.
        max_input_length: Length every tokenized query is padded/truncated to.
        max_target_length: Length every tokenized response is padded/truncated to.
    """

    def __init__(self, tokenizer, data, max_input_length: int = 512, max_target_length: int = 150):
        self.data = data
        self.tokenizer = tokenizer
        self.max_input_length = max_input_length
        self.max_target_length = max_target_length

    def __len__(self):
        """Number of query/response pairs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for one pair."""
        item = self.data[idx]
        inputs = self.tokenizer(
            item['query'], max_length=self.max_input_length,
            padding="max_length", truncation=True, return_tensors="pt"
        )
        targets = self.tokenizer(
            item['response'], max_length=self.max_target_length,
            padding="max_length", truncation=True, return_tensors="pt"
        )

        # Padding positions in the labels are set to -100 so the loss ignores
        # them; otherwise the model is trained to emit pad tokens.
        labels = targets['input_ids'].squeeze(0).clone()
        pad_token_id = getattr(self.tokenizer, 'pad_token_id', None)
        if pad_token_id is not None:
            labels[labels == pad_token_id] = -100

        return {
            'input_ids': inputs['input_ids'].squeeze(0),
            'attention_mask': inputs['attention_mask'].squeeze(0),
            'labels': labels
        }

def handle_query(query: str, file: Optional[gr.File]) -> str:
    """Gradio callback: load the uploaded workbook if needed, then answer ``query``.

    Args:
        query: The question typed by the user.
        file: The uploaded spreadsheet as delivered by the File component
            (a path string or an object exposing ``.name``), or ``None``.

    Returns:
        The bot's answer, or a message explaining why the upload was rejected.

    The workbook is loaded only when its path differs from the one loaded
    last, so follow-up questions on the same upload do not re-read the file
    and rebuild the index.
    """
    if file:
        # Accept both a plain path and a file-like wrapper with a .name attribute.
        path = file if isinstance(file, str) else getattr(file, "name", None)
        if not path:
            return "❌ Data loading failed: could not determine the uploaded file path."

        if getattr(bot, "_loaded_path", None) != path:
            result = bot.load_data(path)
            if "successfully" not in result.lower():
                # Forget the remembered path so the next upload is loaded afresh.
                bot._loaded_path = None
                return f"❌ Data loading failed: {result}"
            bot._loaded_path = path
    return bot.process_query(query)


# Single shared bot instance; constructing it loads the models (see InsuranceBot.__init__).
bot = InsuranceBot()

# Gradio UI: a question box plus an Excel upload, both passed to handle_query.
interface = gr.Interface(
    fn=handle_query,
    inputs=[
        gr.Textbox(label="Type your question below."),
        gr.File(label="Upload Excel File", file_types=[".xlsx", ".xls"])
    ],
    outputs=gr.Textbox(label="Response"),
    title="Travel Insurance Chatbot",
    description="Ask questions about insurance plans, providers, pricing, coverage and comparisons.",
    # Each example supplies only the question text; no file is pre-filled.
    examples=[
        ["Tell me about FWD travel insurance plans"],
        ["Compare trip cancellation coverage across providers"],
        ["What are the best plans to take for a family trip"],
        ["Compare AXA and AIA travel insurance plans"],
        ["What are the cheapest travel insurance options?"]
    ],
    theme="default",
)

# debug=True keeps the cell running so errors and logs stay visible;
# share=True exposes the app through a temporary public URL.
interface.launch(debug=True, share=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://b15489dc569c62b099.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://b15489dc569c62b099.gradio.live


