# FiQA Sentiment Analysis

In this project, we analyze the sentiment of financial news headlines from the FiQA dataset using modern NLP techniques.

## Importing Data

In [1]:
import pandas as pd

In [2]:
import os # Import os module

def load_fiqa_data(data_dir):
    """Read the FiQA task-1 headline and post training files.

    Args:
        data_dir: Directory holding ``task1_headline_ABSA_train.json`` and
            ``task1_post_ABSA_train.json``.

    Returns:
        A ``(headline_df, post_df)`` tuple; each frame is whatever
        ``pd.read_json`` produces for the file with its default settings.
    """
    file_names = ("task1_headline_ABSA_train.json", "task1_post_ABSA_train.json")
    headline_path, post_path = [os.path.join(data_dir, name) for name in file_names]
    return pd.read_json(headline_path), pd.read_json(post_path)

# Example usage (assuming your data is in a 'data' subdirectory):
# loads both task-1 training files relative to the current working directory.
headline_df, post_df = load_fiqa_data("data/FiQA_ABSA_task1")

# Preview the first five rows of each frame.
# NOTE(review): if this runs as a single notebook cell, only the value of the
# last expression (post_df.head(5)) is presumably rendered — confirm.
headline_df.head(5)
post_df.head(5)

In [3]:
import json
import ast
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud
from matplotlib import rcParams
import scipy.sparse # Import scipy.sparse

In [4]:
# Configuration
# Turn on matplotlib's 'figure.autolayout' rcParam for every figure created
# after this point.
rcParams.update({'figure.autolayout': True})
# plt.style.use('seaborn-whitegrid')

In [5]:
class FiqaDataLoading:
    """Load a FiQA task-1 JSON file into a flat, feature-enriched DataFrame.

    The JSON is read as a mapping ``entry_id -> {"sentence": ..., "info": [...]}``;
    every element of ``info`` becomes one row.
    """

    def __init__(self, filepath):
        """Remember the JSON path; the data itself is loaded lazily.

        Args:
            filepath: Path of the JSON file to read in ``load_and_preprocess``.
        """
        self.filepath = filepath
        self.df = None

    def load_and_preprocess(self):
        """Load JSON data and convert to structured DataFrame.

        Returns:
            The DataFrame (also stored on ``self.df``) with one row per
            ``info`` item and the derived columns added by ``_enhance_data``.

        Raises:
            KeyError: If an entry lacks one of the expected keys.
            ValueError: If a ``sentiment_score`` cannot be converted to float.
        """
        with open(self.filepath, encoding='utf-8') as f:
            raw_data = json.load(f)

        records = [
            {
                'id': entry_id,
                'sentence': content['sentence'],
                'snippets': info['snippets'],
                'target': info['target'],
                'sentiment_score': float(info['sentiment_score']),
                'aspects': self.safe_literal_eval(info['aspects']),
            }
            for entry_id, content in raw_data.items()
            for info in content['info']
        ]

        self.df = pd.DataFrame(records)
        self._enhance_data()

        return self.df

    def safe_literal_eval(self, x):
        """Convert a raw field into a list without ever evaluating code.

        Args:
            x: Either a string holding a Python literal (for example
                ``"['a', 'b']"``), an already-parsed list/tuple, or anything else.

        Returns:
            A list: the parsed sequence for list/tuple literals, the input
            itself for list/tuple values, a one-element list for strings that
            are not a sequence literal, and ``[]`` for every other type.
        """
        # Already-parsed sequences used to be discarded; keep their content.
        if isinstance(x, (list, tuple)):
            return list(x)
        if not isinstance(x, str):
            return []

        try:
            parsed = ast.literal_eval(x)
        except (SyntaxError, ValueError):
            # Plain text that is not a Python literal: treat it as one item.
            return [x]

        if isinstance(parsed, (list, tuple)):
            return list(parsed)
        if isinstance(parsed, str):
            # A quoted scalar such as "'abc'" is a single item, not a
            # sequence of characters.
            return [parsed]
        # Numbers, dicts, etc.: keep the original text as the single item so
        # callers can always join the result as strings.
        return [x]

    def _enhance_data(self):
        """Create additional features and clean data.

        Adds ``sentiment_class``, ``primary_aspect``, ``secondary_aspect`` and
        ``snippet_text`` columns to ``self.df`` in place.
        """

        # Sentiment classification: three bins over [-1, 1], lowest edge included.
        bins = [-1, -0.33, 0.33, 1]
        labels = ['negative', 'neutral', 'positive']
        self.df['sentiment_class'] = pd.cut(
            self.df['sentiment_score'],
            bins=bins,
            labels=labels,
            right=True,
            include_lowest=True,
        )

        # Aspect hierarchy processing: only the first aspect is used and it is
        # split on '/' into levels.
        def extract_aspect(aspects, level):
            try:
                if not aspects or not isinstance(aspects[0], str):
                    return None
                parts = aspects[0].split('/')
                return parts[level] if len(parts) > level else None
            except (IndexError, TypeError):
                return None

        self.df['primary_aspect'] = self.df['aspects'].apply(lambda x: extract_aspect(x, 0))
        self.df['secondary_aspect'] = self.df['aspects'].apply(lambda x: extract_aspect(x, 1))

        # str() guards against non-string items inside a parsed snippet list.
        self.df['snippet_text'] = self.df['snippets'].apply(
            lambda x: ' '.join(str(part) for part in self.safe_literal_eval(x))
        )

