In [18]:
# Standard library
import asyncio
import hashlib
import html
import json
import logging
import os
import pickle
import re
import sqlite3
import sys
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from threading import Lock
from typing import Dict, List, Optional, Tuple, Union

# Numerical / data handling
import numpy as np
import pandas as pd

# Core ML and NLP libraries
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.preprocessing import LabelEncoder
from imblearn.over_sampling import SMOTE
import spacy
from langdetect import detect, detect_langs

from transformers import (
    AutoTokenizer, AutoModel, AutoModelForSequenceClassification,
    get_linear_schedule_with_warmup, pipeline
)
from torch.optim import AdamW

# Infrastructure and monitoring
import redis
import boto3
# NOTE(review): this rebinds `Counter`, shadowing collections.Counter imported
# above; alias one of the two if both are needed later in the notebook.
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import pydantic
from pydantic import BaseModel, Field
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

# Async and concurrency
import aiohttp
from redis.asyncio import Redis

# Specialized libraries
import plotly.graph_objects as go
import plotly.express as px
from textblob import TextBlob
import openai  # For GPT integration
import anthropic  # For Claude integration

# Database
import pymongo
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Boolean, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

In [22]:
# CONFIGURATION & DATA MODELS
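# Use the SQLAlchemy 1.4+ location of declarative_base; this import rebinds the
# name that the imports cell took from sqlalchemy.ext.declarative.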
from sqlalchemy.orm import declarative_base  # Updated import
class SentimentLabel(Enum):
    """Three-way sentiment classes and their integer ids."""
    # Ids run in negative -> neutral -> positive order (0, 1, 2).
    NEGATIVE = 0
    NEUTRAL = 1  
    POSITIVE = 2

def _default_priority_categories() -> List[str]:
    """Build a fresh list of the product categories flagged as priorities."""
    return ["organic", "baby food", "dietary supplements"]


def _default_alert_thresholds() -> Dict[str, float]:
    """Build a fresh mapping of monitoring alert thresholds."""
    return {
        "accuracy_drop": 0.05,
        "latency_p95": 200,  # milliseconds
        "error_rate": 0.01,
    }


@dataclass
class PipelineConfig:
    """Single source of tunable settings shared by the pipeline components.

    Every field has a default, so ``PipelineConfig()`` yields a usable
    configuration. The two mutable defaults are produced by factory functions,
    which means each instance owns its own list / dict.
    """

    # --- Models -----------------------------------------------------------
    foundation_model: str = "roberta-large"
    custom_model_path: str = "models/amazon-food-sentiment-v2"
    embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"

    # --- Processing -------------------------------------------------------
    max_sequence_length: int = 512
    batch_size: int = 64
    num_workers: int = 8

    # --- Business rules ---------------------------------------------------
    confidence_threshold: float = 0.85
    human_review_threshold: float = 0.7
    priority_categories: List[str] = field(
        default_factory=_default_priority_categories
    )

    # --- Infrastructure ---------------------------------------------------
    redis_url: str = "redis://localhost:6379"
    model_registry_url: str = "s3://amazon-ml-models/food-sentiment/"
    feature_store_url: str = "s3://amazon-feature-store/"
    db_url: str = "sqlite:///sentiment_pipeline.db"

    # --- Monitoring -------------------------------------------------------
    metrics_port: int = 8000
    alert_thresholds: Dict[str, float] = field(
        default_factory=_default_alert_thresholds
    )

class ReviewInput(BaseModel):
    """Input schema for review processing.

    Pydantic model describing one incoming review. ``rating`` is the only
    field with a value constraint (1..5 inclusive); the remaining fields are
    typed but otherwise unconstrained.
    """
    # Identifiers supplied by the caller.
    review_id: str
    user_id: str
    product_id: str
    # Review body as submitted.
    text: str
    # Values outside 1..5 are rejected by pydantic validation.
    rating: int = Field(ge=1, le=5)
    product_category: Optional[str] = None
    # Free-form mapping; its schema is not defined in this file.
    reviewer_history: Optional[Dict] = None
    # Defaults to the instantiation time as a naive UTC datetime (datetime.utcnow).
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    # NOTE(review): presumably a language code such as "en" - confirm with producers.
    language: Optional[str] = None
    # Treated as verified unless the caller says otherwise.
    is_verified_purchase: bool = True

class SentimentOutput(BaseModel):
    """Output schema for sentiment analysis.

    Pydantic model for the result produced for one review. All fields are
    required; none declares a default or a value constraint.
    """
    # Identifier of the review this result belongs to.
    review_id: str
    # Predicted sentiment label as a string.
    sentiment: str
    confidence: float
    # Mapping of label name to score.
    sentiment_scores: Dict[str, float]
    business_impact_score: float
    requires_human_review: bool
    key_phrases: List[str]
    # Heterogeneous values: each entry is either a string or a float.
    product_specific_insights: Dict[str, Union[str, float]]
    # Heterogeneous values: each entry is a string, float or int.
    processing_metadata: Dict[str, Union[str, float, int]]

# Database Models
# Shared declarative base; the ORM classes below subclass it so that they are
# registered on the same metadata.
Base = declarative_base()

class ReviewRecord(Base):
    """ORM mapping for the ``reviews`` table (one row per stored review)."""
    __tablename__ = "reviews"
    
    # Surrogate integer primary key.
    id = Column(Integer, primary_key=True)
    # External review identifier; unique and indexed.
    review_id = Column(String, unique=True, index=True)
    # Indexed for per-user and per-product lookups.
    user_id = Column(String, index=True)
    product_id = Column(String, index=True)
    text = Column(Text)
    rating = Column(Integer)
    product_category = Column(String)
    timestamp = Column(DateTime)
    is_verified_purchase = Column(Boolean)
    
class SentimentRecord(Base):
    """ORM mapping for the ``sentiment_results`` table."""
    __tablename__ = "sentiment_results"
    
    # Surrogate integer primary key.
    id = Column(Integer, primary_key=True)
    # Indexed but neither unique nor declared as a foreign key, so several
    # result rows may share one review_id.
    review_id = Column(String, index=True)
    sentiment = Column(String)
    confidence = Column(Float)
    # Per-class scores stored as separate columns.
    positive_score = Column(Float)
    neutral_score = Column(Float)
    negative_score = Column(Float)
    business_impact_score = Column(Float)
    requires_human_review = Column(Boolean)
    # NOTE(review): unit is not stated in this file - confirm seconds vs. milliseconds.
    processing_time = Column(Float)
    model_version = Column(String)
    # Filled in at insert time with a naive UTC datetime (datetime.utcnow).
    created_at = Column(DateTime, default=datetime.utcnow)

In [23]:
# DATA QUALITY & PREPROCESSING
# ================================

class AdvancedDataQualityEngine:
    """Heuristic quality scoring for a single product review.

    ``assess_review_quality`` is the entry point. It combines eight rule-based
    sub-scores (length, coherence, informativeness, authenticity,
    sentiment/rating consistency, grammar, spam, duplicate likelihood) into an
    ``overall_quality`` mean. The two pickled models loaded in ``__init__`` are
    stored on the instance but are not consulted by any scoring method in this
    class.
    """

    # Stock praise phrases (lower-case; matched against lower-cased text).
    _GENERIC_PHRASES = (
        "highly recommend", "amazing product", "love this", "perfect",
        "exactly what i needed", "fast shipping", "great value",
    )

    # Promotional vocabulary used by the spam heuristic.
    _SPAM_INDICATORS = (
        "click here", "visit our", "website", "discount", "sale",
        "buy now", "limited time", "offer", "deal", "promo",
    )

    # Each indicator must start at a word boundary so that "ideal" or
    # "wholesale" no longer count as "deal" / "sale". The end is left open so
    # inflections such as "discounted" or "promotion" still match.
    _SPAM_PATTERNS = tuple(
        re.compile(r"\b" + re.escape(indicator), re.IGNORECASE)
        for indicator in _SPAM_INDICATORS
    )

    # Surface forms / lemmas that signal a comparison.
    _COMPARISON_WORDS = frozenset(
        {"better", "worse", "best", "worst", "compare", "than"}
    )

    # Per-user duplicate fingerprints are kept for 30 days.
    _DUPLICATE_TTL_SECONDS = 30 * 24 * 3600

    def __init__(self, config: "PipelineConfig"):
        """Load the spaCy pipeline, optional pickled models and Redis client.

        Args:
            config: Pipeline configuration; only ``redis_url`` is read here.
        """
        self.config = config
        self.nlp = spacy.load("en_core_web_sm")
        self.quality_model = self._load_quality_model()
        self.fake_review_detector = self._load_fake_detector()
        self.redis_client = redis.from_url(config.redis_url)

    def assess_review_quality(self, review: "ReviewInput") -> Dict[str, float]:
        """Score one review on every quality dimension.

        Args:
            review: Object exposing ``text``, ``rating``, ``user_id`` and
                ``is_verified_purchase``.

        Returns:
            Mapping with the eight sub-scores plus ``overall_quality``, the
            mean of the sub-scores with ``spam_score`` and
            ``duplicate_likelihood`` inverted (``1 - score``).
        """
        text = review.text
        word_count = len(text.split())

        # Parse once and share the result; the helpers would otherwise each
        # run the spaCy pipeline on the same text.
        doc = self.nlp(text)

        quality_scores = {
            'length_score': self._calculate_length_score(word_count),
            'coherence_score': self._calculate_coherence_score(text, doc=doc),
            'informativeness_score': self._calculate_informativeness_score(text, doc=doc),
            'authenticity_score': self._calculate_authenticity_score(review),
            'sentiment_rating_consistency': self._check_sentiment_rating_consistency(text, review.rating),
            'grammar_score': self._calculate_grammar_score(text, doc=doc),
            'spam_score': self._calculate_spam_score(text),
            'duplicate_likelihood': self._calculate_duplicate_likelihood(text, review.user_id)
        }

        quality_scores['overall_quality'] = float(np.mean([
            quality_scores['length_score'],
            quality_scores['coherence_score'],
            quality_scores['informativeness_score'],
            quality_scores['authenticity_score'],
            quality_scores['sentiment_rating_consistency'],
            quality_scores['grammar_score'],
            1 - quality_scores['spam_score'],            # lower spam is better
            1 - quality_scores['duplicate_likelihood'],  # lower duplication is better
        ]))

        return quality_scores

    def _calculate_length_score(self, word_count: int) -> float:
        """Score review length; 20-200 words is treated as ideal.

        Returns 0.1 below 5 words, a linear ramp up to 0.7 below 20 words,
        1.0 for 20-200 words, and a linear decay floored at 0.5 above 200.
        """
        if word_count < 5:
            return 0.1
        if word_count < 20:
            return word_count / 20 * 0.7
        if word_count <= 200:
            return 1.0
        return max(0.5, 1.0 - (word_count - 200) / 300)

    def _calculate_coherence_score(self, text: str, doc=None) -> float:
        """Estimate coherence from word overlap between adjacent sentences.

        Args:
            text: Review text.
            doc: Optional pre-computed spaCy parse of ``text``; parsed here
                when omitted.

        Returns:
            Mean overlap ratio across adjacent sentence pairs. 0.8 when fewer
            than two sentences longer than 10 characters exist, 0.5 when no
            pair contains any words.
        """
        if doc is None:
            doc = self.nlp(text)
        sentences = [sent.text for sent in doc.sents if len(sent.text.strip()) > 10]

        if len(sentences) < 2:
            return 0.8  # Single sentence, assume coherent

        coherence_scores = []
        for first, second in zip(sentences, sentences[1:]):
            first_words = TextBlob(first).words
            second_words = TextBlob(second).words
            longest = max(len(first_words), len(second_words))
            if longest == 0:
                # Both "sentences" are punctuation only; nothing to compare
                # (this used to raise ZeroDivisionError).
                continue
            # Compare case-insensitively so a sentence-initial capital does
            # not hide a shared word.
            shared = ({str(word).lower() for word in first_words}
                      & {str(word).lower() for word in second_words})
            coherence_scores.append(len(shared) / longest)

        return float(np.mean(coherence_scores)) if coherence_scores else 0.5

    def _calculate_informativeness_score(self, text: str, doc=None) -> float:
        """Score how much descriptive content the review carries.

        Counts named entities, nouns, adjectives and comparison words, weights
        them relative to the token count and scales the sum into 0..1.

        Args:
            text: Review text.
            doc: Optional pre-computed spaCy parse of ``text``.

        Returns:
            Value in 0..1; 0.0 for an empty parse.
        """
        if doc is None:
            doc = self.nlp(text)

        total_tokens = len(doc)
        if total_tokens == 0:
            return 0.0

        entity_count = len(doc.ents)
        noun_count = sum(1 for token in doc if token.pos_ == "NOUN")
        adjective_count = sum(1 for token in doc if token.pos_ == "ADJ")
        # Match on the surface form as well as the lemma: the lemmatizer may
        # reduce "better"/"best" to a base form, which a lemma-only test
        # would never find in the list.
        comparison_count = sum(
            1 for token in doc
            if token.lower_ in self._COMPARISON_WORDS
            or token.lemma_.lower() in self._COMPARISON_WORDS
        )

        informativeness = (
            entity_count / total_tokens * 2 +
            noun_count / total_tokens * 1.5 +
            adjective_count / total_tokens * 1.2 +
            comparison_count / total_tokens * 2
        )

        return min(1.0, informativeness * 5)  # Scale to 0-1

    def _calculate_authenticity_score(self, review: "ReviewInput") -> float:
        """Score authenticity by subtracting penalties for red flags.

        Penalties: more than three generic praise phrases (0.3), more than 30%
        upper-case characters (0.2), a rating of 2 or lower with fewer than 10
        words (0.4), and an unverified purchase (0.3).

        Returns:
            ``1.0 - penalties``, floored at 0.0.
        """
        text = review.text
        lowered = text.lower()
        red_flags = 0.0

        generic_count = sum(1 for phrase in self._GENERIC_PHRASES if phrase in lowered)
        if generic_count > 3:
            red_flags += 0.3

        caps_ratio = sum(1 for c in text if c.isupper()) / len(text) if text else 0
        if caps_ratio > 0.3:
            red_flags += 0.2

        if review.rating <= 2 and len(text.split()) < 10:
            red_flags += 0.4

        if not review.is_verified_purchase:
            red_flags += 0.3

        return max(0.0, 1.0 - red_flags)

    def _check_sentiment_rating_consistency(self, text: str, rating: int) -> float:
        """Compare TextBlob polarity with the polarity implied by the rating.

        Ratings of 1-2 expect -0.5, a rating of 3 expects 0.0 and ratings of
        4-5 expect +0.5.

        Returns:
            ``1 - |polarity - expected|``, floored at 0.0.
        """
        sentiment_polarity = TextBlob(text).sentiment.polarity  # -1 to 1

        if rating <= 2:
            expected_sentiment = -0.5
        elif rating == 3:
            expected_sentiment = 0.0
        else:
            expected_sentiment = 0.5

        return max(0.0, 1.0 - abs(sentiment_polarity - expected_sentiment))

    def _calculate_grammar_score(self, text: str, doc=None) -> float:
        """Return the share of sentences that contain more than two tokens.

        Args:
            text: Review text.
            doc: Optional pre-computed spaCy parse of ``text``.

        Returns:
            0.0 for an empty parse, 0.5 when no sentence is found, otherwise
            the ratio in 0..1.
        """
        if doc is None:
            doc = self.nlp(text)

        if len(doc) == 0:
            return 0.0

        sentences = list(doc.sents)
        if not sentences:
            return 0.5

        proper_sentences = sum(1 for sent in sentences if len(sent) > 2)
        return proper_sentences / len(sentences)

    def _calculate_spam_score(self, text: str) -> float:
        """Score promotional language.

        Counts how many distinct spam indicators occur (each at most once,
        case-insensitive, starting at a word boundary) and divides by 3,
        capped at 1.0.
        """
        spam_count = sum(1 for pattern in self._SPAM_PATTERNS if pattern.search(text))
        return min(1.0, spam_count / 3)

    def _calculate_duplicate_likelihood(self, text: str, user_id: str) -> float:
        """Flag text the same user has already submitted.

        A fingerprint of the lower-cased, stripped text is added to the Redis
        set ``user_reviews:<user_id>``.

        Returns:
            0.9 when the fingerprint was already in the set, otherwise 0.1
            (including when Redis is unavailable).
        """
        # MD5 is used only as a cheap content fingerprint, not for security.
        text_hash = hashlib.md5(text.lower().strip().encode()).hexdigest()
        cache_key = f"user_reviews:{user_id}"
        try:
            # SADD reports how many members were newly added, so one atomic
            # call both checks and records the fingerprint. This avoids
            # transferring the whole set and the check-then-add race.
            newly_added = self.redis_client.sadd(cache_key, text_hash)
            if not newly_added:
                return 0.9  # High duplicate likelihood

            self.redis_client.expire(cache_key, self._DUPLICATE_TTL_SECONDS)

        except Exception as e:
            logging.warning(f"Redis error in duplicate detection: {e}")

        return 0.1  # Low duplicate likelihood

    def _load_pickled_model(self, model_path: Path, description: str):
        """Unpickle ``model_path`` if it exists; return None otherwise.

        SECURITY: ``pickle.load`` executes arbitrary code embedded in the
        file. Only point this at artifacts produced by a trusted build.
        """
        try:
            if model_path.exists():
                with open(model_path, 'rb') as f:
                    return pickle.load(f)
        except Exception as e:
            logging.warning(f"Failed to load {description}: {e}")
        return None

    def _load_quality_model(self):
        """Load the quality model from ``models/quality_model.pkl`` if present."""
        # In production, this would load from model registry
        return self._load_pickled_model(Path("models/quality_model.pkl"), "quality model")

    def _load_fake_detector(self):
        """Load the fake-review detector from ``models/fake_detector.pkl`` if present."""
        return self._load_pickled_model(Path("models/fake_detector.pkl"), "fake detector")

class IntelligentPreprocessor:
    """Text preprocessing that produces several views of one review.

    ``preprocess_review`` returns the original text together with cleaned,
    category-annotated, normalized and lemmatized variants plus the detected
    language code.
    """

    # Keyword hints appended to the text for known product categories.
    _CATEGORY_CONTEXTS = {
        'organic': 'organic natural healthy',
        'baby food': 'safe nutrition infant toddler',
        'snacks': 'taste crunch flavor',
        'beverages': 'taste refreshing drink'
    }

    # Applied in order to lower-cased text. The irregular forms come first;
    # the generic suffixes require a preceding word character so that an
    # opening quote (as in 'really') is not mistaken for a contraction.
    _CONTRACTION_PATTERNS = (
        (re.compile(r"\bwon't\b"), "will not"),
        (re.compile(r"\bcan't\b"), "cannot"),
        (re.compile(r"(?<=\w)n't\b"), " not"),
        (re.compile(r"(?<=\w)'re\b"), " are"),
        (re.compile(r"(?<=\w)'ve\b"), " have"),
        (re.compile(r"(?<=\w)'ll\b"), " will"),
        (re.compile(r"(?<=\w)'d\b"), " would"),
        (re.compile(r"(?<=\w)'m\b"), " am"),
    )

    # Part-of-speech tags whose tokens are replaced by their lemma.
    _CONTENT_POS = frozenset({'NOUN', 'ADJ', 'VERB', 'ADV'})

    def __init__(self, config: "PipelineConfig"):
        """Store the configuration and load the English spaCy pipeline."""
        self.config = config
        self.nlp = spacy.load("en_core_web_sm")
        self.language_models = self._load_language_models()

    def preprocess_review(self, review: "ReviewInput") -> Dict[str, str]:
        """Run the full preprocessing chain on one review.

        Args:
            review: Object exposing ``text`` and ``product_category``.

        Returns:
            Mapping with the keys ``original``, ``cleaned``, ``enhanced``,
            ``language``, ``normalized`` and ``tokenized``.
        """
        text = review.text
        language = self._detect_language(text)
        cleaned_text = self._clean_text(text)
        product_enhanced_text = self._enhance_with_product_context(
            cleaned_text, review.product_category
        )

        return {
            'original': text,
            'cleaned': cleaned_text,
            'enhanced': product_enhanced_text,
            'language': language,
            'normalized': self._normalize_text(cleaned_text),
            'tokenized': self._smart_tokenize(product_enhanced_text)
        }

    def _detect_language(self, text: str) -> str:
        """Return the detected language code, defaulting to ``'en'``.

        The top candidate is accepted only when its probability exceeds 0.8.
        Detection failures (for example text with no usable features) fall
        back to English instead of propagating.
        """
        try:
            lang_probs = detect_langs(text)
            if lang_probs and lang_probs[0].prob > 0.8:
                return lang_probs[0].lang
        except Exception as e:
            # Best effort by design, but no longer a bare `except:` that would
            # also swallow KeyboardInterrupt / SystemExit.
            logging.debug(f"Language detection failed, defaulting to 'en': {e}")
        return 'en'  # Default to English

    def _clean_text(self, text: str) -> str:
        """Decode HTML entities and collapse noisy whitespace / punctuation.

        Entities are decoded in one pass with ``html.unescape``, so
        ``&amp;lt;`` becomes the literal text ``&lt;`` rather than ``<`` and
        entities beyond ``&amp;``/``&lt;``/``&gt;`` are handled too. Runs of
        ``!`` or ``?`` collapse to one character and runs of three or more
        dots collapse to ``...``.
        """
        # Decode first: entities such as &nbsp; turn into whitespace that the
        # next step should collapse.
        text = html.unescape(text)
        text = re.sub(r'\s+', ' ', text)

        text = re.sub(r'[!]{2,}', '!', text)
        text = re.sub(r'[?]{2,}', '?', text)
        text = re.sub(r'[.]{3,}', '...', text)

        return text.strip()

    def _enhance_with_product_context(self, text: str, product_category: Optional[str]) -> str:
        """Append category keywords as a ``[CONTEXT: ...]`` suffix.

        The category is matched case-insensitively, ignoring surrounding
        whitespace. Text is returned unchanged when the category is missing
        or unknown.
        """
        if not product_category:
            return text

        # Add category-specific context (this would be more sophisticated in production)
        context = self._CATEGORY_CONTEXTS.get(product_category.strip().lower(), '')
        if context:
            return f"{text} [CONTEXT: {context}]"

        return text

    def _normalize_text(self, text: str) -> str:
        """Lower-case the text and expand common English contractions.

        Typographic apostrophes are folded to ``'`` first so that both
        ``don't`` and ``don’t`` expand. Possessive / ambiguous ``'s`` is left
        untouched.
        """
        text = text.lower().replace("\u2019", "'")

        for pattern, expansion in self._CONTRACTION_PATTERNS:
            text = pattern.sub(expansion, text)

        return text

    def _smart_tokenize(self, text: str) -> str:
        """Return space-joined tokens with content words lemmatized.

        Tokens that belong to a named entity keep their surface form; nouns,
        adjectives, verbs and adverbs outside entities are replaced by their
        lemma; every other token is kept verbatim.
        """
        doc = self.nlp(text)
        return ' '.join(
            token.lemma_
            if not token.ent_type_ and token.pos_ in self._CONTENT_POS
            else token.text
            for token in doc
        )

    def _load_language_models(self):
        """Return the mapping of language code to spaCy model name.

        Only the names are returned; no model is loaded here.
        """
        return {
            'en': 'en_core_web_sm',
            'es': 'es_core_news_sm',
            # Add more languages as needed
        }


In [24]:
# ADVANCED SENTIMENT MODELS
# ================================

class HybridSentimentModel(nn.Module):
    """Transformer backbone with aspect heads for 3-class review sentiment.

    The pooled backbone output feeds four aspect projections (taste, quality,
    value, packaging). Their concatenation with the pooled output drives the
    sentiment classifier. Separate heads estimate a confidence value and a
    business-impact value, both squashed to 0..1 by a sigmoid.

    Sub-module attribute names are part of the checkpoint format
    (``state_dict`` keys) and must not be renamed.
    """

    def __init__(self, config: "PipelineConfig"):
        """Build the backbone and all heads.

        Args:
            config: Pipeline configuration; ``foundation_model`` names the
                backbone passed to ``AutoModel.from_pretrained``.
        """
        super().__init__()
        self.config = config

        # Foundation model backbone
        self.transformer = AutoModel.from_pretrained(config.foundation_model)
        hidden_size = self.transformer.config.hidden_size

        # Self-attention over the token sequence, exposed for inspection via
        # `return_aspects`. Expects (seq, batch, dim) inputs (batch_first is
        # left at its default).
        self.aspect_attention = nn.MultiheadAttention(
            embed_dim=hidden_size,
            num_heads=8,
            dropout=0.1
        )

        # Food-specific aspect extractors
        self.taste_extractor = nn.Linear(hidden_size, hidden_size // 2)
        self.quality_extractor = nn.Linear(hidden_size, hidden_size // 2)
        self.value_extractor = nn.Linear(hidden_size, hidden_size // 2)
        self.packaging_extractor = nn.Linear(hidden_size, hidden_size // 2)

        # Classifier input = pooled output + four half-width aspect vectors.
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size + hidden_size // 2 * 4, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_size // 2, 3)  # Negative, Neutral, Positive
        )

        # Confidence estimation
        self.confidence_estimator = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 4),
            nn.ReLU(),
            nn.Linear(hidden_size // 4, 1),
            nn.Sigmoid()
        )

        # Business impact predictor
        self.business_impact_predictor = nn.Sequential(
            nn.Linear(hidden_size + 3, hidden_size // 2),  # +3 for sentiment logits
            nn.ReLU(),
            nn.Linear(hidden_size // 2, 1),
            nn.Sigmoid()
        )

    def forward(self, input_ids, attention_mask, return_aspects=False, return_confidence=True):
        """Run the model on a tokenized batch.

        Args:
            input_ids: Token ids; assumed (batch, seq) as produced by a
                Hugging Face tokenizer.
            attention_mask: Mask with 1 for real tokens and 0 for padding, or
                None.
            return_aspects: When True, include the aspect feature vectors and
                the attention weights under ``'aspects'``.
            return_confidence: When True, include ``'confidence'``.

        Returns:
            Dict with ``'logits'`` and ``'business_impact'``, plus the
            optional entries above.
        """
        outputs = self.transformer(input_ids=input_ids, attention_mask=attention_mask)
        sequence_output = outputs.last_hidden_state

        # Backbones without a pooling head return no pooler_output; fall back
        # to the first token's hidden state instead of failing on None.
        pooled_output = getattr(outputs, "pooler_output", None)
        if pooled_output is None:
            pooled_output = sequence_output[:, 0]

        # Extract food-specific aspects
        taste_features = torch.relu(self.taste_extractor(pooled_output))
        quality_features = torch.relu(self.quality_extractor(pooled_output))
        value_features = torch.relu(self.value_extractor(pooled_output))
        packaging_features = torch.relu(self.packaging_extractor(pooled_output))

        combined_features = torch.cat([
            pooled_output,
            taste_features,
            quality_features,
            value_features,
            packaging_features
        ], dim=1)
        combined_features = self.dropout(combined_features)

        # Main sentiment classification
        sentiment_logits = self.classifier(combined_features)

        results = {'logits': sentiment_logits}

        if return_confidence:
            results['confidence'] = self.confidence_estimator(pooled_output)

        if return_aspects:
            # The attention output feeds nothing downstream, so it is only
            # computed when the caller asks for the weights.
            sequence_first = sequence_output.transpose(0, 1)
            # Mask padded positions so they receive zero attention weight.
            padding_mask = (attention_mask == 0) if attention_mask is not None else None
            _, attn_weights = self.aspect_attention(
                sequence_first,
                sequence_first,
                sequence_first,
                key_padding_mask=padding_mask
            )
            results['aspects'] = {
                'taste': taste_features,
                'quality': quality_features,
                'value': value_features,
                'packaging': packaging_features,
                'attention_weights': attn_weights
            }

        # Business impact prediction
        business_input = torch.cat([pooled_output, sentiment_logits], dim=1)
        results['business_impact'] = self.business_impact_predictor(business_input)

        return results

class EnsembleOrchestrator:
    """Combine several sentiment models into one weighted prediction.

    Models are either ``nn.Module`` instances that return a mapping with
    ``'logits'`` or callables (Hugging Face pipelines) that take raw text.
    Every model yields a three-way score distribution. The distributions are
    averaged with weights equal to the static model weight multiplied by the
    model's own confidence.
    """

    # Pipeline used when the primary model cannot be initialised.
    _FALLBACK_PIPELINE_MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"

    # Pipeline label (upper-cased) -> canonical sentiment. Covers generic
    # LABEL_n ids, plain names in any case, and "N star(s)" labels.
    # NOTE(review): the star labels are mapped on the assumption that the
    # nlptown model emits "1 star" .. "5 stars" - confirm against its model card.
    _LABEL_TO_SENTIMENT = {
        'NEGATIVE': 'negative',
        'NEUTRAL': 'neutral',
        'POSITIVE': 'positive',
        'LABEL_0': 'negative',
        'LABEL_1': 'neutral',
        'LABEL_2': 'positive',
        '1 STAR': 'negative',
        '2 STARS': 'negative',
        '3 STARS': 'neutral',
        '4 STARS': 'positive',
        '5 STARS': 'positive',
    }

    _SENTIMENTS = ('negative', 'neutral', 'positive')

    def __init__(self, config: "PipelineConfig"):
        """Load the models, their weights and the foundation-model tokenizer."""
        self.config = config
        self.models = self._initialize_models()
        self.model_weights = self._calculate_optimal_weights()
        self.tokenizer = AutoTokenizer.from_pretrained(config.foundation_model)

    def _initialize_models(self):
        """Build the name -> model mapping.

        The primary model is registered only after its weights loaded
        successfully. If that fails, the mapping contains just the
        ``'fallback'`` pipeline. Optional models that could not be loaded are
        left out rather than stored as None.
        """
        models = {}

        try:
            primary = HybridSentimentModel(self.config)
            # NOTE(review): torch.load unpickles the file; only load trusted
            # checkpoints (consider weights_only=True where the installed
            # torch version supports it).
            primary.load_state_dict(
                torch.load(f"{self.config.custom_model_path}/primary_model.pt",
                          map_location='cpu')
            )
            primary.eval()
        except Exception as e:
            logging.error(f"Error initializing models: {e}")
            # Fallback to basic models. The half-built primary is discarded:
            # it would otherwise vote with randomly initialised heads.
            models['fallback'] = pipeline(
                "sentiment-analysis",
                model=self._FALLBACK_PIPELINE_MODEL,
                tokenizer=self._FALLBACK_PIPELINE_MODEL
            )
            return models

        models['primary'] = primary

        optional_loaders = (
            ('roberta_large', self._load_pretrained_roberta),
            ('food_specialist', self._load_food_specialist_model),
            ('bert_uncased', self._load_bert_model),
        )
        for model_name, loader in optional_loaders:
            try:
                model = loader()
            except Exception as e:
                logging.warning(f"Failed to load {model_name}: {e}")
                continue
            if model is not None:
                models[model_name] = model

        return models

    async def predict_ensemble(self, input_data: str, return_individual=False):
        """Predict sentiment for one text with every available model.

        Inference runs synchronously inside this coroutine, so it blocks the
        event loop while the models execute.

        Args:
            input_data: Review text.
            return_individual: When True, also return per-model predictions
                and confidences.

        Returns:
            The combined prediction dict, or a dict with
            ``'ensemble_prediction'``, ``'individual_predictions'`` and
            ``'individual_confidences'`` when ``return_individual`` is set.
            Models that are missing or failed appear with prediction None and
            confidence 0.0 and do not influence the ensemble.
        """
        predictions = {}
        confidences = {}

        encoded = self.tokenizer(
            input_data,
            truncation=True,
            padding=True,
            max_length=self.config.max_sequence_length,
            return_tensors='pt'
        )

        for model_name, model in self.models.items():
            predictions[model_name] = None
            confidences[model_name] = 0.0
            if model is None:
                continue
            try:
                result = await self._predict_single_model(model, encoded, input_data)
            except Exception as e:
                logging.warning(f"Model {model_name} failed: {e}")
                continue
            if result.get('is_fallback'):
                # The canned neutral answer means the model did not really
                # predict; letting it vote would bias the ensemble to neutral.
                logging.warning(f"Model {model_name} failed: no usable prediction")
                continue
            predictions[model_name] = result['prediction']
            confidences[model_name] = result['confidence']

        final_prediction = self._combine_predictions(predictions, confidences)

        if return_individual:
            return {
                'ensemble_prediction': final_prediction,
                'individual_predictions': predictions,
                'individual_confidences': confidences
            }

        return final_prediction

    def _combine_predictions(self, predictions, confidences):
        """Weighted average of the per-model score distributions.

        Args:
            predictions: Model name -> prediction dict (with ``'scores'``) or
                None for a failed model.
            confidences: Model name -> confidence in 0..1.

        Returns:
            Dict with ``'sentiment'`` (arg-max label), ``'confidence'`` (its
            averaged score) and ``'scores'``. A neutral result with confidence
            0.0 is returned when no model produced a prediction.
        """
        valid_predictions = {k: v for k, v in predictions.items() if v is not None}

        if not valid_predictions:
            return {'sentiment': 'neutral', 'confidence': 0.0, 'scores': {'neutral': 1.0}}

        weighted_scores = defaultdict(float)
        total_weight = 0

        for model_name, prediction in valid_predictions.items():
            # Models absent from either table get a small default weight.
            model_weight = self.model_weights.get(model_name, 0.1)
            confidence_weight = confidences.get(model_name, 0.1)
            final_weight = model_weight * confidence_weight

            for sentiment, score in prediction.get('scores', {}).items():
                weighted_scores[sentiment] += score * final_weight

            total_weight += final_weight

        if total_weight > 0:
            normalized_scores = {
                sentiment: score / total_weight
                for sentiment, score in weighted_scores.items()
            }
        else:
            normalized_scores = {'neutral': 1.0}

        final_sentiment = max(normalized_scores, key=normalized_scores.get)

        return {
            'sentiment': final_sentiment,
            'confidence': normalized_scores[final_sentiment],
            'scores': normalized_scores
        }

    def _calculate_optimal_weights(self):
        """Return the static per-model weights.

        The values are hard-coded here (would be learned from validation data).
        """
        return {
            'primary': 0.4,
            'roberta_large': 0.25,
            'food_specialist': 0.25,
            'bert_uncased': 0.1,
            'fallback': 0.3
        }

    def _load_pretrained_roberta(self):
        """Load the RoBERTa sentiment pipeline; return None on failure."""
        try:
            return pipeline(
                "sentiment-analysis",
                model="cardiffnlp/twitter-roberta-base-sentiment-latest",
                tokenizer="cardiffnlp/twitter-roberta-base-sentiment-latest"
            )
        except Exception as e:
            logging.warning(f"Failed to load RoBERTa: {e}")
            return None

    def _load_food_specialist_model(self):
        """Load ``food_specialist.pt`` from the custom model directory.

        Returns whatever object the file contains, or None when the file is
        missing or unreadable.
        """
        try:
            # This would be a custom trained model for food reviews
            # NOTE(review): torch.load unpickles arbitrary objects - trusted
            # files only. The file must hold a full module (not a state_dict)
            # to be usable for prediction.
            model_path = f"{self.config.custom_model_path}/food_specialist.pt"
            if os.path.exists(model_path):
                return torch.load(model_path, map_location='cpu')
        except Exception as e:
            logging.warning(f"Failed to load food specialist: {e}")
        return None

    def _load_bert_model(self):
        """Load the multilingual BERT sentiment pipeline; return None on failure."""
        try:
            return pipeline(
                "sentiment-analysis",
                model="nlptown/bert-base-multilingual-uncased-sentiment"
            )
        except Exception as e:
            logging.warning(f"Failed to load BERT: {e}")
            return None

    async def _predict_single_model(self, model, encoded_input, raw_text):
        """Run one model and normalise its output.

        Args:
            model: An ``nn.Module`` returning a mapping with ``'logits'``, or
                a callable taking raw text and returning ``{'label', 'score'}``
                (optionally wrapped in a list).
            encoded_input: Tokenizer output with ``'input_ids'`` and
                ``'attention_mask'``; used only for ``nn.Module`` models.
            raw_text: Original text; used only for callables.

        Returns:
            ``{'prediction': {'sentiment', 'scores'}, 'confidence'}``. When
            the model cannot be run or raises, a neutral placeholder with
            confidence 0.1 and ``'is_fallback': True`` is returned instead of
            raising.
        """
        try:
            # Dispatch on type, not on hasattr(model, 'forward'): Hugging Face
            # pipelines also expose a `forward` method but must be called with
            # raw text.
            if isinstance(model, nn.Module):
                with torch.no_grad():
                    # Pass only the arguments forward() accepts; tokenizers
                    # may add keys such as token_type_ids.
                    outputs = model(
                        input_ids=encoded_input['input_ids'],
                        attention_mask=encoded_input['attention_mask']
                    )
                    # First (only) item of the batch.
                    probabilities = torch.softmax(outputs['logits'], dim=-1)[0]

                sentiment_mapping = {0: 'negative', 1: 'neutral', 2: 'positive'}
                predicted_class = int(torch.argmax(probabilities))

                return {
                    'prediction': {
                        'sentiment': sentiment_mapping[predicted_class],
                        'scores': {
                            'negative': probabilities[0].item(),
                            'neutral': probabilities[1].item(),
                            'positive': probabilities[2].item()
                        }
                    },
                    'confidence': probabilities.max().item()
                }

            elif callable(model):  # HuggingFace pipeline
                result = model(raw_text)
                if isinstance(result, list):
                    result = result[0]

                # Labels arrive in different spellings per model; compare
                # upper-cased. Unknown labels still default to neutral.
                label = str(result['label']).strip().upper()
                sentiment = self._LABEL_TO_SENTIMENT.get(label, 'neutral')
                confidence = result['score']

                # Create score distribution (simplified): the predicted class
                # gets the reported score, the rest is split evenly.
                remaining = (1.0 - confidence) / 2
                scores = {
                    name: (confidence if name == sentiment else remaining)
                    for name in self._SENTIMENTS
                }

                return {
                    'prediction': {
                        'sentiment': sentiment,
                        'scores': scores
                    },
                    'confidence': confidence
                }

        except Exception as e:
            logging.error(f"Model prediction error: {e}")

        # Fallback prediction
        return {
            'prediction': {
                'sentiment': 'neutral',
                'scores': {'negative': 0.33, 'neutral': 0.34, 'positive': 0.33}
            },
            'confidence': 0.1,
            'is_fallback': True
        }
