In [1]:
# Installing dependencies
import subprocess
import sys
import os
import warnings
warnings.filterwarnings('ignore')

# Minimum versions this notebook needs, keyed by PyPI distribution name.
# Each value is a version specifier appended verbatim to the name.
packages = {
    'torch': '>=2.0.0',
    'transformers': '>=4.30.0',
    'openai-whisper': '>=20231117',
    'sentence-transformers': '>=2.2.0',
    'ipywidgets': '>=8.0.0',
    'langdetect': '>=1.0.9',
    'jiwer': '>=3.0.0',
    'datasets': '>=2.14.0',
    'soundfile': '>=0.12.0',
    'librosa': '>=0.10.0',
    'accelerate': '>=0.20.0',
    'evaluate': '>=0.4.0',
}

print("Installing dependencies...")
for package, version in packages.items():
    # One pip invocation per requirement, run with the kernel's own interpreter
    # so the package lands in the environment this notebook executes in.
    pip_command = [sys.executable, '-m', 'pip', 'install', '-q', f"{package}{version}"]
    try:
        subprocess.check_call(pip_command)
        print(f"✓ {package}")
    except Exception as e:
        # Best effort: report the failure and carry on with the next package.
        print(f"✗ {package}: {e}")

Installing dependencies...
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 363.4/363.4 MB 4.9 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 13.8/13.8 MB 97.7 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 24.6/24.6 MB 78.7 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 883.7/883.7 kB 44.0 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 664.8/664.8 MB 2.5 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 211.5/211.5 MB 6.0 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 56.3/56.3 MB 30.5 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 127.9/127.9 MB 13.5 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 207.5/207.5 MB 2.4 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 21.1/21.1 MB 8.1 MB/s eta 0:00:00
✓ torch
✓ transformers
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 803.2/803.2 kB 12.3 MB/s eta 0:00:00
✓ openai-whisper
✓ sentence-transformers
✓ ipywidgets
     ━━━━━━━━━━━━━

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
bigframes 2.8.0 requires google-cloud-bigquery-storage<3.0.0,>=2.30.0, which is not installed.
cesium 0.12.4 requires numpy<3.0,>=2.0, but you have numpy 1.26.4 which is incompatible.
gcsfs 2025.3.2 requires fsspec==2025.3.2, but you have fsspec 2025.3.0 which is incompatible.
bigframes 2.8.0 requires google-cloud-bigquery[bqstorage,pandas]>=3.31.0, but you have google-cloud-bigquery 3.25.0 which is incompatible.
bigframes 2.8.0 requires rich<14,>=12.4.4, but you have rich 14.0.0 which is incompatible.


✓ datasets
✓ soundfile
✓ librosa
✓ accelerate
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 84.1/84.1 kB 2.4 MB/s eta 0:00:00
✓ evaluate


In [2]:
# Core imports
import torch
import whisper
import re
import numpy as np
import pandas as pd
import time
import logging
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Union
from dataclasses import dataclass
from transformers import pipeline, WhisperForConditionalGeneration, WhisperProcessor, Seq2SeqTrainer, Seq2SeqTrainingArguments, DataCollatorForSeq2Seq
from sentence_transformers import SentenceTransformer
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML, Audio, Javascript
from langdetect import detect, detect_langs
import jiwer
import soundfile as sf
import librosa
import json
import tempfile
from datetime import datetime
from datasets import load_dataset, concatenate_datasets

# Logging: INFO-level basic configuration plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Compute device: use CUDA when PyTorch reports it as available, else the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Device: {device}")
if device == "cuda":
    # Report the name of GPU 0.
    print(f"GPU: {torch.cuda.get_device_name(0)}")

2025-07-14 07:55:13.130594: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1752479713.451167      36 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1752479713.543592      36 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


Device: cuda
GPU: Tesla T4


In [3]:
# Data structures
@dataclass
class ChatbotResponse:
    """A single chatbot reply plus the conversation state it was built from."""
    message: str  # text of the reply to show to the user
    next_step: str  # name of the conversation step the chatbot moves to next
    collected_data: Dict  # copy of the chatbot's conversation_state at reply time
    confidence: float  # score set by the handler that produced the reply

@dataclass
class TranscriptionResult:
    """Outcome of one speech-to-text call."""
    text: str  # transcribed text (or an error/unavailable message on failure)
    language: str  # language code reported alongside the transcription
    confidence: float  # confidence score; 0.0 is used when transcription fails
    processing_time: float  # elapsed time measured with time.time(), in seconds

@dataclass
class AnalysisResult:
    """Result record for a full request analysis.

    NOTE(review): nothing in this part of the notebook constructs this type;
    the per-field notes below are inferred from the field names and types
    only. Confirm against the code that produces it.
    """
    original_text: str  # presumably the input text before translation
    translated_text: str  # presumably the English translation of original_text
    detected_languages: List[str]  # presumably language codes found in the input
    intent: str  # presumably one of the SERVICE_INTENTS category names
    confidence: float  # presumably the score of the chosen intent
    urgency: str  # presumably an urgency label such as "High"/"Medium"/"Low"
    top_matches: Dict[str, float]  # presumably intent name -> score
    processing_time: float  # presumably elapsed seconds
    chatbot_context: Dict  # presumably the chatbot's collected conversation state

# Service categories
#
# Maps each service category name to its matching data:
#   'description'        - free-text summary of the category; ModelManager encodes
#                          it into the per-intent embedding.
#   'keywords'           - lowercase terms matched as substrings by the chatbot's
#                          quick service detection (also used, together with the
#                          description, as the simple n-gram corpus).
#   'urgency_indicators' - terms associated with urgent requests.
#                          NOTE(review): not read by any code visible in this
#                          part of the notebook - confirm where it is consumed.
# Keyword detection walks the categories in insertion order, so the ordering
# below can affect which category is picked.
SERVICE_INTENTS = {
    'Emergency Services': {
        'description': "Emergency medical fire police ambulance urgent critical immediate danger life threatening accident",
        'keywords': ['emergency', 'urgent', 'critical', 'ambulance', 'fire', 'police', 'danger', 'accident', 'help'],
        'urgency_indicators': ['emergency', 'urgent', 'critical', 'immediate', 'danger']
    },
    'Healthcare': {
        'description': "Doctor consultation medical help nurse hospital health checkup pharmacy appointment medicine treatment",
        'keywords': ['doctor', 'medical', 'health', 'medicine', 'hospital', 'nurse', 'treatment', 'checkup', 'appointment'],
        'urgency_indicators': ['emergency', 'urgent', 'critical', 'fever', 'pain', 'bleeding']
    },
    'Home Maintenance': {
        'description': "Plumbing electrical repair maintenance fix broken water leak pipe electrician technician service",
        'keywords': ['plumber', 'electrician', 'repair', 'fix', 'broken', 'leak', 'maintenance', 'service'],
        'urgency_indicators': ['leak', 'burst', 'flood', 'sparks', 'power outage', 'broken']
    },
    'Transportation': {
        'description': "Taxi cab ride transport vehicle car bike delivery driver pickup drop booking",
        'keywords': ['taxi', 'cab', 'ride', 'transport', 'car', 'bike', 'delivery', 'driver', 'pickup'],
        'urgency_indicators': ['emergency transport', 'urgent delivery', 'stranded']
    },
    'Cleaning Services': {
        'description': "House cleaning sanitization deep cleaning carpet window office residential cleaning service",
        'keywords': ['clean', 'cleaning', 'sanitize', 'wash', 'sweep', 'mop', 'vacuum', 'dust'],
        'urgency_indicators': ['urgent cleaning', 'sanitization needed']
    },
    # Catch-all category; also the fallback name returned when nothing matches.
    'General Services': {
        'description': "General help assistance support service consultation other miscellaneous requests",
        'keywords': ['help', 'assistance', 'support', 'service', 'general', 'consultation', 'other'],
        'urgency_indicators': ['urgent help', 'immediate assistance']
    }
}

# Sanity print: number of configured categories.
print(f"Service categories configured: {len(SERVICE_INTENTS)}")

Service categories configured: 6


In [4]:
# Chatbot with Use Case Detection
class ServiceRequestChatbot:
    """Rule-based, step-driven chatbot that collects a service request.

    The conversation is a small state machine. ``conversation_state['step']``
    names the current step; ``process_user_input`` routes each user message to
    the handler for that step, and every handler returns a ``ChatbotResponse``
    carrying a copy of the state.

    Typical text flow: ``greeting`` -> ``input_type`` -> ``description`` ->
    ``confirmation`` -> ``processing``. Any step without a handler falls back
    to ``_handle_completion``, which restarts at ``greeting``.
    """

    # Whole-word answers accepted as "yes" at the confirmation step.
    _AFFIRMATIVE_TOKENS = frozenset({
        'yes', 'y', 'yeah', 'yep', 'ok', 'okay', 'proceed', 'continue', 'हाँ', 'ஆம்'
    })
    # Punctuation stripped from both ends of a token before matching.
    _TOKEN_EDGE_CHARS = '.,!?;:\'"()[]{}।'

    def __init__(self):
        self.conversation_state = self._initial_state()
        self.conversation_history = []

    @staticmethod
    def _initial_state() -> Dict:
        """Return a fresh conversation state positioned at the greeting step."""
        return {
            'step': 'greeting',
            'language': None,
            'input_type': None,
            'service_type': None,
            'urgency': None,
            'location': None,
            'contact': None,
            'description': None,
            'audio_file': None
        }

    def reset_conversation(self):
        """Discard all collected data and history and return to the greeting."""
        self.conversation_state = self._initial_state()
        self.conversation_history = []

    def get_greeting_message(self) -> str:
        """Return the opening message shown before any user input."""
        return "Service Request Assistant\nHello! How can I help you today?"

    def process_user_input(self, user_input: str) -> ChatbotResponse:
        """Route ``user_input`` to the handler for the current step.

        Returns the handler's ``ChatbotResponse``. If routing itself fails,
        the conversation is moved back to ``greeting`` and an apology
        response with confidence 0.0 is returned; this method does not raise.
        """
        try:
            current_step = self.conversation_state['step']

            handlers = {
                'greeting': self._handle_greeting,
                'language_selection': self._handle_language_selection,
                'input_type': self._handle_input_type,
                'service_type': self._handle_service_type,
                'urgency_level': self._handle_urgency_level,
                'description': self._handle_description,
                'confirmation': self._handle_confirmation,
                'processing': self._handle_processing,
            }
            handler = handlers.get(current_step)
            if handler is None:
                # NOTE(review): 'audio_processing' (set by _handle_input_type)
                # has no handler here, so typed input in that step restarts
                # the conversation - confirm the audio flow is driven elsewhere.
                return self._handle_completion()
            return handler(user_input)
        except Exception as e:
            logger.error(f"Chatbot error: {e}")
            # The reply promises a restart, so the state has to match it.
            self.conversation_state['step'] = 'greeting'
            return ChatbotResponse(
                message="I encountered an error. Let's start over. How can I help you today?",
                next_step='greeting',
                collected_data=self.conversation_state.copy(),
                confidence=0.0
            )

    def _handle_greeting(self, user_input: str) -> ChatbotResponse:
        """Detect the user's script/language and ask how they want to submit."""
        try:
            detected_langs = self._detect_language(user_input)
            primary_lang = detected_langs[0] if detected_langs else 'en'

            self.conversation_state['language'] = primary_lang
            self.conversation_state['step'] = 'input_type'

            lang_names = {'en': 'English', 'hi': 'Hindi', 'ta': 'Tamil'}
            detected_name = lang_names.get(primary_lang, 'English')

            message = f"Great! I detected you're communicating in {detected_name}.\nHow would you like to provide your service request?"

            return ChatbotResponse(
                message=message,
                next_step='input_type',
                collected_data=self.conversation_state.copy(),
                confidence=0.8
            )
        except Exception as e:
            logger.error(f"Greeting handling error: {e}")
            return self._handle_completion()

    def _handle_language_selection(self, user_input: str) -> ChatbotResponse:
        """Language-selection step; currently identical to input-type handling."""
        return self._handle_input_type(user_input)

    def _handle_input_type(self, user_input: str) -> ChatbotResponse:
        """Choose between audio and text input based on words in the reply.

        Any mention of audio/voice/speak/record/sound selects the audio path
        (step ``audio_processing``); everything else selects text input and
        moves on to ``description``.
        """
        try:
            user_input_lower = user_input.lower()

            if any(word in user_input_lower for word in ['audio', 'voice', 'speak', 'record', 'sound']):
                self.conversation_state['input_type'] = 'audio'
                message = "Perfect! You can upload an audio file using the Audio Upload section.\nAfter uploading, click 'Process Audio' to continue."
                next_step = 'audio_processing'
            else:
                self.conversation_state['input_type'] = 'text'
                message = "Great! Please describe your service request in detail.\nYou can type in Hindi, Tamil, or English."
                next_step = 'description'

            self.conversation_state['step'] = next_step

            return ChatbotResponse(
                message=message,
                next_step=next_step,
                collected_data=self.conversation_state.copy(),
                confidence=0.9
            )
        except Exception as e:
            logger.error(f"Input type handling error: {e}")
            return self._handle_completion()

    def _handle_service_type(self, user_input: str) -> ChatbotResponse:
        """Service-type step; currently identical to description handling."""
        return self._handle_description(user_input)

    def _handle_urgency_level(self, user_input: str) -> ChatbotResponse:
        """Urgency step; currently identical to description handling."""
        return self._handle_description(user_input)

    def _handle_description(self, user_input: str) -> ChatbotResponse:
        """Store the description, infer service type and urgency, ask to confirm."""
        try:
            self.conversation_state['description'] = user_input
            self.conversation_state['step'] = 'confirmation'

            detected_service = self._quick_service_detection(user_input)
            detected_urgency = self._quick_urgency_detection(user_input)

            self.conversation_state['service_type'] = detected_service
            self.conversation_state['urgency'] = detected_urgency

            message = f"Thank you! I've analyzed your request:\n- Service Type: {detected_service}\n- Urgency Level: {detected_urgency}\n\nWould you like me to process this request? (Yes/No)"

            return ChatbotResponse(
                message=message,
                next_step='confirmation',
                collected_data=self.conversation_state.copy(),
                confidence=0.85
            )
        except Exception as e:
            logger.error(f"Description handling error: {e}")
            return self._handle_completion()

    def _handle_confirmation(self, user_input: str) -> ChatbotResponse:
        """Proceed to ``processing`` on an affirmative answer, else restart.

        The answer is split on whitespace and compared word by word, after
        trimming edge punctuation. Substring matching would accept 'y' inside
        "maybe" or 'ok' inside "broken" and process a request the user
        declined.
        """
        try:
            tokens = {
                token.strip(self._TOKEN_EDGE_CHARS)
                for token in user_input.lower().split()
            }

            if tokens & self._AFFIRMATIVE_TOKENS:
                self.conversation_state['step'] = 'processing'
                message = "Perfect! Processing your request now..."
            else:
                self.conversation_state['step'] = 'greeting'
                message = "No problem! Let's start over. How can I help you today?"

            return ChatbotResponse(
                message=message,
                next_step=self.conversation_state['step'],
                collected_data=self.conversation_state.copy(),
                confidence=0.95
            )
        except Exception as e:
            logger.error(f"Confirmation handling error: {e}")
            return self._handle_completion()

    def _handle_processing(self, user_input: str) -> ChatbotResponse:
        """Tell the user the request is still being processed (state unchanged)."""
        message = "Your request is being processed. Please wait..."

        return ChatbotResponse(
            message=message,
            next_step='processing',
            collected_data=self.conversation_state.copy(),
            confidence=1.0
        )

    def _handle_completion(self) -> ChatbotResponse:
        """Fallback for unknown steps and handler failures: restart at greeting."""
        self.conversation_state['step'] = 'greeting'
        message = "I'm not sure what happened. Let's start fresh. How can I help you today?"

        return ChatbotResponse(
            message=message,
            next_step='greeting',
            collected_data=self.conversation_state.copy(),
            confidence=0.5
        )

    def _detect_language(self, text: str) -> List[str]:
        """Return the language codes whose script occurs in ``text``.

        Detection is by Unicode block only: Devanagari -> 'hi', Tamil -> 'ta',
        ASCII letters -> 'en'. Codes are returned in that fixed order, and
        ``['en']`` is returned when nothing matches or on error.
        """
        try:
            patterns = {
                'hi': r'[\u0900-\u097F]+',
                'ta': r'[\u0B80-\u0BFF]+',
                'en': r'[a-zA-Z]+'
            }

            detected_langs = [
                lang for lang, pattern in patterns.items() if re.search(pattern, text)
            ]

            return detected_langs if detected_langs else ['en']
        except Exception as e:
            logger.error(f"Language detection error: {e}")
            return ['en']

    def _quick_service_detection(self, text: str) -> str:
        """Return the service category whose keywords match ``text`` best.

        Each category scores one point per keyword found as a substring of
        the lower-cased text. The highest score wins; ties go to the category
        listed first in ``SERVICE_INTENTS``. Returns "General Services" when
        no keyword matches or on error.
        """
        try:
            text_lower = text.lower()

            best_service = "General Services"
            best_score = 0
            for service, data in SERVICE_INTENTS.items():
                score = sum(1 for keyword in data['keywords'] if keyword in text_lower)
                # Strict '>' keeps the earliest category on equal scores.
                if score > best_score:
                    best_service, best_score = service, score

            return best_service
        except Exception as e:
            logger.error(f"Service detection error: {e}")
            return "General Services"

    def _quick_urgency_detection(self, text: str) -> str:
        """Classify urgency as "High", "Medium" or "Low" by substring lookup.

        High-urgency terms are checked before medium ones; "Low" is returned
        when neither list matches or on error.
        """
        try:
            text_lower = text.lower()

            high_urgency = ['urgent', 'emergency', 'asap', 'immediately', 'critical', 'help']
            medium_urgency = ['soon', 'quickly', 'today', 'fast', 'problem']

            if any(word in text_lower for word in high_urgency):
                return "High"
            if any(word in text_lower for word in medium_urgency):
                return "Medium"
            return "Low"
        except Exception as e:
            logger.error(f"Urgency detection error: {e}")
            return "Low"

# Initialize chatbot
chatbot = ServiceRequestChatbot()

In [5]:
# Model Manager with N-gram Language Model Integration
class ModelManager:
    """Lazily loads and caches the models used by the pipeline.

    Each model is loaded on first access to its property and cached on the
    instance. Loaders for Whisper, the translator and the intent model log
    failures and leave the cached value as ``None``, so callers must handle
    ``None``; a later access will attempt the load again.

    NOTE(review): the n-gram support here is a stub. Nothing in this class
    ever stores an entry in ``_ngram_models``, so ``apply_ngram_fusion``
    currently returns its input unchanged.
    """

    def __init__(self):
        self._whisper_model = None
        self._whisper_processor = None
        self._translator = None
        self._intent_model = None
        self._intent_embeddings = None
        self._ngram_models = {}

    @property
    def whisper_model(self):
        """The openai-whisper "medium" model, or ``None`` if loading failed."""
        if self._whisper_model is None:
            self._load_whisper()
        return self._whisper_model

    @property
    def whisper_processor(self):
        """The Hugging Face ``WhisperProcessor`` for ``openai/whisper-medium``.

        Unlike the other loaders this one does not catch exceptions; a failed
        download propagates to the caller.
        """
        # getattr keeps instances created without __init__ working.
        if getattr(self, '_whisper_processor', None) is None:
            self._whisper_processor = WhisperProcessor.from_pretrained("openai/whisper-medium")
        return self._whisper_processor

    @property
    def translator(self):
        """The NLLB translation pipeline, or ``None`` if loading failed."""
        if self._translator is None:
            self._load_translator()
        return self._translator

    @property
    def intent_model(self):
        """The sentence-embedding model for intents, or ``None`` on failure."""
        if self._intent_model is None:
            self._load_intent_model()
        return self._intent_model

    @property
    def intent_embeddings(self):
        """Mapping of intent name to embedding of its description, or ``None``."""
        if self._intent_embeddings is None:
            self._compute_intent_embeddings()
        return self._intent_embeddings

    def _load_whisper(self):
        """Load Whisper "medium" onto the configured device.

        The weights are left in float32. Whisper's ``transcribe`` selects
        half precision per call through its ``fp16`` option, so casting the
        stored weights with ``.half()`` only discards precision and forces
        callers that pass ``fp16=False`` to convert the model back.
        """
        try:
            print("Loading Whisper model...")
            self._whisper_model = whisper.load_model("medium", device=device)
            print("Whisper Medium loaded")
        except Exception as e:
            logger.error(f"Whisper loading failed: {e}")
            self._whisper_model = None

    def _load_translator(self):
        """Load the NLLB-200 distilled 600M translation pipeline."""
        try:
            print("Loading NLLB translation model...")
            self._translator = pipeline(
                "translation",
                model="facebook/nllb-200-distilled-600M",
                device=0 if device == "cuda" else -1,
                torch_dtype=torch.float16 if device == "cuda" else torch.float32
            )
            print("✓ NLLB translation model loaded")
        except Exception as e:
            logger.error(f"Translation model failed: {e}")
            self._translator = None

    def _load_intent_model(self):
        """Load the ``all-MiniLM-L6-v2`` sentence-transformer."""
        try:
            print("Loading intent model...")
            self._intent_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
            print("✓ Intent model loaded")
        except Exception as e:
            logger.error(f"Intent model failed: {e}")
            self._intent_model = None

    def _compute_intent_embeddings(self):
        """Encode every ``SERVICE_INTENTS`` description with the intent model.

        Goes through the ``intent_model`` property so the model is loaded on
        demand; reading ``_intent_model`` directly left the embeddings as
        ``None`` whenever they were requested before the model had been used.
        """
        model = self.intent_model
        if model is None:
            return

        try:
            print("Computing intent embeddings...")
            embeddings = {}
            for intent, intent_data in SERVICE_INTENTS.items():
                embeddings[intent] = model.encode(
                    intent_data['description'], convert_to_tensor=True, device=device
                )
            # Publish only a complete mapping.
            self._intent_embeddings = embeddings
            print(f"✓ Intent embeddings computed for {len(SERVICE_INTENTS)} categories")
        except Exception as e:
            logger.error(f"Intent embedding computation failed: {e}")
            self._intent_embeddings = None

    def load_ngram_model(self, language: str, model_path: str = None):
        """Prepare the n-gram model for ``language``.

        Without ``model_path`` the built-in simple model is created.
        NOTE(review): with ``model_path`` this only prints a success message;
        the file is never opened and no model is registered - confirm intent.
        """
        if model_path is None:
            self._create_simple_ngram_model(language)
        else:
            try:
                print(f"✓ N-gram model loaded for {language}")
            except Exception as e:
                logger.error(f"N-gram model loading failed for {language}: {e}")
                self._create_simple_ngram_model(language)

    def _create_simple_ngram_model(self, language: str):
        """Write the intent descriptions and keywords to a scratch corpus file.

        The corpus is written to a temporary file that is always removed
        again, even when writing fails. NOTE(review): nothing is trained from
        the corpus or stored in ``_ngram_models`` - confirm intent.
        """
        temp_file = None
        try:
            text_data = []
            for intent, data in SERVICE_INTENTS.items():
                text_data.append(data['description'])
                text_data.extend(data['keywords'])

            with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
                temp_file = f.name
                for text in text_data:
                    f.write(text.lower() + '\n')

            print(f"✓ Simple n-gram model created for {language}")

        except Exception as e:
            logger.error(f"Simple n-gram model creation failed: {e}")
        finally:
            # delete=False means cleanup is our job, on success and on failure.
            if temp_file is not None and os.path.exists(temp_file):
                os.unlink(temp_file)

    def apply_ngram_fusion(self, text: str, language: str, alpha: float = 0.3) -> str:
        """Return ``text`` after word-level correction for ``language``.

        ``text`` is returned unchanged when no model is registered for
        ``language``, when it has fewer than two whitespace-separated words,
        or on error. Otherwise each word is passed through
        ``_apply_simple_corrections`` and the words are re-joined with single
        spaces. ``alpha`` is accepted but not used by this implementation.
        """
        if language not in self._ngram_models:
            return text

        try:
            words = text.split()
            if len(words) < 2:
                return text

            return ' '.join(
                self._apply_simple_corrections(word, language) for word in words
            )

        except Exception as e:
            logger.error(f"N-gram fusion failed: {e}")
            return text

    def _apply_simple_corrections(self, word: str, language: str) -> str:
        """Look ``word`` up in the domain vocabulary and return it.

        The word is returned unchanged whether or not it is a known variant;
        the lookup table has no effect on the result yet.
        """
        corrections = {
            'plumbing': ['plumber', 'plumbing', 'pipe', 'water'],
            'electrical': ['electrician', 'electrical', 'power', 'light'],
            'medical': ['doctor', 'medical', 'health', 'hospital'],
            'transport': ['taxi', 'transport', 'vehicle', 'ride']
        }

        word_lower = word.lower()
        for category, variants in corrections.items():
            if word_lower in variants:
                return word

        return word

# Initialize model manager
model_manager = ModelManager()

In [6]:
# ASR with N-gram Language Model Fusion
import torch
import numpy as np
import whisper
import librosa
import tempfile
import soundfile as sf
import os

class ASRWithNgram:
    """Whisper transcription followed by the model manager's n-gram pass."""

    def __init__(self, model_manager: ModelManager):
        self.model_manager = model_manager
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def transcribe_with_ngram_fusion(self, audio_path: str, language: str = None) -> TranscriptionResult:
        """Transcribe ``audio_path`` and post-process the text.

        Args:
            audio_path: Audio input handed to Whisper's ``transcribe``.
            language: Optional language hint; any value other than ``'en'``
                switches Whisper to its "translate" task.

        Returns:
            A ``TranscriptionResult``. This method does not raise: when the
            Whisper model is unavailable or transcription fails, the result
            carries a message in ``text`` and a confidence of 0.0.
        """
        start_time = time.time()

        try:
            if self.model_manager.whisper_model is None:
                return TranscriptionResult(
                    text="Audio transcription unavailable",
                    language="en",
                    confidence=0.0,
                    processing_time=time.time() - start_time
                )

            result = self._transcribe_with_direct_fix(audio_path, language)

            transcribed_text = result["text"].strip()
            detected_language = result.get("language", "en")

            # The n-gram pass is only applied to the supported languages.
            if detected_language in ['hi', 'ta', 'en']:
                final_text = self.model_manager.apply_ngram_fusion(
                    transcribed_text,
                    detected_language,
                    alpha=0.3
                )
            else:
                final_text = transcribed_text

            confidence = self._calculate_confidence(result)

            return TranscriptionResult(
                text=final_text,
                language=detected_language,
                confidence=confidence,
                processing_time=time.time() - start_time
            )

        except Exception as e:
            logger.error(f"Transcription error: {e}")
            return TranscriptionResult(
                text=f"Transcription error: {e}",
                language="en",
                confidence=0.0,
                processing_time=time.time() - start_time
            )

    def _transcribe_with_direct_fix(self, audio_path: str, language: str = None):
        """Run Whisper in full float32 precision and return its result dict.

        The model is cast to float32 and ``fp16=False`` is passed, which is
        sufficient to keep the computation in float32. ``torch.tensor`` and
        ``torch.from_numpy`` are deliberately NOT patched: forcing a float32
        dtype onto every tensor created during the call also converts integer
        tensors (such as token ids) to floats, and replacing attributes on
        the ``torch`` module affects every other user of torch in the process.

        NOTE(review): ``language='en'`` is always passed to Whisper; the
        ``language`` argument only selects between "transcribe" and
        "translate" - confirm this is intended.
        """
        self._force_model_to_float32()

        # Temporarily pin the default dtype; restored in ``finally``.
        original_dtype = torch.get_default_dtype()
        torch.set_default_dtype(torch.float32)

        try:
            return self.model_manager.whisper_model.transcribe(
                audio_path,
                language='en',
                task="translate" if language and language != 'en' else "transcribe",
                fp16=False,
                temperature=0.0,
                best_of=5,
                beam_size=5,
                patience=1.0,
                condition_on_previous_text=True,
                verbose=False
            )
        finally:
            torch.set_default_dtype(original_dtype)

    def _force_model_to_float32(self):
        """Cast the Whisper model to float32 and move it to ``self.device``.

        Failures are logged as warnings and not raised.
        """
        try:
            model = self.model_manager.whisper_model

            # Module.float() already covers every submodule, so no
            # per-module pass is needed.
            model.float()
            model.to(self.device)

            logger.info("Successfully forced model to float32")

        except Exception as e:
            logger.warning(f"Failed to force model to float32: {e}")

    def _calculate_confidence(self, result: dict) -> float:
        """Average a per-segment confidence over ``result["segments"]``.

        A segment contributes ``exp(avg_logprob)`` clamped to [0, 1] when it
        has ``avg_logprob``, otherwise ``1 - no_speech_prob`` floored at 0.
        Returns 0.7 when there are no usable segments or on error.
        """
        try:
            segments = result.get("segments") if "segments" in result else None
            if not segments:
                return 0.7

            confidences = []
            for segment in segments:
                if "avg_logprob" in segment and segment["avg_logprob"] is not None:
                    log_prob = float(segment["avg_logprob"])
                    confidences.append(min(1.0, max(0.0, np.exp(log_prob))))
                elif "no_speech_prob" in segment and segment["no_speech_prob"] is not None:
                    no_speech = float(segment["no_speech_prob"])
                    confidences.append(max(0.0, 1.0 - no_speech))

            if confidences:
                return float(np.mean(confidences))
            return 0.7

        except Exception as e:
            logger.error(f"Confidence calculation error: {e}")
            return 0.7

# Wrapper that works around Whisper's float16/float32 dtype mismatch by keeping the model and its inputs in float32
class WhisperFloat32Wrapper:
    """Proxy around a Whisper model that keeps every call in float32.

    ``transcribe`` is intercepted to force ``fp16=False`` and float32 tensor
    input; every other attribute is forwarded to the wrapped model.
    """

    def __init__(self, original_model):
        self.original_model = original_model
        self._ensure_float32()

    def _ensure_float32(self):
        """Cast the wrapped model to float32 in place."""
        # Module.float() converts the parameters itself, so no separate
        # per-parameter pass is needed.
        self.original_model.float()

    def transcribe(self, audio, **kwargs):
        """Call the wrapped model's ``transcribe`` with fp16 disabled.

        Args:
            audio: Passed through to the model; tensors are cast to float32
                first, anything else is forwarded unchanged.
            **kwargs: Forwarded to the model; ``fp16`` is always overridden
                to ``False``.

        Returns:
            Whatever the wrapped model's ``transcribe`` returns.
        """
        kwargs['fp16'] = False
        if isinstance(audio, torch.Tensor):
            audio = audio.float()

        # Autocast is explicitly disabled so nothing is downcast behind our back.
        device_type = 'cuda' if torch.cuda.is_available() else 'cpu'
        with torch.autocast(device_type=device_type, enabled=False):
            return self.original_model.transcribe(audio, **kwargs)

    def __getattr__(self, name):
        """Forward unknown attributes to the wrapped model.

        ``__getattr__`` only runs when normal lookup fails. If
        ``original_model`` itself is missing (the instance was created without
        ``__init__``, e.g. by ``copy`` or ``pickle``), looking it up here
        would re-enter this method forever, so raise ``AttributeError``.
        """
        if name == 'original_model':
            raise AttributeError(name)
        return getattr(self.original_model, name)

# Initialization - wrapping the model without setting property
class ASRWithWrappedModel(ASRWithNgram):
    """ASR variant that transcribes through a ``WhisperFloat32Wrapper``.

    The wrapper is built once at construction time from the model manager's
    Whisper model and kept on the instance; the manager is not modified.
    """

    def __init__(self, model_manager: ModelManager):
        super().__init__(model_manager)
        # Wrap the model when one is available, otherwise remember there is none.
        whisper_model = model_manager.whisper_model
        self.wrapped_model = WhisperFloat32Wrapper(whisper_model) if whisper_model else None

    def _transcribe_with_direct_fix(self, audio_path: str, language: str = None):
        """Transcribe ``audio_path`` through the float32 wrapper.

        Raises ``Exception`` when no wrapped model is available. The torch
        default dtype is set to float32 for the duration of the call and
        restored afterwards.
        """
        if not self.wrapped_model:
            raise Exception("Wrapped model not available")

        # A non-English language hint switches Whisper to translation.
        task = "translate" if language and language != 'en' else "transcribe"
        decode_options = dict(
            language='en',
            task=task,
            fp16=False,
            temperature=0.0,
            best_of=5,
            beam_size=5,
            patience=1.0,
            condition_on_previous_text=True,
            verbose=False,
        )

        saved_default_dtype = torch.get_default_dtype()
        torch.set_default_dtype(torch.float32)
        try:
            return self.wrapped_model.transcribe(audio_path, **decode_options)
        finally:
            torch.set_default_dtype(saved_default_dtype)

_asr = ASRWithWrappedModel(model_manager)

Loading Whisper model...


100%|█████████████████████████████████████| 1.42G/1.42G [00:29<00:00, 52.3MiB/s]


Whisper Medium loaded


In [7]:
# Translation with NLLB-only
class NLLBTranslator:
    """Translate Hindi/Tamil text to English using the NLLB-200 translator only.

    The translation callable is read from ``model_manager.translator`` at call
    time; when it is missing the text is only tidied up, not translated.
    """

    # Runs of sentence terminators. The Devanagari danda is included so Hindi
    # text is split into sentences as well.
    _SENTENCE_END = re.compile(r'[.!?।]+')
    # Whole-word match: "show" or "whole" must not count as "how" / "who".
    _QUESTION_WORD = re.compile(r'\b(?:what|when|where|who|why|how)\b', re.IGNORECASE)
    _TERMINATORS = ('.', '!', '?')

    def __init__(self, model_manager: ModelManager):
        """Store the model manager and the language-code -> NLLB-code table."""
        self.model_manager = model_manager
        self.nllb_map = {
            'hi': 'hin_Deva',
            'ta': 'tam_Taml',
            'en': 'eng_Latn'
        }

    def translate_to_english(self, text: str, source_language: str) -> str:
        """Translate ``text`` to English sentence by sentence.

        Args:
            text: Input text; empty or whitespace-only input is returned as is.
            source_language: Short language code. ``'en'`` skips translation;
                codes missing from ``nllb_map`` are treated as Hindi.

        Returns:
            The translated text after ``_enhance_text``. Falls back to the
            enhanced input when no translator is loaded or an unexpected error
            occurs; a sentence whose translation fails is kept untranslated.
        """
        try:
            if not text or not text.strip():
                return text

            # English input, or no translator available: formatting only.
            if source_language == 'en' or not self.model_manager.translator:
                return self._enhance_text(text)

            src_lang = self.nllb_map.get(source_language, 'hin_Deva')

            pieces = []
            for sentence in self._split_sentences(text):
                if not sentence.strip():
                    continue
                translated = self._translate_sentence(sentence, src_lang)
                if not translated:
                    continue
                # Terminate each sentence individually. Joining with '. ' would
                # double the punctuation whenever the model output already ends
                # with '.', '!' or '?'.
                if not translated.endswith(self._TERMINATORS):
                    translated += '.'
                pieces.append(translated)

            full_translation = ' '.join(pieces)
            if not full_translation.endswith(self._TERMINATORS):
                full_translation += '.'

            return self._enhance_text(full_translation)

        except Exception as e:
            logger.error(f"Translation error: {e}")
            return self._enhance_text(text)

    def _translate_sentence(self, sentence: str, src_lang: str) -> str:
        """Translate one sentence; return the sentence unchanged if the call fails."""
        try:
            result = self.model_manager.translator(
                sentence,
                src_lang=src_lang,
                tgt_lang='eng_Latn',
                max_length=512,
                num_beams=4,
                early_stopping=True,
                do_sample=False
            )
            return result[0]['translation_text'].strip()
        except Exception as e:
            logger.warning(f"Sentence translation failed: {e}")
            return sentence

    def _split_sentences(self, text: str) -> List[str]:
        """Split ``text`` on sentence terminators and drop empty fragments.

        The terminators themselves are not kept. On error the whole text is
        returned as a single-element list.
        """
        try:
            fragments = (part.strip() for part in self._SENTENCE_END.split(text))
            return [part for part in fragments if part]
        except Exception as e:
            logger.error(f"Sentence splitting error: {e}")
            return [text]

    def _enhance_text(self, text: str) -> str:
        """Normalise whitespace, ensure terminal punctuation, capitalise the start.

        Text without terminal punctuation gets ``'?'`` when it contains an
        English question word as a whole word, otherwise ``'.'``. Empty input
        is returned unchanged; on error the text as processed so far is
        returned.
        """
        try:
            if not text:
                return text

            # Collapse every whitespace run into a single space.
            text = re.sub(r'\s+', ' ', text.strip())

            if text and text[-1] not in '.!?':
                text += '?' if self._QUESTION_WORD.search(text) else '.'

            # Upper-case only the first character; the rest is left untouched.
            if text:
                text = text[:1].upper() + text[1:]

            return text
        except Exception as e:
            logger.error(f"Text enhancement error: {e}")
            return text

# Initialize the translator (module-level instance read by ServiceRequestAnalyzer.__init__)
_translator = NLLBTranslator(model_manager)

In [8]:
# Intent Classification
class IntentClassifier:
    """Classify a service request into one of the ``SERVICE_INTENTS`` and rate its urgency.

    Classification blends embedding similarity with keyword matching and falls
    back to keyword matching alone when the embedding model or the intent
    embeddings are unavailable.
    """

    # Every entry except 'with' is at most three characters long; see
    # _preprocess_text for why that matters.
    _STOP_WORDS = frozenset({'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on',
                             'at', 'to', 'for', 'of', 'with', 'by'})
    _HIGH_URGENCY = ('urgent', 'emergency', 'asap', 'immediately', 'critical', 'help')
    _MEDIUM_URGENCY = ('quickly', 'soon', 'today', 'fast', 'problem', 'issue')
    # get_urgency_level additionally treats 'needed' as a medium-urgency cue.
    _MEDIUM_URGENCY_LEVEL = _MEDIUM_URGENCY + ('needed',)

    def __init__(self, model_manager: ModelManager):
        """Keep a reference to the model manager that owns the embedding model."""
        self.model_manager = model_manager

    @staticmethod
    def _count_urgency_words(words, text: str) -> int:
        """Count the entries of ``words`` that begin a word somewhere in ``text``.

        Matching is anchored at the start of a word so inflections still count
        ("urgently", "problems") while accidental inner matches do not
        ("monsoon" for "soon", "breakfast" for "fast", "tissue" for "issue").
        """
        return sum(1 for word in words if re.search(r'\b' + re.escape(word), text))

    def classify_intent(self, text: str) -> Tuple[str, float, Dict[str, float]]:
        """Pick the best-matching intent for ``text``.

        Returns:
            ``(intent, confidence, scores)`` where ``scores`` maps every intent
            to its combined score (70% semantic, 30% keyword), sorted from best
            to worst. ``confidence`` is the best combined score plus the
            urgency boost, capped at 1.0; the boost is not applied to
            ``scores``. Any error routes to ``_fallback_classification``.
        """
        try:
            if not self.model_manager.intent_model or not self.model_manager.intent_embeddings:
                return self._fallback_classification(text)

            processed_text = self._preprocess_text(text)

            semantic_scores = self._semantic_classification(processed_text)
            keyword_scores = self._keyword_classification(processed_text)

            # Weighted blend; an intent missing from either source scores 0 there.
            combined_scores = {
                intent: 0.7 * semantic_scores.get(intent, 0.0)
                        + 0.3 * keyword_scores.get(intent, 0.0)
                for intent in SERVICE_INTENTS.keys()
            }

            best_intent = max(combined_scores, key=combined_scores.get)
            confidence = combined_scores[best_intent]

            urgency_boost = self._calculate_urgency_boost(processed_text, best_intent)
            final_confidence = min(confidence + urgency_boost, 1.0)

            sorted_scores = dict(sorted(combined_scores.items(), key=lambda x: x[1], reverse=True))

            return best_intent, final_confidence, sorted_scores

        except Exception as e:
            logger.error(f"Intent classification error: {e}")
            return self._fallback_classification(text)

    def _preprocess_text(self, text: str) -> str:
        """Lower-case ``text``, strip most punctuation and drop short stop words.

        Characters other than word characters, whitespace and ``. ! ?`` become
        spaces. A stop word is removed only when it has three characters or
        fewer, which in the current list keeps 'with' and removes the rest.
        """
        try:
            text = text.lower().strip()
            text = re.sub(r'[^\w\s.!?]', ' ', text)
            text = re.sub(r'\s+', ' ', text)

            kept_words = [
                word for word in text.split()
                if word not in self._STOP_WORDS or len(word) > 3
            ]
            return ' '.join(kept_words)
        except Exception as e:
            logger.error(f"Text preprocessing error: {e}")
            return text

    def _semantic_classification(self, text: str) -> Dict[str, float]:
        """Score each intent by cosine similarity between ``text`` and its embedding.

        Negative similarities are clamped to 0. Returns an empty dict on error.
        """
        try:
            text_embedding = self.model_manager.intent_model.encode(
                text, convert_to_tensor=True, device=device
            )

            similarities = {}
            for intent, intent_emb in self.model_manager.intent_embeddings.items():
                similarity = torch.nn.functional.cosine_similarity(
                    text_embedding.unsqueeze(0), intent_emb.unsqueeze(0)
                ).item()
                similarities[intent] = max(0.0, similarity)

            return similarities
        except Exception as e:
            logger.error(f"Semantic classification error: {e}")
            return {}

    def _keyword_classification(self, text: str) -> Dict[str, float]:
        """Score each intent by the share of its keywords and urgency indicators found in ``text``.

        The score is ``0.8 * keyword share + 0.2 * urgency-indicator share``.
        Intent keywords and indicators are matched as plain substrings of
        ``text``. Returns an empty dict on error.
        """
        try:
            scores = {}

            for intent, intent_data in SERVICE_INTENTS.items():
                keywords = intent_data['keywords']
                urgency_indicators = intent_data.get('urgency_indicators', [])

                keyword_matches = sum(1 for keyword in keywords if keyword in text)
                urgency_matches = sum(1 for indicator in urgency_indicators if indicator in text)

                keyword_score = keyword_matches / len(keywords) if keywords else 0
                urgency_score = urgency_matches / len(urgency_indicators) if urgency_indicators else 0

                scores[intent] = 0.8 * keyword_score + 0.2 * urgency_score

            return scores
        except Exception as e:
            logger.error(f"Keyword classification error: {e}")
            return {}

    def _calculate_urgency_boost(self, text: str, intent: str) -> float:
        """Return the confidence bonus earned by urgency cues in ``text``.

        Returns:
            0.15 when a high-urgency word or one of the intent's own urgency
            indicators is present, 0.08 when only a medium-urgency word is
            present, otherwise 0.0 (also on error).
        """
        try:
            intent_urgency = SERVICE_INTENTS.get(intent, {}).get('urgency_indicators', [])

            high_count = self._count_urgency_words(self._HIGH_URGENCY, text)
            medium_count = self._count_urgency_words(self._MEDIUM_URGENCY, text)
            # Intent-specific indicators keep plain substring matching.
            intent_urgency_count = sum(1 for word in intent_urgency if word in text)

            if high_count > 0 or intent_urgency_count > 0:
                return 0.15
            elif medium_count > 0:
                return 0.08
            else:
                return 0.0
        except Exception as e:
            logger.error(f"Urgency boost calculation error: {e}")
            return 0.0

    def _fallback_classification(self, text: str) -> Tuple[str, float, Dict[str, float]]:
        """Classify by keyword matching alone.

        Only intents with at least one keyword hit are scored (hits divided by
        keyword count); the confidence is three times the best score, capped at
        1.0. Without any hit, or on error, the result is
        ``("General Services", 0.5, {"General Services": 0.5})``.
        """
        try:
            text_lower = text.lower()
            scores = {}

            for intent, intent_data in SERVICE_INTENTS.items():
                keywords = intent_data['keywords']
                matches = sum(1 for keyword in keywords if keyword in text_lower)
                if matches > 0:
                    scores[intent] = matches / len(keywords)

            if scores:
                best_intent = max(scores, key=scores.get)
                confidence = min(scores[best_intent] * 3, 1.0)
                sorted_scores = dict(sorted(scores.items(), key=lambda x: x[1], reverse=True))
                return best_intent, confidence, sorted_scores
            else:
                return "General Services", 0.5, {"General Services": 0.5}
        except Exception as e:
            logger.error(f"Fallback classification error: {e}")
            return "General Services", 0.5, {"General Services": 0.5}

    def get_urgency_level(self, text: str) -> str:
        """Rate ``text`` as ``"High"``, ``"Medium"`` or ``"Low"`` urgency.

        High: a high-urgency word, two or more '!' characters, or more than 30%
        of all characters upper-case. Medium: a medium-urgency word or a single
        '!'. Otherwise (and on error) Low.
        """
        try:
            text_lower = text.lower()

            high_count = self._count_urgency_words(self._HIGH_URGENCY, text_lower)
            medium_count = self._count_urgency_words(self._MEDIUM_URGENCY_LEVEL, text_lower)

            exclamation_count = text.count('!')
            # Ratio over every character, spaces and punctuation included.
            caps_ratio = sum(1 for c in text if c.isupper()) / len(text) if text else 0

            if high_count >= 1 or exclamation_count >= 2 or caps_ratio > 0.3:
                return "High"
            elif medium_count >= 1 or exclamation_count >= 1:
                return "Medium"
            else:
                return "Low"
        except Exception as e:
            logger.error(f"Urgency level detection error: {e}")
            return "Low"

# Initialize the intent classifier (module-level instance read by ServiceRequestAnalyzer.__init__)
_classifier = IntentClassifier(model_manager)

In [9]:
# Main Service Request Analyzer with Chatbot Integration
class ServiceRequestAnalyzer:
    """End-to-end pipeline: audio/text input -> English text -> intent and urgency.

    The ASR, translator, classifier and chatbot are taken from the
    module-level ``_asr``, ``_translator``, ``_classifier`` and ``chatbot``
    objects when the analyzer is constructed.
    """

    _QUESTION_WORDS = frozenset({'what', 'when', 'where', 'who', 'why', 'how',
                                 'क्या', 'कब', 'कहाँ'})
    # Separators used to cut text into whole words before the question-word
    # lookup. Whole-token comparison is used instead of regex word boundaries
    # because Devanagari vowel signs are not word characters for `\b`.
    _TOKEN_SPLIT = re.compile(r'''[\s.,;:!?"'()\-।]+''')

    def __init__(self):
        """Bind the shared pipeline components."""
        self.asr = _asr
        self.translator = _translator
        self.classifier = _classifier
        self.chatbot = chatbot

    def analyze_request(self,
                        text_input: Optional[str] = None,
                        audio_file: Optional[dict] = None,
                        chatbot_context: Optional[Dict] = None) -> AnalysisResult:
        """Analyze one request given either as audio or as text.

        Args:
            text_input: Request text; used only when ``audio_file`` is ``None``.
            audio_file: Dict with the raw audio bytes under ``'content'``.
                Takes precedence over ``text_input``.
            chatbot_context: Optional data copied into the result unchanged.

        Returns:
            An ``AnalysisResult``. Failures never raise; they are reported as
            the result built by ``_create_error_result``.
        """
        start_time = time.time()
        detected_languages = ['en']
        original_text = ""

        try:
            if audio_file is not None:
                transcription_result = self._process_audio_file(audio_file)

                # NOTE(review): any transcript containing the word "error" is
                # treated as a failure here, including genuine speech such as
                # "my AC shows an error code". Narrowing this needs the ASR's
                # error convention, which is not visible from this class.
                if "error" in transcription_result.text.lower():
                    return self._create_error_result(transcription_result.text, start_time)

                original_text = transcription_result.text
                detected_languages = [transcription_result.language]

            elif text_input:
                text_input = text_input.strip()
                if not text_input:
                    raise ValueError("Empty text input")

                detected_languages = self._detect_languages(text_input)
                original_text = self._enhance_text(text_input, detected_languages[0])

            else:
                raise ValueError("No input provided")

            if not original_text or not original_text.strip():
                raise ValueError("No valid text to process")

            # Translate only when the primary (first) language is not English.
            translated_text = original_text
            primary_lang = detected_languages[0] if detected_languages else 'en'

            if primary_lang != 'en':
                translated_text = self.translator.translate_to_english(original_text, primary_lang)

            intent, confidence, all_scores = self.classifier.classify_intent(translated_text)
            urgency = self.classifier.get_urgency_level(translated_text)

            # First three entries, in the order the classifier returned them.
            top_matches = dict(list(all_scores.items())[:3])

            context = chatbot_context or {}

            return AnalysisResult(
                original_text=original_text,
                translated_text=translated_text,
                detected_languages=detected_languages,
                intent=intent,
                confidence=confidence,
                urgency=urgency,
                top_matches=top_matches,
                processing_time=time.time() - start_time,
                chatbot_context=context
            )

        except Exception as e:
            logger.error(f"Analysis error: {e}")
            return self._create_error_result(str(e), start_time)

    def _process_audio_file(self, audio_file: dict) -> TranscriptionResult:
        """Write the uploaded bytes to a temporary file and transcribe it.

        The temporary file is removed in every case, including when writing or
        transcription fails.

        Args:
            audio_file: Dict whose ``'content'`` entry holds the audio bytes.

        Returns:
            The ASR result, or a ``TranscriptionResult`` whose text starts with
            ``"Audio processing error:"`` and whose confidence is 0.0 on failure.
        """
        temp_path = None
        try:
            # delete=False: the file must survive the `with` block so the ASR
            # can reopen it by name.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
                temp_path = temp_file.name
                temp_file.write(audio_file['content'])

            return self.asr.transcribe_with_ngram_fusion(temp_path)

        except Exception as e:
            logger.error(f"Audio processing error: {e}")
            return TranscriptionResult(
                text=f"Audio processing error: {e}",
                language="en",
                confidence=0.0,
                processing_time=0.0
            )
        finally:
            # Best-effort cleanup that also runs on the failure paths above.
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError as cleanup_error:
                    logger.warning(f"Could not remove temporary audio file: {cleanup_error}")

    def _detect_languages(self, text: str) -> List[str]:
        """Detect languages by script: Devanagari -> 'hi', Tamil -> 'ta', Latin -> 'en'.

        Returns:
            The detected codes in the fixed order hi, ta, en (so a non-English
            script becomes the primary language of mixed text), or ``['en']``
            when nothing matches or an error occurs.
        """
        try:
            patterns = {
                'hi': r'[ऀ-ॿ]+',
                'ta': r'[\u0b80-\u0bff]+',
                'en': r'[a-zA-Z]+'
            }

            detected_langs = [lang for lang, pattern in patterns.items()
                              if re.search(pattern, text)]

            return detected_langs if detected_langs else ['en']
        except Exception as e:
            logger.error(f"Language detection error: {e}")
            return ['en']

    def _enhance_text(self, text: str, language: str) -> str:
        """Normalise whitespace and make sure the text ends with punctuation.

        Text without a final ``.``, ``!`` or ``?`` gets ``'?'`` when one of its
        words is an English or Hindi question word, otherwise ``'.'``. Words
        are compared whole, so "show" does not count as "how".

        Args:
            text: Text to tidy up.
            language: Accepted for interface compatibility; not used.
        """
        try:
            text = re.sub(r'\s+', ' ', text.strip())

            if text and text[-1] not in '.!?':
                tokens = self._TOKEN_SPLIT.split(text.lower())
                if any(token in self._QUESTION_WORDS for token in tokens):
                    text += '?'
                else:
                    text += '.'

            return text
        except Exception as e:
            logger.error(f"Text enhancement error: {e}")
            return text

    def _create_error_result(self, error_msg: str, start_time: float) -> AnalysisResult:
        """Build the placeholder result used to report ``error_msg`` to the caller.

        The message is placed in both text fields; intent is
        ``"General Services"`` with confidence 0.0 and ``"Low"`` urgency.
        """
        return AnalysisResult(
            original_text=error_msg,
            translated_text=error_msg,
            detected_languages=['en'],
            intent="General Services",
            confidence=0.0,
            urgency="Low",
            top_matches={"General Services": 0.0},
            processing_time=time.time() - start_time,
            chatbot_context={}
        )

# Initialize the service request analyzer. Its __init__ binds the module-level
# _asr, _translator, _classifier and chatbot objects, so they must exist already.
analyzer = ServiceRequestAnalyzer()

In [10]:
# Main User Interface
import ipywidgets as widgets
from IPython.display import display, HTML

class ImprovedServiceUI:
    """ipywidgets front end: text/audio analysis on the left, chatbot on the right.

    Constructing the object builds every widget and displays the interface
    immediately. All user-supplied or model-produced text is HTML-escaped
    before it is placed into a widget, and the chat box keeps the whole
    conversation until "Clear All" is pressed.
    """

    def __init__(self, analyzer: ServiceRequestAnalyzer):
        """Store the analyzer, bind the module-level chatbot and render the UI."""
        self.analyzer = analyzer
        self.chatbot = chatbot
        # HTML fragments of the exchanges so far, oldest first.
        self._chat_history = []
        self.setup_ui()

    @staticmethod
    def _esc(value) -> str:
        """Return ``value`` as text that is safe to embed in HTML."""
        # Local import keeps this class self-contained within its cell.
        from html import escape
        return escape(str(value))

    @staticmethod
    def _status_html(message: str, color: str) -> str:
        """Wrap an already-safe ``message`` in the one-line status paragraph."""
        return (f"<p style='color: {color}; padding: 15px; "
                f"font-family: Helvetica, Arial, sans-serif;'>{message}</p>")

    @staticmethod
    def _section_header(title: str) -> str:
        """Return the markup of a section heading."""
        return ('<div class="section-header"><h3 style="margin: 0; color: #ffffff; '
                f'font-family: Helvetica, Arial, sans-serif;">{title}</h3></div>')

    def _render_chat(self) -> str:
        """Return the chat box markup: the greeting followed by the stored history."""
        history = "".join(self._chat_history)
        return f"""
            <div style="background: #2d2d2d; padding: 15px; border-radius: 5px; min-height: 200px; max-height: 400px; overflow-y: auto; border: 1px solid #444444;">
                <p style="color: #ffffff; font-family: 'Helvetica', Arial, sans-serif;"><strong>Assistant:</strong> Hello! How can I help you today?</p>
                {history}
            </div>
            """

    def setup_ui(self):
        """Inject the stylesheet, create all widgets, wire the callbacks and display the UI."""
        display(HTML("""
        <style>
        body {
            background-color: #1a1a1a !important;
            color: #ffffff !important;
        }
        .widget-text, .widget-textarea, .widget-button {
            font-family: 'Helvetica', Arial, sans-serif !important;
            background-color: #2d2d2d !important;
            color: #ffffff !important;
            border: 1px solid #444444 !important;
        }
        .widget-button {
            background-color: #0DC4D9 !important;
            color: #ffffff !important;
            border: none !important;
            padding: 12px 24px !important;
            border-radius: 6px !important;
            font-size: 14px !important;
            font-weight: 500 !important;
            min-width: 140px !important;
            transition: background-color 0.3s ease !important;
        }
        .widget-button:hover {
            background-color: #FF8C00 !important;
        }
        .main-container {
            display: flex;
            flex-direction: column;
            gap: 20px;
            background-color: #1a1a1a;
        }
        .content-container {
            display: flex;
            gap: 20px;
            align-items: flex-start;
        }
        .left-panel {
            flex: 1;
            width: 50%;
            min-width: 50%;
        }
        .right-panel {
            flex: 1;
            width: 50%;
            min-width: 50%;
            border-left: 2px solid #444444;
            padding-left: 20px;
        }
        .section-header {
            background-color: #333333;
            padding: 8px 12px;
            border-radius: 5px;
            margin: 15px 0 10px 0;
            border-left: 3px solid #0DC4D9;
        }
        .main-header {
            text-align: center;
            padding: 20px;
            background: linear-gradient(135deg, #2d2d2d 0%, #3d3d3d 100%);
            border-radius: 8px;
            margin-bottom: 20px;
            border: 1px solid #444444;
        }
        .button-group {
            display: flex;
            gap: 10px;
            margin: 15px 0;
            flex-wrap: wrap;
        }
        .jupyter-widgets {
            background-color: #1a1a1a !important;
        }
        </style>
        """))

        self.header = widgets.HTML(
            value="""
            <div class="main-header">
                <h1 style="color: #ffffff; margin-bottom: 8px; font-family: 'Helvetica', Arial, sans-serif; font-size: 32px; font-weight: 700;">Multilingual Service Request Analyzer</h1>
                <p style="color: #cccccc; margin-top: 0; font-family: 'Helvetica', Arial, sans-serif; font-size: 18px;">Supports Hindi, Tamil, and English</p>
            </div>
            """
        )

        self.text_input = widgets.Textarea(
            placeholder="Enter your service request in Hindi, Tamil, or English...",
            layout=widgets.Layout(width='99%', height='120px')
        )

        self.audio_upload = widgets.FileUpload(
            accept='.wav,.mp3,.m4a,.ogg',
            multiple=False,
            description='Upload Audio'
        )

        self.analyze_text_btn = widgets.Button(
            description="Analyze Text",
            button_style='primary',
            layout=widgets.Layout(width='150px', height='45px')
        )

        self.analyze_audio_btn = widgets.Button(
            description="Process Audio",
            button_style='success',
            layout=widgets.Layout(width='150px', height='45px')
        )

        self.clear_btn = widgets.Button(
            description="Clear All",
            button_style='warning',
            layout=widgets.Layout(width='120px', height='45px')
        )

        self.results_display = widgets.HTML(
            value="<p style='color: #cccccc; padding: 20px; text-align: center; font-family: Helvetica, Arial, sans-serif;'>Results will appear here after analysis...</p>"
        )

        self.chat_input = widgets.Text(
            placeholder="Chat with assistant...",
            layout=widgets.Layout(width='100%')
        )

        self.chat_send_btn = widgets.Button(
            description="Send",
            button_style='info',
            layout=widgets.Layout(width='80px', height='35px')
        )

        self.chat_output = widgets.HTML(value=self._render_chat())

        self.analyze_text_btn.on_click(self._on_analyze_text)
        self.analyze_audio_btn.on_click(self._on_analyze_audio)
        self.clear_btn.on_click(self._on_clear)
        self.chat_send_btn.on_click(self._on_chat_send)
        # NOTE(review): Text.on_submit is, as far as I know, deprecated in
        # ipywidgets 8 - confirm and plan a migration to observing `value`.
        self.chat_input.on_submit(self._on_chat_send)

        self._display_ui()

    def _display_ui(self):
        """Assemble the widgets into the two-column layout and display it."""
        left_panel = widgets.VBox([
            widgets.HTML(self._section_header("Text Input")),
            self.text_input,
            widgets.HTML(self._section_header("Audio Input")),
            self.audio_upload,
            widgets.HTML(self._section_header("Actions")),
            widgets.HBox([
                self.analyze_text_btn,
                self.analyze_audio_btn,
                self.clear_btn
            ], layout=widgets.Layout(gap='10px', overflow='hidden', flex_flow='row wrap'))
        ], layout=widgets.Layout(width='50%', flex='1 1 50%'))

        right_panel = widgets.VBox([
            widgets.HTML(self._section_header("AI Assistant")),
            self.chat_output,
            widgets.HTML('<h4 style="margin: 15px 0 5px 0; color: #ffffff; font-family: Helvetica, Arial, sans-serif;">Send Message</h4>'),
            widgets.HBox([
                self.chat_input,
                self.chat_send_btn
            ], layout=widgets.Layout(gap='10px'))
        ], layout=widgets.Layout(width='50%', flex='1 1 50%'))

        content_container = widgets.HBox([
            left_panel,
            right_panel
        ], layout=widgets.Layout(gap='20px', display='flex', align_items='stretch'))

        results_section = widgets.VBox([
            widgets.HTML(self._section_header("Analysis Results")),
            self.results_display
        ])

        main_container = widgets.VBox([
            self.header,
            content_container,
            results_section
        ])

        display(main_container)

    def _on_analyze_text(self, button):
        """Button callback: analyze the text area content and show the result."""
        try:
            text = self.text_input.value.strip()
            if not text:
                self.results_display.value = self._status_html(
                    "Please enter some text to analyze.", "#ff6b6b")
                return
            self.results_display.value = self._status_html(
                "Processing text input...", "#4dabf7")
            result = self.analyzer.analyze_request(text_input=text)
            self._display_results(result)
        except Exception as e:
            self.results_display.value = self._status_html(
                f"Error: {self._esc(e)}", "#ff6b6b")

    def _on_analyze_audio(self, button):
        """Button callback: analyze the uploaded audio file and show the result."""
        try:
            if not self.audio_upload.value:
                self.results_display.value = self._status_html(
                    "Please upload an audio file first.", "#ff6b6b")
                return

            self.results_display.value = self._status_html(
                "Processing audio file... This may take a moment.", "#4dabf7")

            # Only the tuple form of FileUpload.value is handled; the first
            # entry's 'content' is forwarded and its metadata is ignored.
            uploads = self.audio_upload.value
            if isinstance(uploads, tuple) and len(uploads) > 0:
                audio_data = {'content': uploads[0]['content']}
            else:
                self.results_display.value = "<div>No file uploaded.</div>"
                return

            result = self.analyzer.analyze_request(audio_file=audio_data)
            self._display_results(result)

        except Exception as e:
            self.results_display.value = self._status_html(
                f"Audio processing error: {self._esc(e)}", "#ff6b6b")

    def _on_clear(self, button):
        """Button callback: reset every input, the results, the chat box and the chatbot."""
        self.text_input.value = ""
        self.audio_upload.value = ()
        self.chat_input.value = ""
        self.results_display.value = "<p style='color: #cccccc; padding: 20px; text-align: center; font-family: Helvetica, Arial, sans-serif;'>All inputs cleared. Ready for new analysis.</p>"
        # Same renderer as the initial view, so the cleared chat box has the
        # same dimensions as a fresh one.
        self._chat_history = []
        self.chat_output.value = self._render_chat()
        self.chatbot.reset_conversation()

    def _on_chat_send(self, button=None):
        """Callback for the Send button and Enter key: forward the message to the chatbot.

        The exchange is appended to the chat history. When the chatbot reports
        ``next_step == 'processing'`` with a collected description, that
        description is analyzed and shown in the results panel.
        """
        try:
            user_message = self.chat_input.value.strip()
            if not user_message:
                return
            response = self.chatbot.process_user_input(user_message)
            # The user's text is escaped. NOTE(review): the assistant message is
            # inserted as-is in case the chatbot emits markup on purpose;
            # escape it too if it can echo user input.
            exchange = f"""
            <div style="margin: 10px 0; padding: 10px; background: #3d3d3d; border-radius: 5px; border-left: 3px solid #4dabf7;">
                <p style="margin: 0; color: #ffffff; font-family: 'Helvetica', Arial, sans-serif;"><strong>You:</strong> {self._esc(user_message)}</p>
            </div>
            <div style="margin: 10px 0; padding: 10px; background: #1e3a8a; border-radius: 5px; border-left: 3px solid #60a5fa;">
                <p style="margin: 0; color: #ffffff; font-family: 'Helvetica', Arial, sans-serif;"><strong>Assistant:</strong> {response.message}</p>
            </div>
            """
            # Append rather than replace, so earlier exchanges stay visible.
            self._chat_history.append(exchange)
            self.chat_output.value = self._render_chat()
            self.chat_input.value = ""
            if response.next_step == 'processing' and response.collected_data.get('description'):
                result = self.analyzer.analyze_request(
                    text_input=response.collected_data['description'],
                    chatbot_context=response.collected_data
                )
                self._display_results(result)
        except Exception as e:
            self.chat_output.value = f"""
            <div style="background: #2d2d2d; padding: 15px; border-radius: 5px; border: 1px solid #444444;">
                <p style="color: #ff6b6b; font-family: 'Helvetica', Arial, sans-serif;"><strong>Error:</strong> {self._esc(e)}</p>
            </div>
            """

    def _display_results(self, result):
        """Render ``result`` as a table plus up to three top matches in the results panel.

        Confidence is coloured green above 0.7, yellow above 0.4 and red
        otherwise. Any error while rendering is shown in place of the results.
        """
        try:
            lang_map = {'hi': 'Hindi', 'ta': 'Tamil', 'en': 'English'}
            detected_names = [lang_map.get(lang, lang) for lang in result.detected_languages]
            conf_color = "#51cf66" if result.confidence > 0.7 else "#ffd43b" if result.confidence > 0.4 else "#ff6b6b"
            urgency_colors = {"High": "#ff6b6b", "Medium": "#ffd43b", "Low": "#51cf66"}
            urgency_color = urgency_colors.get(result.urgency, "#adb5bd")

            # Escape every free-text value before it goes into the markup.
            original_text = self._esc(result.original_text)
            translated_text = self._esc(result.translated_text)
            languages = self._esc(", ".join(detected_names))
            intent_label = self._esc(result.intent)
            urgency_label = self._esc(result.urgency)

            results_html = f"""
            <div style="border: 1px solid #444444; border-radius: 8px; padding: 20px; background: #2d2d2d; margin: 10px 0; font-family: 'Helvetica', Arial, sans-serif;">
                <h4 style="color: #ffffff; margin-top: 0; border-bottom: 2px solid #4dabf7; padding-bottom: 10px;">Analysis Results</h4>
                <table style="width: 100%; border-collapse: collapse; margin: 15px 0;">
                    <tr style="background: #3d3d3d;">
                        <td style="padding: 12px; font-weight: bold; border: 1px solid #555555; width: 30%; color: #ffffff;">Original Text</td>
                        <td style="padding: 12px; border: 1px solid #555555; color: #ffffff;">{original_text}</td>
                    </tr>
                    <tr>
                        <td style="padding: 12px; font-weight: bold; border: 1px solid #555555; color: #ffffff;">Translated Text</td>
                        <td style="padding: 12px; border: 1px solid #555555; color: #ffffff;">{translated_text}</td>
                    </tr>
                    <tr style="background: #3d3d3d;">
                        <td style="padding: 12px; font-weight: bold; border: 1px solid #555555; color: #ffffff;">Detected Language</td>
                        <td style="padding: 12px; border: 1px solid #555555; color: #ffffff;">{languages}</td>
                    </tr>
                    <tr>
                        <td style="padding: 12px; font-weight: bold; border: 1px solid #555555; color: #ffffff;">Service Category</td>
                        <td style="padding: 12px; border: 1px solid #555555;">
                            <span style="background: #4dabf7; color: #ffffff; padding: 4px 8px; border-radius: 4px; font-size: 12px;">{intent_label}</span>
                        </td>
                    </tr>
                    <tr style="background: #3d3d3d;">
                        <td style="padding: 12px; font-weight: bold; border: 1px solid #555555; color: #ffffff;">Urgency Level</td>
                        <td style="padding: 12px; border: 1px solid #555555;">
                            <span style="color: {urgency_color}; font-weight: bold;">{urgency_label}</span>
                        </td>
                    </tr>
                    <tr>
                        <td style="padding: 12px; font-weight: bold; border: 1px solid #555555; color: #ffffff;">Confidence</td>
                        <td style="padding: 12px; border: 1px solid #555555;">
                            <span style="color: {conf_color}; font-weight: bold;">{result.confidence:.1%}</span>
                        </td>
                    </tr>
                    <tr style="background: #3d3d3d;">
                        <td style="padding: 12px; font-weight: bold; border: 1px solid #555555; color: #ffffff;">Processing Time</td>
                        <td style="padding: 12px; border: 1px solid #555555; color: #ffffff;">{result.processing_time:.2f}s</td>
                    </tr>
                </table>
                <h5 style="color: #ffffff; margin: 20px 0 10px 0;">Top Service Matches</h5>
                <ul style="margin: 0; padding-left: 20px; color: #ffffff;">
            """
            for intent, score in list(result.top_matches.items())[:3]:
                results_html += f"<li style='margin: 5px 0; color: #ffffff;'><strong>{self._esc(intent)}:</strong> {score:.1%}</li>"
            results_html += """
                </ul>
            </div>
            """
            self.results_display.value = results_html
        except Exception as e:
            self.results_display.value = self._status_html(
                f"Display error: {self._esc(e)}", "#ff6b6b")

# Initialize the UI: build the widget front end around the `analyzer` object.
# `analyzer` is not created in the visible part of this cell; it must already
# exist in the kernel (presumably from an earlier cell; TODO confirm it is
# defined before this point on a fresh Restart & Run All).
# NOTE(review): the VBox recorded as this cell's output appears to be rendered
# as a side effect of constructing the UI; confirm against the class's __init__.
ui = ImprovedServiceUI(analyzer)

VBox(children=(HTML(value='\n            <div class="main-header">\n                <h1 style="color: #ffffff;…

In [11]:
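# # Hindi Language Evaluation - CER & WER
# # NOTE: all code in this cell is commented out, so running it does nothing.
# # The output saved below comes from an earlier run that was stopped with a
# # KeyboardInterrupt after the FLEURS downloads began, so it contains no
# # WER/CER results.
# # The English and Tamil evaluation cells further down use names assigned here
# # (TEST_SPLIT, MAX_SAMPLES, processor, model, device, wer_metric, cer_metric)
# # and the `torch` import, so re-enable and run this cell before them.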
# import evaluate
# from datasets import load_dataset
# import pandas as pd
# import numpy as np
# import os

# print("=== HINDI LANGUAGE EVALUATION ===")

# # Configuration
# MODEL_DIR = "./whisper_finetuned"
# BASE_MODEL = "openai/whisper-medium"
# TEST_SPLIT = "test"
# MAX_SAMPLES = 50
# LANG_CODE = "hi_in"
# LANG_NAME = "Hindi"

# # Load metrics
# wer_metric = evaluate.load("wer")
# cer_metric = evaluate.load("cer")

# # Load model
# from transformers import WhisperForConditionalGeneration, WhisperProcessor
# import torch

# device = "cuda" if torch.cuda.is_available() else "cpu"

# if os.path.exists(MODEL_DIR) and os.path.exists(os.path.join(MODEL_DIR, "config.json")):
#     print(f"Loading fine-tuned model from {MODEL_DIR}")
#     processor = WhisperProcessor.from_pretrained(MODEL_DIR)
#     model = WhisperForConditionalGeneration.from_pretrained(MODEL_DIR).to(device)
#     model_type = "Fine-tuned"
# else:
#     print(f"Using base model: {BASE_MODEL}")
#     processor = WhisperProcessor.from_pretrained(BASE_MODEL)
#     model = WhisperForConditionalGeneration.from_pretrained(BASE_MODEL).to(device)
#     model_type = "Base"

# print(f"Model loaded: {model_type} Whisper Medium")

# # Load Hindi dataset
# try:
#     ds = load_dataset("google/fleurs", LANG_CODE, split=TEST_SPLIT, trust_remote_code=True)
#     if MAX_SAMPLES:
#         ds = ds.select(range(min(len(ds), MAX_SAMPLES)))
#     print(f"Loaded {len(ds)} samples for {LANG_NAME}")
# except Exception as e:
#     print(f"Failed to load {LANG_NAME} dataset: {e}")
#     ds = None

# if ds is not None:
#     def map_to_pred(batch):
#         inputs = processor(batch["audio"]["array"],
#                           sampling_rate=batch["audio"]["sampling_rate"],
#                           return_tensors="pt")
#         input_feat = inputs.input_features.to(device)
        
#         with torch.no_grad():
#             pred_ids = model.generate(input_feat,
#                                      max_length=448,
#                                      num_beams=5,
#                                      early_stopping=True)
        
#         batch["prediction"] = processor.batch_decode(pred_ids, skip_special_tokens=True)[0]
#         batch["reference"] = batch["transcription"]
#         return batch
    
#     print(f"Transcribing {LANG_NAME} test samples...")
#     pred_ds = ds.map(map_to_pred, remove_columns=ds.column_names, batched=False)
    
#     preds = pred_ds["prediction"]
#     refs = pred_ds["reference"]
    
#     # Compute metrics
#     wer_score = wer_metric.compute(predictions=preds, references=refs)
#     cer_score = cer_metric.compute(predictions=preds, references=refs)
    
#     print(f"\n{LANG_NAME} Results:")
#     print(f"Samples: {len(preds)}")
#     print(f"WER: {wer_score:.4f}")
#     print(f"CER: {cer_score:.4f}")
    
#     # Show examples
#     print(f"\n{LANG_NAME} Sample Predictions:")
#     examples_df = pd.DataFrame({
#         "Reference": [ref[:60] + "..." if len(ref) > 60 else ref for ref in refs[:3]],
#         "Prediction": [pred[:60] + "..." if len(pred) > 60 else pred for pred in preds[:3]]
#     })
#     print(examples_df.to_string(index=False))

=== HINDI LANGUAGE EVALUATION ===


Downloading builder script: 0.00B [00:00, ?B/s]

Downloading builder script: 0.00B [00:00, ?B/s]

Using base model: openai/whisper-medium


preprocessor_config.json: 0.00B [00:00, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

normalizer.json: 0.00B [00:00, ?B/s]

added_tokens.json: 0.00B [00:00, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/3.06G [00:00<?, ?B/s]

generation_config.json: 0.00B [00:00, ?B/s]

Model loaded: Base Whisper Medium


README.md: 0.00B [00:00, ?B/s]

fleurs.py: 0.00B [00:00, ?B/s]

data/hi_in/audio/train.tar.gz:   0%|          | 0.00/1.26G [00:00<?, ?B/s]

data/hi_in/audio/dev.tar.gz:   0%|          | 0.00/132M [00:00<?, ?B/s]

data/hi_in/audio/test.tar.gz:   0%|          | 0.00/249M [00:00<?, ?B/s]

KeyboardInterrupt: 

In [None]:
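# # English Language Evaluation - CER & WER
# # NOTE: all code in this cell is commented out and it has no recorded
# # execution (In [None]).
# # It is not self-contained: TEST_SPLIT, MAX_SAMPLES, processor, model, device,
# # torch, wer_metric and cer_metric are used below but not defined in this
# # cell; the Hindi evaluation cell above sets them up and must run first.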
# import evaluate
# from datasets import load_dataset
# import pandas as pd
# import numpy as np

# print("=== ENGLISH LANGUAGE EVALUATION ===")

# # Configuration
# LANG_CODE = "en_us"
# LANG_NAME = "English"

# # Load English dataset
# try:
#     ds = load_dataset("google/fleurs", LANG_CODE, split=TEST_SPLIT, trust_remote_code=True)
#     if MAX_SAMPLES:
#         ds = ds.select(range(min(len(ds), MAX_SAMPLES)))
#     print(f"Loaded {len(ds)} samples for {LANG_NAME}")
# except Exception as e:
#     print(f"Failed to load {LANG_NAME} dataset: {e}")
#     ds = None

# if ds is not None:
#     def map_to_pred(batch):
#         inputs = processor(batch["audio"]["array"],
#                           sampling_rate=batch["audio"]["sampling_rate"],
#                           return_tensors="pt")
#         input_feat = inputs.input_features.to(device)
        
#         with torch.no_grad():
#             pred_ids = model.generate(input_feat,
#                                      max_length=448,
#                                      num_beams=5,
#                                      early_stopping=True)
        
#         batch["prediction"] = processor.batch_decode(pred_ids, skip_special_tokens=True)[0]
#         batch["reference"] = batch["transcription"]
#         return batch
    
#     print(f"Transcribing {LANG_NAME} test samples...")
#     pred_ds = ds.map(map_to_pred, remove_columns=ds.column_names, batched=False)
    
#     preds = pred_ds["prediction"]
#     refs = pred_ds["reference"]
    
#     # Compute metrics
#     wer_score = wer_metric.compute(predictions=preds, references=refs)
#     cer_score = cer_metric.compute(predictions=preds, references=refs)
    
#     print(f"\n{LANG_NAME} Results:")
#     print(f"Samples: {len(preds)}")
#     print(f"WER: {wer_score:.4f}")
#     print(f"CER: {cer_score:.4f}")
    
#     # Show examples
#     print(f"\n{LANG_NAME} Sample Predictions:")
#     examples_df = pd.DataFrame({
#         "Reference": [ref[:60] + "..." if len(ref) > 60 else ref for ref in refs[:3]],
#         "Prediction": [pred[:60] + "..." if len(pred) > 60 else pred for pred in preds[:3]]
#     })
#     print(examples_df.to_string(index=False))

In [None]:
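# # Tamil Language Evaluation - CER & WER
# # NOTE: all code in this cell is commented out and it has no recorded
# # execution (In [None]).
# # It is not self-contained: TEST_SPLIT, MAX_SAMPLES, processor, model, device,
# # torch, wer_metric and cer_metric are used below but not defined in this
# # cell; the Hindi evaluation cell above sets them up and must run first.
# # TODO: apart from LANG_CODE / LANG_NAME this cell duplicates the English
# # evaluation cell; fold both into one helper parameterized by language.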
# import evaluate
# from datasets import load_dataset
# import pandas as pd
# import numpy as np

# print("=== TAMIL LANGUAGE EVALUATION ===")

# # Configuration
# LANG_CODE = "ta_in"
# LANG_NAME = "Tamil"

# # Load Tamil dataset
# try:
#     ds = load_dataset("google/fleurs", LANG_CODE, split=TEST_SPLIT, trust_remote_code=True)
#     if MAX_SAMPLES:
#         ds = ds.select(range(min(len(ds), MAX_SAMPLES)))
#     print(f"Loaded {len(ds)} samples for {LANG_NAME}")
# except Exception as e:
#     print(f"Failed to load {LANG_NAME} dataset: {e}")
#     ds = None

# if ds is not None:
#     def map_to_pred(batch):
#         inputs = processor(batch["audio"]["array"],
#                           sampling_rate=batch["audio"]["sampling_rate"],
#                           return_tensors="pt")
#         input_feat = inputs.input_features.to(device)
        
#         with torch.no_grad():
#             pred_ids = model.generate(input_feat,
#                                      max_length=448,
#                                      num_beams=5,
#                                      early_stopping=True)
        
#         batch["prediction"] = processor.batch_decode(pred_ids, skip_special_tokens=True)[0]
#         batch["reference"] = batch["transcription"]
#         return batch
    
#     print(f"Transcribing {LANG_NAME} test samples...")
#     pred_ds = ds.map(map_to_pred, remove_columns=ds.column_names, batched=False)
    
#     preds = pred_ds["prediction"]
#     refs = pred_ds["reference"]
    
#     # Compute metrics
#     wer_score = wer_metric.compute(predictions=preds, references=refs)
#     cer_score = cer_metric.compute(predictions=preds, references=refs)
    
#     print(f"\n{LANG_NAME} Results:")
#     print(f"Samples: {len(preds)}")
#     print(f"WER: {wer_score:.4f}")
#     print(f"CER: {cer_score:.4f}")
    
#     # Show examples
#     print(f"\n{LANG_NAME} Sample Predictions:")
#     examples_df = pd.DataFrame({
#         "Reference": [ref[:60] + "..." if len(ref) > 60 else ref for ref in refs[:3]],
#         "Prediction": [pred[:60] + "..." if len(pred) > 60 else pred for pred in preds[:3]]
#     })
#     print(examples_df.to_string(index=False))