In [6]:
class FiqaEDA:
    """Exploratory data analysis helpers for a preprocessed FiQA DataFrame.

    The methods read these columns from the frame: ``sentiment_score``,
    ``sentiment_class``, ``snippet_text``, ``sentence``, ``primary_aspect``,
    ``secondary_aspect`` and ``target``.
    """

    def __init__(self, df):
        """Store the DataFrame to analyse (it is not copied)."""
        self.df = df

    def _validate_data(self):
        """Data quality checks: print counts of missing/invalid/empty values."""
        print("=== Data Validation Report ===")
        print(f"Total entries: {len(self.df)}")
        print("\nMissing values:")
        print(self.df.isnull().sum())

        # Check for invalid sentiment scores (outside -1 to 1 range)
        scores = self.df['sentiment_score']
        invalid_scores = self.df[(scores < -1) | (scores > 1)]
        print(f"\nInvalid sentiment scores (outside -1 to 1): {len(invalid_scores)}")

        # Check for empty snippets or sentences
        print(f"\nEmpty snippets: {len(self.df[self.df['snippet_text'] == ''])}")
        print(f"Empty sentences: {len(self.df[self.df['sentence'] == ''])}")

    def analyze_sentiment_distribution(self):
        """Generate sentiment visualizations (score histogram and class bar chart)."""
        plt.figure(figsize=(12, 5))

        plt.subplot(1, 2, 1)
        sns.histplot(self.df['sentiment_score'], bins=20, kde=True)
        plt.title('Sentiment Score Distribution')

        plt.subplot(1, 2, 2)
        self.df['sentiment_class'].value_counts().plot(kind='bar')
        plt.title('Sentiment Class Distribution')

        plt.tight_layout()
        plt.show()

    def analyze_aspects(self):
        """Aspect category analysis: bar charts of primary and top secondary aspects."""
        fig, ax = plt.subplots(1, 2, figsize=(14, 6))

        # Primary aspects
        primary_counts = self.df['primary_aspect'].value_counts()
        sns.barplot(y=primary_counts.index, x=primary_counts.values, ax=ax[0])
        ax[0].set_title('Primary Aspect Distribution')

        # Secondary aspects (top 15)
        secondary_counts = self.df['secondary_aspect'].value_counts().head(15)
        sns.barplot(y=secondary_counts.index, x=secondary_counts.values, ax=ax[1])
        ax[1].set_title('Top 15 Secondary Aspects')

        plt.tight_layout()
        plt.show()

    def generate_word_clouds(self):
        """Generate sentiment-specific word clouds with stopword removal.

        A panel whose sentiment class yields no usable words shows a
        placeholder message instead of aborting the whole figure.
        """
        fig, ax = plt.subplots(1, 3, figsize=(18, 6))
        stopwords = set(["a", "as", "and", "by", "on", "of", "to", "the", "in", "for", "with", "Â"])
        colormaps = {'positive': 'Greens', 'neutral': 'viridis', 'negative': 'Reds'}

        for i, sentiment in enumerate(['positive', 'neutral', 'negative']):
            text = ' '.join(self.df[self.df['sentiment_class'] == sentiment]['snippet_text'])
            ax[i].set_title(f'{sentiment.capitalize()} Sentiment Terms')
            ax[i].axis('off')

            try:
                wc = WordCloud(width=1200, height=800,
                               background_color='white', stopwords=stopwords,
                               colormap=colormaps[sentiment],
                               max_words=200).generate(text)
            except ValueError:
                # WordCloud raises ValueError when no words remain (empty
                # class or only stopwords); keep the other panels usable.
                ax[i].text(0.5, 0.5, 'No terms available',
                           ha='center', va='center', transform=ax[i].transAxes)
                continue

            ax[i].imshow(wc)

        plt.tight_layout()
        plt.show()

    def analyze_targets(self):
        """Company/organization analysis: bar chart of the ten most frequent targets."""
        plt.figure(figsize=(10, 6))

        target_counts = self.df['target'].value_counts().head(10)
        sns.barplot(y=target_counts.index, x=target_counts.values)
        plt.title('Top 10 Frequently Mentioned Targets')
        plt.show()

    def run_full_analysis(self):
        """Execute complete EDA pipeline: validation, statistics, then all plots."""
        self._validate_data()

        print("\n=== Basic Statistics ===")
        print(self.df.describe(include='all'))

        self.analyze_sentiment_distribution()
        self.analyze_aspects()
        self.generate_word_clouds()
        self.analyze_targets()

In [7]:
# Load the headline training file and flatten it to one row per `info` item,
# with the derived columns FiqaDataLoading adds.
loader = FiqaDataLoading("data/FiQA_ABSA_task1/task1_headline_ABSA_train.json")
df = loader.load_and_preprocess()

In [8]:
import re

import spacy

from sklearn.utils import resample
from bs4 import BeautifulSoup
from sklearn.base import BaseEstimator, TransformerMixin

from nltk.sentiment import SentimentIntensityAnalyzer
from spacy.matcher import Matcher

In [9]:
import nltk
# Fetch the 'vader_lexicon' NLTK resource; SentimentIntensityAnalyzer is
# instantiated further down (presumably it depends on this lexicon — confirm).
nltk.download('vader_lexicon')

In [10]:
# Module-level spaCy pipeline loaded with the parser and NER components disabled.
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
# Stop-word collection taken from the loaded language's defaults.
DEFAULT_STOPWORDS = nlp.Defaults.stop_words

In [11]:
class MissingValueHandler:
    """Handles all missing value operations"""

    def handle_missing_values(self, df):
        """Drop rows lacking critical fields and fill empty aspect lists.

        Args:
            df: DataFrame with ``sentence``, ``sentiment_score``, ``target``
                and ``aspects`` columns. It is not modified.

        Returns:
            A new DataFrame without rows whose critical fields are null and
            with empty or null ``aspects`` replaced by ``['Unknown']``.
        """
        print("\n=== Handling Missing Values ===")
        original_rows = len(df)

        df = self._remove_critical_nulls(df)
        df = self._handle_aspect_nulls(df)
        # Removed snippet null handler as snippets are generated later

        print(f"Removed {original_rows - len(df)} rows with missing critical data")
        print(f"Final dataset shape: {df.shape}")
        return df

    def _remove_critical_nulls(self, df):
        """Return ``df`` without rows that have a null sentence, score or target."""
        return df.dropna(subset=['sentence', 'sentiment_score', 'target'])

    def _handle_aspect_nulls(self, df):
        """Return a copy of ``df`` whose empty or null aspects are ``['Unknown']``."""

        def fill_unknown(aspects):
            try:
                return aspects if len(aspects) > 0 else ['Unknown']
            except TypeError:
                # None / NaN (anything without a length) counts as missing;
                # the previous len(x) == 0 test crashed on such values.
                return ['Unknown']

        # assign() builds a new frame, so the caller's DataFrame is untouched.
        return df.assign(aspects=df['aspects'].apply(fill_unknown))

In [12]:
class TextColumnProcessor:
    """Cleans the ``sentence`` column and derives a list of snippets per row."""

    # Boundaries used to cut a sentence into snippets: a period that is not a
    # decimal point (i.e. not surrounded by digits on both sides), the
    # characters ; : ! ? and newline, or a spaced dash " - ".
    # The earlier pattern spelled "\\?" and "\\n" inside a raw string, which
    # means "optional backslash" (matches the empty string) and "backslash
    # followed by n"; the empty match made re.split cut between every
    # character, so no snippet was ever produced.
    _SNIPPET_BOUNDARY = re.compile(r'(?<!\d)\.|\.(?!\d)|[;:!?\n]| - ')

    def __init__(self, text_cleaner):
        """Store the cleaner; it must expose ``clean_text(str) -> str``."""
        self.text_cleaner = text_cleaner

    def process_text_columns(self, df):
        """Return a copy of ``df`` with ``clean_sentence`` and ``snippets`` columns.

        Args:
            df: DataFrame with a ``sentence`` column. It is not modified.

        Returns:
            A new DataFrame where ``clean_sentence`` holds the cleaned text and
            ``snippets`` holds the list produced by ``_extract_snippets``.
        """
        print("\n=== Cleaning Text Data ===")
        df = df.copy()

        # Clean the main sentence while preserving structure
        df['clean_sentence'] = df['sentence'].apply(self.text_cleaner.clean_text)

        # Extract meaningful snippets
        df['snippets'] = df['clean_sentence'].apply(self._extract_snippets)

        print("Text cleaning complete")
        return df

    def _extract_snippets(self, text):
        """Extract meaningful snippets based on text patterns.

        Args:
            text: The cleaned sentence.

        Returns:
            The pieces between boundaries that contain at least two
            whitespace-separated words; ``[text]`` if no piece qualifies and
            ``text`` is non-empty; ``[]`` for empty text.
        """
        # Split on common business news patterns
        pieces = (piece.strip() for piece in self._SNIPPET_BOUNDARY.split(text))

        # Keep only pieces with a minimum of 2 words
        snippets = [piece for piece in pieces if len(piece.split()) >= 2]

        # Ensure we have at least one snippet
        if not snippets and text:
            snippets = [text]

        return snippets

In [13]:
class TextCleaner:
    """Text normalisation: strip markup and symbols, then filter/lemmatize tokens.

    Args:
        nlp_model: Callable turning a string into an iterable of tokens that
            expose ``text``, ``lemma_`` and ``is_stop``.
        lemmatize: Emit each token's stripped lemma instead of its text.
        remove_stopwords: Drop tokens flagged as stop words.
    """

    def __init__(self, nlp_model, lemmatize=True, remove_stopwords=True):
        self.nlp_model = nlp_model
        self.lemmatize, self.remove_stopwords = lemmatize, remove_stopwords

    def clean_text(self, text):
        """Run the full chain: HTML -> symbols -> whitespace -> lower-case -> tokens."""
        without_markup = self._remove_html(text)
        without_symbols = self._remove_special_chars(without_markup)
        collapsed = self._normalize_whitespace(without_symbols)
        return self._process_with_spacy(collapsed.lower())

    def _remove_html(self, text):
        """Return the text content of ``text`` as parsed by the HTML parser."""
        soup = BeautifulSoup(text, "html.parser")
        return soup.get_text()

    def _remove_special_chars(self, text):
        """Delete everything except word characters, whitespace and ``. , ! ? -``."""
        return re.sub(r'[^\w\s.,!?-]', '', text)

    def _normalize_whitespace(self, text):
        """Collapse whitespace runs to single spaces and trim both ends."""
        squeezed = re.sub(r"\s+", " ", text)
        return squeezed.strip()

    def _process_with_spacy(self, text):
        """Tokenize ``text`` and join the surviving tokens with single spaces."""
        kept = []
        for token in self.nlp_model(text):
            if self.remove_stopwords and token.is_stop:
                continue
            if token.text.strip() == '':
                # whitespace-only tokens carry no content
                continue
            if self.lemmatize:
                kept.append(token.lemma_.strip())
            else:
                kept.append(token.text)
        return " ".join(kept)


In [14]:
class DataValidator:
    """Handles data validation operations"""

    def __init__(self):
        """Create the VADER analyzer used for cross-checking scores."""
        self.sia = SentimentIntensityAnalyzer()

    def validate_sentiment_scores(self, df):
        """Check the score range, then report rows that disagree with VADER.

        Args:
            df: DataFrame with ``sentiment_score``, ``snippets`` and
                ``sentence`` columns. It is not modified.

        Returns:
            A new DataFrame with the same columns as ``df``.

        Raises:
            ValueError: If any ``sentiment_score`` is outside ``[-1, 1]``.
        """
        self._check_score_range(df)
        return self._cross_validate_with_vader(df)

    def _check_score_range(self, df):
        """Raise ``ValueError`` when a score is not between -1 and 1."""
        invalid_scores = df[~df['sentiment_score'].between(-1, 1)]
        if not invalid_scores.empty:
            raise ValueError(f"Found {len(invalid_scores)} invalid sentiment scores")

    def _cross_validate_with_vader(self, df):
        """Report rows whose score differs strongly from VADER's compound score.

        The discrepancy is computed on a temporary frame so the caller's
        DataFrame never gains a ``score_discrepancy`` column.
        """
        if df.empty:
            # Row-wise apply on an empty frame does not yield a Series.
            return df.copy()

        discrepancy = df.apply(self._calculate_discrepancy, axis=1)
        report_frame = df.assign(score_discrepancy=discrepancy)
        self._report_discrepancies(report_frame)
        return report_frame.drop(columns=['score_discrepancy'])

    def _calculate_discrepancy(self, row):
        """Absolute gap between the labelled score and VADER's compound score."""
        snippets = row['snippets']
        # A plain string must not be joined character by character.
        text = snippets if isinstance(snippets, str) else " ".join(snippets)
        vader_score = self.sia.polarity_scores(text)['compound']
        return abs(row['sentiment_score'] - vader_score)

    def _report_discrepancies(self, df):
        """Print a warning and a sample when discrepancies exceed 0.5."""
        # Consider adjusting this threshold if 0.5 is too strict
        high_discrepancy = df[df['score_discrepancy'] > 0.5]
        if not high_discrepancy.empty:
            print(f"Warning: {len(high_discrepancy)} entries with significant score discrepancy")
            print(high_discrepancy[['sentence', 'sentiment_score', 'score_discrepancy']].head())

In [15]:
class DataBalancer:
    """Handles class balancing operations"""

    def __init__(self, random_state=42):
        """Store the seed used for both down- and up-sampling."""
        self.random_state = random_state

    def balance_classes(self, df):
        """Resample every sentiment class to the median class size.

        Args:
            df: DataFrame with a ``sentiment_class`` column.

        Returns:
            A new DataFrame in which each class that occurs in ``df`` has the
            median class size (original index labels are kept).
        """
        print("\n=== Balancing Classes ===")
        self._print_distribution(df, "Original")

        balanced_df = self._perform_multiclass_balancing(df)
        self._print_distribution(balanced_df, "Balanced")

        return balanced_df

    def _perform_multiclass_balancing(self, df):
        """Down-sample large classes and up-sample small ones to the median size."""
        class_counts = df['sentiment_class'].value_counts()
        # A categorical column also lists categories with zero rows; they
        # would drag the median down and cannot be up-sampled from nothing.
        class_counts = class_counts[class_counts > 0]
        if class_counts.empty:
            return df.copy()

        target_size = int(class_counts.median())  # Use median as target

        balanced_dfs = []
        for class_name in class_counts.index:
            class_df = df[df['sentiment_class'] == class_name]
            if len(class_df) > target_size:
                # Down-sample without replacement.
                class_df = class_df.sample(n=target_size, random_state=self.random_state)
            elif len(class_df) < target_size:
                # Up-sample with replacement.
                class_df = resample(
                    class_df,
                    replace=True,
                    n_samples=target_size,
                    random_state=self.random_state
                )
            balanced_dfs.append(class_df)

        return pd.concat(balanced_dfs)

    def _print_distribution(self, df, stage):
        """Print the per-class row counts under a ``stage`` heading."""
        print(f"\n{stage} Class Distribution:")
        print(df['sentiment_class'].value_counts())

In [16]:
class DataPreprocessor(BaseEstimator, TransformerMixin):
    """Runs the preprocessing stages in order.

    Stages: missing-value handling, optional text cleaning, sentiment score
    validation, optional class balancing.
    """

    def __init__(self,
                 text_clean=True,
                 lemmatize=True,
                 remove_stopwords=True,
                 handle_imbalance=False,
                 random_state=42):
        # Switches and hyper-parameters, stored under their parameter names.
        self.text_clean, self.handle_imbalance = text_clean, handle_imbalance
        self.lemmatize, self.remove_stopwords = lemmatize, remove_stopwords
        self.random_state = random_state

        # Collaborating components; the cleaner gets its own spaCy pipeline.
        self.nlp = spacy.load('en_core_web_sm')
        self.text_cleaner = TextCleaner(self.nlp, lemmatize, remove_stopwords)
        self.text_processor = TextColumnProcessor(self.text_cleaner)
        self.missing_handler = MissingValueHandler()
        self.validator = DataValidator()
        self.balancer = DataBalancer(random_state)

    def fit(self, X, y=None):
        """Nothing is learned from the data; returns ``self``."""
        return self

    def transform(self, X):
        """Apply every enabled stage to a copy of ``X``.

        Returns:
            The processed DataFrame with a fresh 0..n-1 index.
        """
        frame = self.missing_handler.handle_missing_values(X.copy())

        if self.text_clean:
            frame = self.text_processor.process_text_columns(frame)

        frame = self.validator.validate_sentiment_scores(frame)

        if self.handle_imbalance:
            frame = self.balancer.balance_classes(frame)

        return frame.reset_index(drop=True)

In [17]:
# Build the preprocessor with text cleaning, lemmatization, stop-word removal
# and class balancing all switched on.
preprocessor = DataPreprocessor(
        text_clean=True,
        lemmatize=True,
        remove_stopwords=True,
        handle_imbalance=True
    )
    
# transform() works on a copy of df and returns the processed frame.
cleaned_data = preprocessor.transform(df)

In [18]:
# Configuration
EMBEDDING_DIM = 300  # Set your desired embedding dimension
# Path handed to EmbeddingVectorizer as `embedding_path`.
EMBEDDING_FILE = 'glove.6B.300d.txt'
# Model identifier handed to TransformerEmbeddings as `model_name`.
MODEL_NAME = 'bert-base-uncased'

In [19]:
import os
from abc import ABC, abstractmethod
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from gensim.models import KeyedVectors
from gensim.scripts.glove2word2vec import glove2word2vec
from spacy.lang.en import English
from transformers import AutoTokenizer, AutoModel
import torch
from sklearn.decomposition import PCA # Import PCA

In [20]:
class BaseFeatureExtractor(ABC, TransformerMixin):
    """Common interface for feature extractors.

    Subclasses must implement ``transform``; ``fit`` defaults to a no-op so
    stateless extractors need not override it.
    """

    def fit(self, X, y=None):
        """Default fit: learn nothing and return ``self``."""
        return self

    @abstractmethod
    def transform(self, X):
        """Turn ``X`` into features; must be provided by each subclass."""

In [21]:
class TextVectorizer(BaseFeatureExtractor):
    """Handles traditional text vectorization methods.

    Args:
        method: ``'tfidf'`` selects ``TfidfVectorizer``; any other value
            selects ``CountVectorizer``.
        max_features: Vocabulary size limit passed to the vectorizer.
        ngram_range: N-gram range passed to the vectorizer.
    """

    # Bare integers are dropped unless they directly follow a currency symbol
    # or are followed by a magnitude/percent marker.
    _BARE_NUMBER = re.compile(r'\b(?<![$£€])\d+(?!\s*(?:million|billion|trillion|percent|%))\b')
    _CURRENCY = re.compile(r'(\$|£|€)')
    _MILLION = re.compile(r'\b(million|mn)\b')
    _BILLION = re.compile(r'\b(billion|bn)\b')
    _WHITESPACE = re.compile(r'\s+')

    def __init__(self, method='tfidf', max_features=5000, ngram_range=(1,2)):
        self.method = method
        self.max_features = max_features
        self.ngram_range = ngram_range
        self.vectorizer = None

    def fit(self, X, y=None):
        """Fit the selected vectorizer on the normalised snippet text of ``X``."""
        texts = self._preprocess_text(X)

        vectorizer_cls = TfidfVectorizer if self.method == 'tfidf' else CountVectorizer
        self.vectorizer = vectorizer_cls(max_features=self.max_features,
                                         ngram_range=self.ngram_range)
        self.vectorizer.fit(texts)
        return self

    def transform(self, X):
        """Vectorize the normalised snippet text of ``X``.

        Raises:
            AttributeError: If called before ``fit``.
        """
        if self.vectorizer is None:
            raise AttributeError("TextVectorizer must be fitted before calling transform()")
        texts = self._preprocess_text(X)
        return self.vectorizer.transform(texts)

    def _preprocess_text(self, X):
        """Return one normalised string per row of ``X['snippets']``."""

        def clean_text(snippets):
            # Join snippets; a plain string is used as-is rather than being
            # joined character by character.
            text = snippets if isinstance(snippets, str) else ' '.join(snippets)

            # Basic cleaning
            text = text.lower()

            # Remove numbers but keep important financial numbers
            text = self._BARE_NUMBER.sub('', text)

            # Normalize financial terms. The placeholder is padded with
            # spaces so it stays a token of its own instead of fusing with
            # the amount ("$100" -> "currency_symbol 100").
            text = self._CURRENCY.sub(' currency_symbol ', text)
            text = self._MILLION.sub('million', text)
            text = self._BILLION.sub('billion', text)

            # Remove extra whitespace
            return self._WHITESPACE.sub(' ', text).strip()

        return X['snippets'].apply(clean_text)

In [22]:
class EmbeddingVectorizer(BaseFeatureExtractor):
    """Handles word embeddings (GloVe/Word2Vec/FastText).

    Each document is represented by the mean of the vectors of its known
    tokens, or by a zero vector when no token is known.

    Args:
        embedding_path: Path of a local embedding file. When no such file
            exists, the value is handed to ``gensim.downloader.load``.
        embedding_dim: Fallback vector size, used only if the loaded model
            does not expose ``vector_size``.
    """

    def __init__(self, embedding_path, embedding_dim=300):
        self.embedding_path = embedding_path
        self.embedding_dim = embedding_dim
        self.embedding_model = None
        self.nlp = English()

    def fit(self, X, y=None):
        """Load (or download / convert) the embedding model. ``X`` is unused."""
        word2vec_path = self.embedding_path + '.word2vec'

        if not os.path.exists(self.embedding_path):
            if os.path.exists(word2vec_path):
                # A previous download already saved a local copy; reuse it
                # instead of downloading again.
                self.embedding_model = KeyedVectors.load_word2vec_format(word2vec_path)
            else:
                # Download the embeddings if not present
                import gensim.downloader as api

                print(f"Downloading GloVe embeddings from {self.embedding_path}")
                self.embedding_model = api.load(self.embedding_path)
                # Save the downloaded embeddings for future use
                self.embedding_model.save_word2vec_format(word2vec_path)
        elif 'glove' in self.embedding_path.lower():
            # Convert GloVe format to word2vec format if not already done
            if not os.path.exists(word2vec_path):
                print(f"Converting GloVe to Word2Vec format: {self.embedding_path}")
                glove2word2vec(self.embedding_path, word2vec_path)
            self.embedding_model = KeyedVectors.load_word2vec_format(word2vec_path)
        else:
            self.embedding_model = KeyedVectors.load_word2vec_format(self.embedding_path)
        return self

    def transform(self, X):
        """Return an array with one averaged word vector per row of ``X``.

        Args:
            X: DataFrame whose ``snippets`` column holds lists of strings.

        Returns:
            ``np.ndarray`` of shape ``(len(X), vector_size)``.
        """
        # Use the model's real dimensionality for the zero-vector fallback so
        # a mismatching ``embedding_dim`` cannot produce ragged rows.
        dim = getattr(self.embedding_model, 'vector_size', self.embedding_dim)

        def document_to_vector(text):
            tokens = [token.text for token in self.nlp(text)]
            word_vectors = [self.embedding_model[word] for word in tokens
                            if word in self.embedding_model]
            if word_vectors:
                return np.mean(word_vectors, axis=0)
            return np.zeros(dim)

        texts = X['snippets'].apply(lambda x: ' '.join(x))
        vectors = [document_to_vector(text) for text in texts]
        if not vectors:
            # Keep a 2-D shape even when there are no rows.
            return np.empty((0, dim))
        return np.array(vectors)


In [23]:
class TransformerEmbeddings(BaseFeatureExtractor):
    """Generates contextual embeddings using transformer models.

    Args:
        model_name: Identifier passed to ``AutoTokenizer`` / ``AutoModel``.
        batch_size: Number of texts encoded per forward pass.
        max_length: Truncation length passed to the tokenizer.
        device: Torch device the model and inputs are moved to.
    """

    def __init__(self, model_name='bert-base-uncased', batch_size=32, max_length=128, device='cpu'):
        self.model_name = model_name
        self.batch_size = batch_size
        self.max_length = max_length
        self.tokenizer = None
        self.model = None
        self.device = device

    def fit(self, X, y=None):
        """Load the tokenizer and model; ``X`` is unused."""
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModel.from_pretrained(self.model_name).to(self.device)
        return self

    def transform(self, X):
        """Return one mean-pooled embedding per row of ``X``.

        Args:
            X: DataFrame whose ``snippets`` column holds lists of strings.

        Returns:
            ``np.ndarray`` with one row per input row.
        """
        texts = X['snippets'].apply(lambda x: ' '.join(x)).tolist()
        if not texts:
            # np.concatenate cannot handle an empty list of batches.
            return np.empty((0, getattr(self.model.config, 'hidden_size', 0)))

        embeddings = []
        for start in range(0, len(texts), self.batch_size):
            batch = texts[start:start + self.batch_size]
            inputs = self.tokenizer(batch, return_tensors='pt',
                                    padding=True, truncation=True,
                                    max_length=self.max_length).to(self.device)

            with torch.no_grad():
                outputs = self.model(**inputs)

            # Average only over real tokens: a plain mean over the sequence
            # axis would include padding positions, making the embedding
            # depend on the longest text in the batch.
            hidden = outputs.last_hidden_state
            mask = inputs['attention_mask'].unsqueeze(-1).to(hidden.dtype)
            token_sum = (hidden * mask).sum(dim=1)
            token_count = mask.sum(dim=1).clamp(min=1.0)
            embeddings.append((token_sum / token_count).cpu().numpy())

        return np.concatenate(embeddings, axis=0)


In [24]:
class LinguisticFeatureExtractor(BaseFeatureExtractor):
    """Extracts syntactic and lexical features.

    Per document: character, word and sentence counts, counts of selected
    part-of-speech tags, and the four VADER polarity scores.
    """

    def __init__(self):
        # A blank ``English()`` pipeline has neither a tagger nor sentence
        # boundaries, so ``doc.sents`` raises and ``pos_`` stays empty.
        # Load the trained pipeline instead; NER is not needed here.
        self.nlp = spacy.load("en_core_web_sm", disable=["ner"])
        self.sia = SentimentIntensityAnalyzer()
        self.pos_tags = ['NOUN', 'VERB', 'ADJ', 'ADV']

    def transform(self, X):
        """Return a DataFrame with one row of linguistic features per row of ``X``.

        Args:
            X: DataFrame whose ``snippets`` column holds lists of strings.
        """
        features = []
        texts = X['snippets'].apply(lambda x: ' '.join(x))

        for text in texts:
            doc = self.nlp(text)
            features.append({
                'num_chars': len(text),
                'num_words': len(doc),
                'num_sentences': sum(1 for _ in doc.sents),
                **self._get_pos_counts(doc),
                **self._get_sentiment_scores(text)
            })

        return pd.DataFrame(features)

    def _get_pos_counts(self, doc):
        """Count tokens per tracked POS tag, keyed as ``pos_<TAG>``."""
        counts = {f"pos_{tag}": 0 for tag in self.pos_tags}
        for token in doc:
            # The dict is keyed with the "pos_" prefix; testing the bare tag
            # against it never matched, so every count stayed at zero.
            key = f"pos_{token.pos_}"
            if key in counts:
                counts[key] += 1
        return counts

    def _get_sentiment_scores(self, text):
        """Return VADER's neg/neu/pos/compound scores under ``sentiment_*`` keys."""
        scores = self.sia.polarity_scores(text)
        return {
            'sentiment_neg': scores['neg'],
            'sentiment_neu': scores['neu'],
            'sentiment_pos': scores['pos'],
            'sentiment_compound': scores['compound']
        }


In [25]:
class BusinessFeatureExtractor(BaseFeatureExtractor):
    """Extracts business-specific features.

    Per document: keyword indicators for financial metrics, business actions
    and market indicators, monetary-amount matches, temporal/future cues and
    the number of ORG entities.
    """

    def __init__(self):
        self.nlp = spacy.load("en_core_web_sm")
        # Add financial patterns matcher
        self.matcher = Matcher(self.nlp.vocab)
        self._add_patterns()

        # Common financial metrics
        self.financial_metrics = ['revenue', 'profit', 'earnings', 'ebitda', 'sales',
                                'margin', 'growth', 'market share', 'dividend']

        # Business actions
        self.business_actions = ['merger', 'acquisition', 'partnership', 'investment',
                               'divestment', 'restructuring', 'layoff', 'expansion']

        # Market indicators
        self.market_indicators = ['bull', 'bear', 'volatile', 'surge', 'plunge',
                                'rally', 'correction', 'uptick', 'downturn']

    def _add_patterns(self):
        """Register the ``MONEY_AMOUNT`` patterns on the matcher."""
        # "SYMBOL" is not a Matcher token attribute; currency signs are
        # matched on the token text. LOWER makes the magnitude words
        # case-insensitive.
        money_patterns = [
            [{'LIKE_NUM': True}, {'LOWER': {'IN': ['million', 'billion', 'trillion']}}],
            [{'TEXT': {'IN': ['$', '£', '€']}}, {'LIKE_NUM': True}]
        ]
        self.matcher.add('MONEY_AMOUNT', money_patterns)

    def transform(self, X):
        """Return a DataFrame with one row of business features per row of ``X``.

        Args:
            X: DataFrame whose ``snippets`` column holds lists of strings.
        """
        features = []
        texts = X['snippets'].apply(lambda x: ' '.join(x))

        for text in texts:
            doc = self.nlp(text)
            features.append({
                **self._get_financial_metrics(doc),
                **self._get_business_actions(doc),
                **self._get_market_indicators(doc),
                **self._get_monetary_mentions(doc),
                **self._get_temporal_features(doc),
                **self._get_company_features(doc)
            })

        return pd.DataFrame(features)

    # NOTE: the keyword helpers use substring containment on the lower-cased
    # text, so a keyword also matches inside a longer word.

    def _get_financial_metrics(self, doc):
        """1/0 flag per financial metric keyword found in the text."""
        lowered = doc.text.lower()
        return {f"metric_{metric.replace(' ', '_')}": 1 if metric in lowered else 0
                for metric in self.financial_metrics}

    def _get_business_actions(self, doc):
        """1/0 flag per business action keyword found in the text."""
        lowered = doc.text.lower()
        return {f"action_{action}": 1 if action in lowered else 0
                for action in self.business_actions}

    def _get_market_indicators(self, doc):
        """1/0 flag per market indicator keyword found in the text."""
        lowered = doc.text.lower()
        return {f"indicator_{indicator}": 1 if indicator in lowered else 0
                for indicator in self.market_indicators}

    def _get_monetary_mentions(self, doc):
        """Whether and how often a ``MONEY_AMOUNT`` pattern matches."""
        match_count = len(self.matcher(doc))
        return {
            'has_monetary_value': match_count > 0,
            'monetary_mention_count': match_count
        }

    def _get_temporal_features(self, doc):
        """Flags for time-unit words and for forward-looking words."""
        lowered = doc.text.lower()
        time_indicators = ['year', 'quarter', 'month', 'week', 'day']
        future_words = ['will', 'plan', 'expect', 'forecast']
        return {
            'has_temporal_reference': any(indicator in lowered for indicator in time_indicators),
            'future_reference': any(word in lowered for word in future_words)
        }

    def _get_company_features(self, doc):
        """Number of ORG entities and whether there is more than one."""
        org_count = sum(1 for ent in doc.ents if ent.label_ == 'ORG')
        return {
            'company_count': org_count,
            'has_multiple_companies': org_count > 1
        }

In [26]:
class FeatureCombiner(BaseFeatureExtractor):
    """Combines different feature sets and handles feature selection.

    Args:
        feature_extractors: Objects with ``transform`` (and optionally
            ``fit``) returning a sparse matrix, DataFrame or ndarray.
        n_components: Passed to ``PCA``; a falsy value disables PCA.
    """

    def __init__(self, feature_extractors, n_components=0.95):
        self.feature_extractors = feature_extractors
        self.n_components = n_components
        self.pca = None
        self.feature_names = None

    def fit(self, X, y=None):
        """Fit every extractor and, if enabled, PCA on the stacked features."""
        # Fit all extractors
        features = []
        for extractor in self.feature_extractors:
            if hasattr(extractor, 'fit'):
                extractor.fit(X, y)
            features.append(extractor.transform(X))

        # Combine features
        combined = self._combine_features(features)

        # Apply PCA if needed. Reset first so a refit with PCA disabled does
        # not keep applying a projection from an earlier fit.
        self.pca = None
        if self.n_components:
            self.pca = PCA(n_components=self.n_components)
            self.pca.fit(combined)

        return self

    def transform(self, X):
        """Stack all extractor outputs and apply the fitted PCA, if any."""
        features = [extractor.transform(X) for extractor in self.feature_extractors]
        combined = self._combine_features(features)

        if self.pca is not None:
            combined = self.pca.transform(combined)

        return combined

    def _combine_features(self, features):
        """Horizontally stack feature blocks as one dense array.

        Raises:
            ValueError: If a block is not sparse, a DataFrame or an ndarray.
        """
        dense_features = []
        for feat in features:
            if scipy.sparse.issparse(feat):
                # Convert sparse matrices to dense
                block = feat.toarray()
            elif isinstance(feat, pd.DataFrame):
                block = feat.values
            elif isinstance(feat, np.ndarray):
                block = feat
            else:
                raise ValueError(f"Unsupported feature type: {type(feat)}")

            if block.ndim == 1:
                # A single feature per sample: make it a column so it can be
                # stacked next to 2-D blocks.
                block = block.reshape(-1, 1)
            dense_features.append(block)

        return np.hstack(dense_features)

In [27]:
class FeatureEngineeringPipeline:
    """Orchestrates multiple feature extraction methods.

    Args:
        feature_config: Mapping of extractor type (``'bow'``, ``'tfidf'``,
            ``'embeddings'``, ``'transformers'``, ``'linguistic'``,
            ``'business'``) to the keyword arguments for that extractor.
    """

    def __init__(self, feature_config):
        self.feature_extractors = []
        self._validate_config(feature_config)

    def _validate_config(self, config):
        """Instantiate one extractor per config entry, in config order.

        Raises:
            ValueError: If an extractor type is not recognised.
        """
        # Lambdas defer name lookup until an entry is actually requested.
        factories = {
            'bow': lambda params: TextVectorizer(method='bow', **params),
            'tfidf': lambda params: TextVectorizer(method='tfidf', **params),
            'embeddings': lambda params: EmbeddingVectorizer(**params),
            'transformers': lambda params: TransformerEmbeddings(**params),
            'linguistic': lambda params: LinguisticFeatureExtractor(**params),
            'business': lambda params: BusinessFeatureExtractor(**params),
        }
        for extractor_type, params in config.items():
            factory = factories.get(extractor_type)
            if factory is None:
                raise ValueError(f"Invalid extractor type: {extractor_type}")
            # ``None`` params are treated as "no arguments".
            self.feature_extractors.append(factory(params or {}))

    def fit_transform(self, X):
        """Fit every extractor on ``X`` and return the combined feature frame."""
        return self._collect(X, fit=True)

    def transform(self, X):
        """Apply the already-fitted extractors to ``X`` (e.g. unseen data)."""
        return self._collect(X, fit=False)

    def _collect(self, X, fit):
        """Run all extractors and concatenate their outputs column-wise."""
        frames = []
        occurrences = {}
        for extractor in self.feature_extractors:
            result = extractor.fit_transform(X) if fit else extractor.transform(X)
            frame = self._to_frame(result).reset_index(drop=True)

            # Column prefix is the class name. Further extractors of the same
            # class (e.g. 'bow' and 'tfidf' are both TextVectorizer) get a
            # running number so column names stay unique.
            name = type(extractor).__name__
            occurrences[name] = occurrences.get(name, 0) + 1
            seen = occurrences[name]
            prefix = f"{name}_" if seen == 1 else f"{name}{seen}_"
            frames.append(frame.add_prefix(prefix))

        return pd.concat(frames, axis=1)

    @staticmethod
    def _to_frame(features):
        """Convert an extractor output to a DataFrame.

        Raises:
            ValueError: If ``features`` is not an ndarray, DataFrame or sparse.
        """
        if isinstance(features, np.ndarray):
            return pd.DataFrame(features)
        if isinstance(features, pd.DataFrame):
            return features
        if scipy.sparse.issparse(features):
            return pd.DataFrame(features.toarray())
        raise ValueError(f"Unsupported feature type: {type(features)}")

In [28]:
# Extractor configuration consumed by FeatureEngineeringPipeline: each key
# names an extractor type and each value holds that extractor's keyword
# arguments (an empty dict means the extractor's defaults).
feature_config = {
    'tfidf': {
        'max_features': 1000,
        'ngram_range': (1, 2)
    },
    'embeddings': {
        'embedding_path': EMBEDDING_FILE,
        'embedding_dim': EMBEDDING_DIM
    },
    'transformers': {
        'model_name': MODEL_NAME,
        'device': 'cuda' if torch.cuda.is_available() else 'cpu'  # Use GPU if available
    },
    'linguistic': {},
    'business': {}
}

In [29]:
# Instantiate one extractor per entry of feature_config.
feature_pipeline = FeatureEngineeringPipeline(feature_config)

In [30]:
# Fit every extractor on the cleaned data and concatenate their outputs
# column-wise into a single DataFrame.
feature_matrix = feature_pipeline.fit_transform(cleaned_data)

In [None]:
class ModelTrainer:
    """Train, cross-validate, compare and persist several candidate models.

    The supplied data is split once into a stratified train partition and a
    hold-out test partition. Registered models must expose
    ``train(X_train, y_train, X_val, y_val)`` and ``predict(X)``.
    """

    def __init__(self, features, targets, test_size=0.2, random_state=42):
        """Store the data and create the stratified train/test split.

        Args:
            features: Feature table (DataFrame or array-like), one row per sample.
            targets: Class labels aligned with ``features``; also used for
                stratification.
            test_size: Fraction of samples held out for testing.
            random_state: Seed used for the split and for the CV folds.
        """
        self.features = features
        self.targets = targets
        self.test_size = test_size
        self.random_state = random_state
        self.models = {}
        self.results = {}

        # Split data
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            features, targets,
            test_size=test_size,
            stratify=targets,
            random_state=self.random_state
        )

    def add_model(self, name, model):
        """Register ``model`` under ``name`` (replaces an existing entry)."""
        self.models[name] = model

    @staticmethod
    def _take_rows(data, positions):
        """Select rows by integer position from pandas objects or array-likes."""
        if hasattr(data, 'iloc'):
            return data.iloc[positions]
        return np.asarray(data)[positions]

    @staticmethod
    def _average_metrics(fold_metrics):
        """Average a list of per-fold metric dicts key by key."""
        if not fold_metrics:
            return {}
        return {
            key: float(np.mean([fold[key] for fold in fold_metrics]))
            for key in fold_metrics[0]
        }

    def _evaluate(self, y_true, y_pred):
        """Print a classification report and return summary metrics.

        Returns:
            dict: ``{'f1_weighted': ..., 'accuracy': ...}``.
        """
        print(classification_report(y_true, y_pred))
        # Compare plain arrays so pandas index labels cannot affect the
        # element-wise comparison.
        hits = np.asarray(y_true).ravel() == np.asarray(y_pred).ravel()
        return {
            'f1_weighted': f1_score(y_true, y_pred, average='weighted'),
            'accuracy': np.mean(hits)
        }

    def run_cross_validation(self, model, cv=5):
        """Run stratified k-fold cross-validation on the training partition.

        Args:
            model: Object exposing ``train`` and ``predict``; it is retrained
                on every fold.
            cv: Number of folds.

        Returns:
            dict: Mean of every metric returned by ``_evaluate`` across folds.
        """
        skf = StratifiedKFold(n_splits=cv, shuffle=True, random_state=self.random_state)
        fold_metrics = []

        for train_idx, val_idx in skf.split(self.X_train, self.y_train):
            X_fold_train = self._take_rows(self.X_train, train_idx)
            y_fold_train = self._take_rows(self.y_train, train_idx)
            X_val = self._take_rows(self.X_train, val_idx)
            y_val = self._take_rows(self.y_train, val_idx)

            model.train(X_fold_train, y_fold_train, X_val, y_val)
            preds = model.predict(X_val)
            fold_metrics.append(self._evaluate(y_val, preds))

        # The per-fold results are dicts, so they are averaged key by key.
        return self._average_metrics(fold_metrics)

    def train_all(self, use_cross_validation=True):
        """Train every registered model and record its test-set metrics.

        Args:
            use_cross_validation: When true, run ``run_cross_validation``
                before the final fit and store its result as ``cv_score``.

        Populates ``self.results[name]`` with ``cv_score`` (or ``None``),
        ``test_metrics`` and the trained ``model``.
        """
        for name, model in self.models.items():
            print(f"\n=== Training {name} ===")

            cv_score = None
            if use_cross_validation:
                cv_score = self.run_cross_validation(model)
                print(f"CV Score: {cv_score}")

            # Final training on the full training partition.
            # NOTE(review): the hold-out test split is also passed as the
            # validation set, so any model that early-stops or checkpoints on
            # validation data sees the test set during training and the test
            # metrics below may be optimistic -- consider a separate split.
            model.train(self.X_train, self.y_train, self.X_test, self.y_test)
            test_preds = model.predict(self.X_test)
            test_metrics = self._evaluate(self.y_test, test_preds)

            self.results[name] = {
                'cv_score': cv_score,
                'test_metrics': test_metrics,
                'model': model
            }

    def save_best_model(self, metric='f1_weighted', file_prefix="best_model"):
        """Saves the best model based on a specified metric.

        Args:
            metric: Key of ``test_metrics`` used to rank the models.
            file_prefix: Prefix of the output file; the model is written to
                ``{file_prefix}_{name}.pkl``.

        Raises:
            ValueError: If no models have been trained yet.
        """
        if not self.results:
            raise ValueError("No models trained yet. Call 'train_all' first.")

        best_name = max(self.results, key=lambda k: self.results[k]['test_metrics'][metric])
        best_entry = self.results[best_name]
        best_model = best_entry['model']
        filepath = f"{file_prefix}_{best_name}.pkl"
        joblib.dump(best_model, filepath)
        print(f"Saved best model ({metric}): {best_name} to {filepath}")

        # Report the scores that led to this choice.
        if best_entry.get('cv_score') is not None:
            print(f"Best model's CV score: {best_entry.get('cv_score')}")
        print(f"Best model's test metrics: {best_entry['test_metrics']}")
        print(f"Best model's parameters: {best_model.best_params_ if hasattr(best_model, 'best_params_') else 'N/A'}")

In [31]:
class TraditionalMLModel(BaseModel):
    """Handles traditional machine learning models.

    Wraps one of the classifiers listed in ``MODEL_TYPES`` and tunes its
    hyperparameters with ``BayesSearchCV`` during ``train``.
    """

    MODEL_TYPES = {
        'svm': SVC,
        'random_forest': RandomForestClassifier,
        'gradient_boosting': GradientBoostingClassifier
    }

    def __init__(self, model_type='random_forest', param_space=None, **kwargs):
        """Configure the model family and its hyperparameter search space.

        Args:
            model_type: Key of ``MODEL_TYPES``.
            param_space: Search space for ``BayesSearchCV``; when omitted (or
                empty) the defaults from ``_default_param_space`` are used.
            **kwargs: Forwarded to the base class constructor.
        """
        super().__init__(**kwargs)
        self.model_type = model_type
        self.param_space = param_space or self._default_param_space()
        self.scoring = 'f1_weighted'

    def _default_param_space(self):
        """Return the default search space for the configured model type."""
        if self.model_type == 'svm':
            return {
                'C': (1e-3, 1e3, 'log-uniform'),
                'gamma': (1e-4, 1e-1, 'log-uniform')
            }

        space = {
            'n_estimators': (100, 1000),
            'max_depth': (3, 10)
        }
        # Only boosting has a learning rate; offering the key to any other
        # estimator (previously as ``(None, None)``) is an invalid search
        # dimension for a parameter the estimator does not accept.
        if self.model_type == 'gradient_boosting':
            space['learning_rate'] = (0.01, 0.3)
        return space

    def build_model(self, **kwargs):
        """Instantiate the configured estimator.

        Args:
            **kwargs: Extra constructor arguments for the estimator.

        Raises:
            KeyError: If ``model_type`` is not a key of ``MODEL_TYPES``.
        """
        try:
            model_class = self.MODEL_TYPES[self.model_type]
        except KeyError:
            raise KeyError(
                f"Unknown model_type {self.model_type!r}; "
                f"expected one of {sorted(self.MODEL_TYPES)}"
            ) from None
        return model_class(
            random_state=self.random_state,
            **kwargs
        )

    def train(self, X_train, y_train, X_val=None, y_val=None):
        """Tune and fit the estimator with Bayesian hyperparameter search.

        ``X_val`` / ``y_val`` are accepted for interface compatibility but are
        not used: the search runs its own 3-fold cross-validation on the
        training data.

        Returns:
            The best cross-validated score found by the search; the winning
            estimator and its parameters are stored on ``self.model`` and
            ``self.best_params_``.
        """
        opt = BayesSearchCV(
            estimator=self.build_model(),
            search_spaces=self.param_space,
            n_iter=30,
            cv=3,
            scoring=self.scoring,
            random_state=self.random_state,
            n_jobs=-1  # Use all available cores for faster training
        )

        opt.fit(X_train, y_train)
        self.model = opt.best_estimator_
        self.best_params_ = opt.best_params_
        return opt.best_score_

In [32]:
class DeepLearningModel(BaseModel):
    """Handles deep learning architectures.

    Supported ``model_type`` values are ``'lstm'``, ``'cnn'`` and
    ``'transformer'``. Every architecture is a 3-class classifier and
    ``predict`` always returns integer class indices.

    NOTE(review): ``self.model`` and ``self.model_name`` are never assigned in
    ``__init__`` here -- presumably the base class provides them; confirm.
    """

    def __init__(self, model_type='lstm', input_shape=None, **kwargs):
        """Store the architecture choice.

        Args:
            model_type: ``'lstm'``, ``'cnn'`` or ``'transformer'``.
            input_shape: Per-sample input shape for the LSTM/CNN ``Input`` layer.
            **kwargs: Forwarded to the base class constructor.
        """
        super().__init__(**kwargs)
        self.model_type = model_type
        self.input_shape = input_shape
        self.history = None

    def build_model(self, **params):
        """Build and compile the network for ``self.model_type``.

        Args:
            **params: Architecture options forwarded to the LSTM/CNN builder.

        Returns:
            The compiled model, or ``None`` if a transformer failed to load.

        Raises:
            ValueError: If ``model_type`` is not supported.
        """
        if self.model_type == 'lstm':
            return self._build_lstm(**params)
        if self.model_type == 'cnn':
            return self._build_cnn(**params)
        if self.model_type == 'transformer':
            return self._build_transformer()
        raise ValueError(f"Invalid model type: {self.model_type}")

    @staticmethod
    def _compile(model, learning_rate):
        """Compile ``model`` with Adam and sparse categorical cross-entropy."""
        model.compile(
            optimizer=Adam(learning_rate),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        return model

    def _build_lstm(self, units=64, dropout=0.2):
        """Build an LSTM classifier emitting one 3-way prediction per sample."""
        inputs = Input(shape=self.input_shape)
        # Return only the final state: the labels are per sample, so a
        # per-timestep output would not match the target shape.
        x = LSTM(units)(inputs)
        x = Dropout(dropout)(x)
        x = Dense(32, activation='relu')(x)
        outputs = Dense(3, activation='softmax')(x)
        return self._compile(Model(inputs, outputs), 0.001)

    def _build_cnn(self, filters=64, kernel_size=3):
        """Build a 1-D convolutional classifier with global max pooling."""
        inputs = Input(shape=self.input_shape)
        x = Conv1D(filters, kernel_size, activation='relu')(inputs)
        x = GlobalMaxPooling1D()(x)
        x = Dense(32, activation='relu')(x)
        outputs = Dense(3, activation='softmax')(x)
        return self._compile(Model(inputs, outputs), 0.001)

    def _build_transformer(self):
        """Load and compile the pretrained transformer named ``self.model_name``.

        Returns:
            The compiled model, or ``None`` when loading raised ``ValueError``.
        """
        try:
            model = TFAutoModelForSequenceClassification.from_pretrained(
                self.model_name,
                num_labels=3
            )
            # NOTE(review): predict() reads raw ``logits`` from this model,
            # while the string loss assumes probabilities -- consider a
            # cross-entropy configured with from_logits=True.
            return self._compile(model, 3e-5)
        except ValueError as e:  # Best effort: report the failure and continue
            print("Error loading transformer model:\n", e)
            return None

    def _ensure_tokenizer(self):
        """Return the tokenizer, loading it from ``self.model_name`` if unset."""
        if getattr(self, 'tokenizer', None) is None:
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        return self.tokenizer

    def train(self, X_train, y_train, X_val=None, y_val=None):
        """Fit the network and return its best accuracy.

        LSTM/CNN models are rebuilt on every call. A transformer that was
        restored through ``load`` (or trained before) is reused; otherwise it
        is built here.

        Args:
            X_train, y_train: Training inputs and integer class labels.
            X_val, y_val: Optional validation data used for early stopping,
                checkpointing and the returned score.

        Returns:
            The best validation accuracy (training accuracy when no
            validation data is given), or ``0.0`` if the transformer could
            not be loaded.
        """
        if self.model_type == 'transformer':
            if getattr(self, 'model', None) is None:
                self.model = self.build_model()
            if self.model is None:  # Loading failed; keep the best-effort default
                return 0.0
            # NOTE(review): inputs are passed to fit() as given; a transformer
            # presumably needs tokenized inputs -- confirm what callers pass.
        else:
            self.model = self.build_model()

        has_validation = X_val is not None and y_val is not None
        # Without validation data there is no ``val_loss`` to monitor.
        monitor = 'val_loss' if has_validation else 'loss'
        callbacks = [
            EarlyStopping(monitor=monitor, patience=3, restore_best_weights=True),
            ModelCheckpoint('best_model.h5', monitor=monitor, save_best_only=True)
        ]

        self.history = self.model.fit(
            X_train, y_train,
            validation_data=(X_val, y_val) if has_validation else None,
            epochs=20,
            batch_size=32,
            callbacks=callbacks,
            verbose=1
        )
        score_key = 'val_accuracy' if has_validation else 'accuracy'
        return max(self.history.history[score_key])

    def predict(self, X):
        """Return the predicted class index for every sample in ``X``.

        Args:
            X: Raw text(s) for transformer models, otherwise model-ready
                numeric input.

        Returns:
            np.ndarray: Integer class indices.
        """
        if self.model_type == 'transformer':
            tokenizer = self._ensure_tokenizer()
            texts = [X] if isinstance(X, str) else list(X)
            # The model is a TensorFlow model, so request TensorFlow tensors.
            inputs = tokenizer(texts, return_tensors='tf', padding=True, truncation=True)
            outputs = self.model(**inputs)
            return np.argmax(outputs.logits.numpy(), axis=-1)

        # The softmax head yields one probability per class; reduce to labels
        # so every model type returns the same kind of prediction.
        probabilities = self.model.predict(X)
        return np.argmax(probabilities, axis=-1)

    def save(self, path):
        """Persist the trained model (and tokenizer for transformers) to ``path``."""
        if self.model_type == 'transformer':
            self.model.save_pretrained(path)
            self._ensure_tokenizer().save_pretrained(path)
        else:
            # NOTE(review): relies on the network being picklable -- confirm
            # for the installed framework version.
            joblib.dump(self.model, path)

    def load(self, path):
        """Restore a model previously written by ``save``."""
        if self.model_type == 'transformer':
            self.tokenizer = AutoTokenizer.from_pretrained(path)
            self.model = TFAutoModelForSequenceClassification.from_pretrained(path)
        else:
            self.model = joblib.load(path)
