# Dependencies

In [1]:
import re
import tqdm
import nltk
import spacy
import emoji
import warnings
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
from pathlib import Path
from sklearn.svm import SVC
from textblob import TextBlob
from scipy.sparse import hstack
from wordcloud import WordCloud
from collections import Counter
import matplotlib.pyplot as plt
from scipy.sparse import spmatrix
from nltk.corpus import stopwords
from gensim.models import Word2Vec
from scipy.sparse import csr_matrix
from lightgbm import LGBMClassifier
from sklearn.base import BaseEstimator
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from sklearn.base import TransformerMixin
from sklearn.metrics import accuracy_score
from sklearn.feature_selection import chi2
from sklearn.feature_selection import RFE
from sklearn.feature_selection import RFECV
from sklearn.metrics import confusion_matrix
from sklearn.naive_bayes import MultinomialNB
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import mutual_info_score
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.feature_selection import SelectKBest
from sklearn.metrics import classification_report
from gensim.models.phrases import Phrases, Phraser
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.feature_selection import SelectFromModel
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.feature_selection import mutual_info_classif
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer


# Silence warnings of every category for the rest of the session
# (the 'ignore' action is installed without a category restriction)
warnings.filterwarnings('ignore')


# Loading Feature Engineering classes

## Classification Features

In [2]:
# Dependencies
import nltk
import numpy as np
from collections import defaultdict
from gensim.models import Word2Vec
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.base import TransformerMixin
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MultiLabelBinarizer

# word_tokenize / sent_tokenize need the Punkt sentence models. Older NLTK
# releases read them from the 'punkt' package, while NLTK 3.8.2+ looks for
# 'punkt_tab' instead, so fetch both to keep tokenization working either way.
for _punkt_resource in ('punkt', 'punkt_tab'):
    nltk.download(_punkt_resource)

class ClassFeatureEngineering:

    """
    Label-driven text feature engineering

    Every public method fits a fresh CountVectorizer and returns it together
    with the sparse count matrix it produced.

    Attributes:
    -----------
        texts        { list }    : List of preprocessed text documents
        labels       { list }    : List of class labels corresponding to each text document
        classes      { ndarray } : Unique labels as returned by np.unique(labels)
    """

    def __init__(self, texts: list, labels: list) -> None:
        """
        Initialize ClassFeatureEngineering with texts and labels

        Arguments:
        ----------
            texts   : Sequence of preprocessed text documents
            labels  : Sequence of class labels corresponding to each text document

        Raises:
        -------
            ValueError   : If texts or labels are empty or of different lengths
        """
        # len() instead of truthiness: `not x` is ambiguous for numpy arrays and
        # pandas Series holding more than one element
        if texts is None or labels is None or len(texts) == 0 or len(labels) == 0:
            raise ValueError("Input texts and labels cannot be empty")
        if len(texts) != len(labels):
            raise ValueError("The number of texts and labels must be the same")

        self.texts   = texts
        self.labels  = labels
        self.classes = np.unique(labels)


    def class_specific_vocabulary(self) -> tuple:
        """
        Build one pseudo-document per class out of the distinct tokens seen in
        that class, then vectorize those pseudo-documents.

        Rows of the matrix follow the order in which labels first appear in
        self.labels.

        Returns:
        --------
            tuple : (vectorizer, sparse_matrix)
                - vectorizer    : A fitted CountVectorizer instance.
                - sparse_matrix : Term counts, one row per class.
        """
        print("Creating class-specific vocabulary...")
        class_vocabs = defaultdict(set)

        # Collect the distinct lower-cased tokens of every class
        for text, label in zip(self.texts, self.labels):
            class_vocabs[label].update(word_tokenize(text.lower()))

        print("Class-specific vocabulary created.")

        # One space-joined document per class; sorting only makes the
        # intermediate strings reproducible, the resulting counts are unaffected
        class_documents = [' '.join(sorted(vocab)) for vocab in class_vocabs.values()]

        vectorizer    = CountVectorizer()
        sparse_matrix = vectorizer.fit_transform(class_documents)

        return vectorizer, sparse_matrix


    def label_aware_embeddings(self, embedding_dim=100) -> tuple:
        """
        Concatenate the tokens of all documents sharing a label and vectorize
        the resulting per-label documents.

        NOTE: despite the name, the returned features are plain term counts.
        A Word2Vec model used to be trained here but was never used for the
        returned values, so that training step has been dropped.

        Rows of the matrix follow the order in which labels first appear in
        self.labels (previously the order of an unordered set, which is not
        reproducible for string labels).

        Parameters:
        -----------
            embedding_dim : int
                Retained for backward compatibility; currently unused.

        Returns:
        --------
            tuple : (vectorizer, sparse_matrix)
                - vectorizer    : Fitted CountVectorizer for label text.
                - sparse_matrix : Term counts, one row per label.
        """
        print("Generating label-aware embeddings...")

        # Single pass over the corpus: tokenize each text once and append its
        # tokens to the bucket of its label
        tokens_by_label = defaultdict(list)
        for text, label in zip(self.texts, self.labels):
            tokens_by_label[label].extend(word_tokenize(text.lower()))

        label_documents = [' '.join(tokens) for tokens in tokens_by_label.values()]

        vectorizer    = CountVectorizer()
        sparse_matrix = vectorizer.fit_transform(label_documents)

        print("Label-aware embeddings generated.")
        return vectorizer, sparse_matrix


    # Disabled implementation of hierarchical_class_features, kept as an inert
    # string literal exactly as found (it has no effect at runtime).
    """

    def hierarchical_class_features(self, hierarchy=None) -> tuple:
    
    Generate hierarchical class features and return vectorizer and sparse matrix.
        
    Parameters:
        -----------
            hierarchy : dict
                A dictionary where keys are parent labels and values are lists of child labels.
        
        Returns:
        --------
            tuple : (vectorizer, sparse_matrix)
                - vectorizer : Fitted CountVectorizer for hierarchical features.
                - sparse_matrix : Sparse matrix representation of hierarchical features.
    
        try:
            print("Generating hierarchical class features...")
            
            # Flatten hierarchy into parent-child paths
            parent_child_pairs = []
            for parent, children in hierarchy.items():
                for child in children:
                    parent_child_pairs.append((parent, child))
            
            # Create a mapping of labels to their hierarchical paths
            label_to_hierarchy = defaultdict(list)
            for parent, child in parent_child_pairs:
                label_to_hierarchy[child].append(parent)
                label_to_hierarchy[child].extend(label_to_hierarchy[parent])  # Add parent's hierarchy recursively
            
            # Flatten paths into text representations
            hierarchical_texts = {label: ' '.join(path) for label, path in label_to_hierarchy.items()}
            
            # Use CountVectorizer to create sparse matrix
            vectorizer = CountVectorizer()
            sparse_matrix = vectorizer.fit_transform(hierarchical_texts.values())
            
            print("Hierarchical class features generated.")
            return vectorizer, sparse_matrix
        
        except Exception as e:
            raise e
        
    """
    def multi_label_features(self) -> tuple:

        """
            Vectorize the label(s) attached to every sample.

            A label given as a list, tuple, set or frozenset is treated as
            several labels for that sample; anything else is a single label.
            Labels are converted with str() before joining, so non-string
            labels no longer raise a TypeError.

            NOTE(review): CountVectorizer is used with its default settings, so
            how label strings are split into tokens (and whether very short
            labels survive) depends on its default tokenization - confirm this
            suits the label set in use.

            Returns:
            --------
                tuple : (vectorizer, sparse_matrix)
                    - vectorizer    : Fitted CountVectorizer for multi-label features.
                    - sparse_matrix : Label counts, one row per sample.
        """
        print("Generating multi-label features...")

        multi_label_types = (list, tuple, set, frozenset)

        # Normalise every entry to a set of labels
        label_sets = [set(lbl) if isinstance(lbl, multi_label_types) else {lbl}
                      for lbl in self.labels]

        # One space-joined label string per sample (sorted for reproducibility)
        label_texts = [' '.join(sorted(str(item) for item in labels))
                       for labels in label_sets]

        vectorizer    = CountVectorizer()
        sparse_matrix = vectorizer.fit_transform(label_texts)

        print("Multi-label features generated.")
        return vectorizer, sparse_matrix
        

[nltk_data] Downloading package punkt to /Users/it042307/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


## Contextual_Features

In [3]:
# DONE BY AVANTIKA ROY

# Dependencies
import nltk
import warnings
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer

# Silence warnings of every category for the rest of the session
# (the 'ignore' action is installed without a category restriction)
warnings.filterwarnings('ignore')

class Contextual_Features:
    """
    Contextual (n-gram based) text feature engineering

    Attributes:
    -----------
        texts        { list }  : List of preprocessed text documents

        max_features  { int }  : Maximum number of features to create

        ngram_range  { tuple } : Range of n-grams to consider
    """

    def __init__(self, texts: list, max_features: int = None, ngram_range: tuple = (1, 3)) -> None:
        """
        Initialize Contextual_Features with texts and parameters

        Arguments:
        ----------
            texts        : Sequence of preprocessed text documents

            max_features : Maximum number of features (None for no limit)

            ngram_range  : Range of n-grams to consider (min_n, max_n)

        Raises:
        -------
            ValueError   : If texts is empty or parameters are invalid
        """
        # len() instead of truthiness: `not x` is ambiguous for numpy arrays and
        # pandas Series holding more than one element
        if texts is None or len(texts) == 0:
            raise ValueError("Input texts cannot be empty")

        # Fail fast on parameters the vectorizers would reject later anyway
        try:
            min_n, max_n = ngram_range
        except (TypeError, ValueError):
            raise ValueError("ngram_range must be a (min_n, max_n) pair") from None
        if min_n > max_n:
            raise ValueError("ngram_range lower bound cannot exceed its upper bound")
        if max_features is not None and (
            not isinstance(max_features, (int, np.integer)) or max_features < 1
        ):
            raise ValueError("max_features must be a positive integer or None")

        self.texts        = texts
        self.max_features = max_features
        self.ngram_range  = ngram_range

    def _fit_count_vectorizer(self) -> tuple:
        """
        Fit a CountVectorizer configured from this instance on self.texts

        Returns:
        --------
            { tuple } : (fitted CountVectorizer, document-term count matrix)
        """
        vectorizer = CountVectorizer(max_features = self.max_features,
                                     ngram_range  = self.ngram_range)
        features   = vectorizer.fit_transform(self.texts)

        return vectorizer, features

    def window_based(self):
        """
        Create window-based contextual features as n-gram counts over self.texts

        Returns:
        --------
            { tuple } : (fitted CountVectorizer, n-gram count matrix)
        """
        print("Creating Window-Based Contextual Features:...")
        return self._fit_count_vectorizer()

    def position_based(self):
        """
        Create position-based contextual features

        NOTE: the current implementation produces the same n-gram count
        features as window_based; no positional information is encoded.

        Returns:
        --------
            { tuple } : (fitted CountVectorizer, n-gram count matrix)
        """
        print("Creating Position-Based Contextual Features:...")
        return self._fit_count_vectorizer()

    def generate_ngrams(self, n=3):
        """
        Generate n-gram count features over self.texts

        Arguments:
        ----------
            n : Accepted for backward compatibility but not used; the n-gram
                sizes are taken from self.ngram_range

        Returns:
        --------
            { tuple } : (fitted CountVectorizer, n-gram count matrix)
        """
        print("Generating N-Grams:...")
        return self._fit_count_vectorizer()


    def cross_document(self):
        """
        Create cross-document contextual features as a TF-IDF matrix over self.texts

        Returns:
        --------
            { tuple } : (fitted TfidfVectorizer, TF-IDF document-term matrix)
        """
        print("Creating Cross Document Contextual Feature Engineering:...")
        vectorizer   = TfidfVectorizer(max_features = self.max_features,
                                       ngram_range  = self.ngram_range)
        tfidf_matrix = vectorizer.fit_transform(self.texts)

        return vectorizer, tfidf_matrix


## Semantic Features

In [4]:
# DEPENDENCIES

from gensim.models import Word2Vec
import numpy as np

class Semantic_Feature_Engineering:

    """
    A class for implementing various semantic feature engineering techniques.

    Each document is represented by the mean of the vectors of its
    whitespace-separated tokens that are present in the embedding source.

    Attributes:
    -----------
        texts        { list }  : List of preprocessed text documents.

        max_features  { int }  : Maximum number of features to create.
    """

    def __init__(self, texts: list, max_features: int = None) -> None:

        """
        Initialize Semantic_Feature_Engineering with texts and parameters.

        Arguments:
        ----------
            texts        : Sequence of preprocessed text documents.

            max_features : Maximum number of features (None for no limit).

        Raises:
        -------
            ValueError   : If texts is empty.
        """

        # len() instead of truthiness: `not x` is ambiguous for numpy arrays and
        # pandas Series holding more than one element
        if texts is None or len(texts) == 0:
            raise ValueError("Input texts cannot be empty")

        self.texts        = texts
        self.max_features = max_features

    @staticmethod
    def _average_vectors(tokenized_texts: list, lookup, dim: int) -> np.ndarray:

        """
        Average the known token vectors of every document.

        Arguments:
        ----------
            tokenized_texts : List of token lists, one per document.

            lookup          : Object supporting `token in lookup` and `lookup[token]`.

            dim             : Length of the zero vector used for documents without any known token.

        Returns:
        --------
            np.ndarray : One row per document.
        """

        rows = []

        for tokens in tokenized_texts:
            known = [lookup[token] for token in tokens if token in lookup]
            rows.append(np.mean(known, axis=0) if known else np.zeros(dim))

        return np.array(rows)

    def _limit_features(self, feature_matrix: np.ndarray, dim: int) -> np.ndarray:

        """
        Keep only the first self.max_features columns when that limit is set and smaller than dim.
        """

        if self.max_features is not None and self.max_features < dim:
            return feature_matrix[:, :self.max_features]

        return feature_matrix

    @staticmethod
    def _load_glove(glove_path: str) -> tuple:

        """
        Read a GloVe text file into a {token: float32 vector} dictionary.

        Returns:
        --------
            tuple:
                - dict        : Token -> vector mapping.
                - int or None : Vector length taken from the first usable line (None if the file has none).
        """

        embeddings = {}
        dim        = None

        with open(glove_path, 'r', encoding = 'utf-8') as handle:

            for line in handle:
                fields = line.split()

                # Blank lines and tokens without any value carry no vector
                if len(fields) < 2:
                    continue

                if dim is None:
                    dim = len(fields) - 1

                # Some GloVe tokens contain spaces: treat the trailing `dim`
                # fields as the vector and everything before them as the token
                split_at                  = max(1, len(fields) - dim)
                token                     = ' '.join(fields[:split_at])
                embeddings[token]         = np.asarray(fields[split_at:], dtype='float32')

        return embeddings, dim

    def word2vec_cbow(self, vector_size: int = 100, window: int = 5, min_count: int = 1, workers: int = 4) -> tuple:

        """
        Generate semantic features using Word2Vec (CBOW) and return the feature matrix and vectorizer.

        Arguments:
        ----------
            vector_size : Dimensionality of word embeddings (default: 100).

            window      : Context window size (default: 5).

            min_count   : Ignores words with frequency lower than this (default: 1).

            workers     : Number of worker threads to train the model (default: 4).

        Returns:
        --------
            tuple:
                - np.ndarray : Document-level feature matrix (each document represented as the average of its word vectors,
                               truncated to the first max_features columns when that limit is set).
                - Word2Vec   : The trained Word2Vec model (vectorizer).
        """

        tokenized_texts = [doc.split() for doc in self.texts]

        # sg = 0 selects the CBOW training algorithm
        w2v_model       = Word2Vec(sentences   = tokenized_texts,
                                   vector_size = vector_size,
                                   window      = window,
                                   min_count   = min_count,
                                   workers     = workers,
                                   sg          = 0
                                   )

        feature_matrix  = self._average_vectors(tokenized_texts, w2v_model.wv, vector_size)

        return self._limit_features(feature_matrix, vector_size), w2v_model

    def glove(self, glove_path: str, embedding_dim: int = 100) -> tuple:

        """
        Generate semantic features using GloVe and return the feature matrix and embedding dictionary.

        Blank lines in the file are skipped and tokens containing spaces are
        supported. Documents without any known token get a zero vector whose
        length matches the vectors actually found in the file, so a mismatch
        between embedding_dim and the file no longer yields a ragged matrix.

        Arguments:
        ----------
            glove_path        : Path to the GloVe embeddings file.

            embedding_dim     : Dimensionality of GloVe embeddings (default: 100); only used as
                                the zero-vector length when the file contains no usable line.

        Returns:
        --------
            tuple:
                - np.ndarray  : Document-level feature matrix (each document represented as the average of its word vectors,
                                truncated to the first max_features columns when that limit is set).
                - dict        : The GloVe embedding dictionary.
        """

        glove_embeddings, file_dim = self._load_glove(glove_path)

        dim                        = embedding_dim if file_dim is None else file_dim

        tokenized_texts            = [doc.split() for doc in self.texts]

        feature_matrix             = self._average_vectors(tokenized_texts, glove_embeddings, dim)

        return self._limit_features(feature_matrix, dim), glove_embeddings

## Statistical Features

In [5]:
import textstat
import pandas as pd
import numpy as np
from nltk.tokenize import word_tokenize, sent_tokenize
from sklearn.feature_extraction.text import CountVectorizer

class Statistical_Feature_Engineering():
    """
    A class for statistical feature engineering.

    Attributes:
    ----------
    max_features : Vocabulary size limit handed to the CountVectorizer.

    vectorizer : CountVectorizer instance for text vectorization.

    """
    def __init__(self, max_features=1000):
        """
        Initializes the Statistical_Feature_Engineering.

        Arguments:
        ----------
        max_features {int} : Maximum vocabulary size of the internal CountVectorizer.
        """
        self.max_features = max_features
        self.vectorizer = CountVectorizer(max_features=self.max_features)

    def document_statistics(self,df):
        """
        Calculates basic document statistics i.e. Character Count, Word Count, Sentence Count,
        Average Word Length (AWL), Average Sentence Length (ASL), Unique Word Ratio (UWR).

        The columns are added to the DataFrame that was passed in, which is then returned.
        Character, word and unique-word counts come from 'cleaned_review'; the sentence
        count comes from 'review'.

        NOTE(review): rows with a zero word or sentence count divide by zero in the
        ratio columns - confirm downstream code copes with the resulting values.

        Arguments:
        ----------
        df {DataFrame} : Input Data with 'cleaned_review' and 'review' columns.

        Returns:
        --------
        df {DataFrame} : The same DataFrame with the calculated document statistics.

        """
        cleaned = df['cleaned_review']

        # Raw counts
        df['char_count'] = cleaned.apply(len)
        df['word_count'] = cleaned.apply(lambda text: len(text.split()))
        df['sent_count'] = df['review'].apply(lambda text: len(sent_tokenize(text)))

        # Ratios derived from the counts above
        df['AWL'] = df['char_count'] / df['word_count']
        df['ASL'] = df['word_count'] / df['sent_count']

        # Distinct tokens are counted with word_tokenize, total words with str.split
        df['unique_word_count'] = cleaned.apply(lambda text: len(set(word_tokenize(text))))
        df['UWR'] = df['unique_word_count'] / df['word_count']
        return df


    def readability_score(self, df, score='FRE'):
        """
        Calculates one readability score for every 'cleaned_review' entry:
        Flesch Reading Ease (FRE), Gunning Fog Index (GFI) or SMOG Index (SMOG).

        Arguments:
        ----------
        df {DataFrame} : Input Data.

        score {str} : score type {'FRE', 'GFI', 'SMOG'}.

        Returns:
        ---------
        {series} : The requested scores, one per row.

        Raises:
        -------
        ValueError : If score is not one of the supported types.

        """
        # Score key -> name of the textstat function that computes it
        textstat_functions = (('FRE', 'flesch_reading_ease'),
                              ('GFI', 'gunning_fog'),
                              ('SMOG', 'smog_index'))

        for key, function_name in textstat_functions:
            if score == key:
                return df['cleaned_review'].apply(getattr(textstat, function_name))

        raise ValueError("Unsupported score type. Choose from 'FRE', 'GFI', or 'SMOG'.")


    def frequency_distribution(self, df,column, fit_transform=False):
        """
        Calculates the word counts in each document.

        Arguments:
        ----------
        df {DataFrame} : Input Data.

        column {str} : Column name for calculating the frequency distribution.

        fit_transform {bool} : When True the internal vectorizer is (re)fitted on the
                               column before transforming; otherwise the already
                               fitted vectorizer is reused.

        Returns:
        --------
        {DataFrame} : Dense bag-of-words counts, one column per vocabulary term.
                      It is built without an explicit index, so it does not carry
                      over the index of df.
        """
        vectorize = self.vectorizer.fit_transform if fit_transform else self.vectorizer.transform
        counts = vectorize(df[column])
        return pd.DataFrame(counts.toarray(), columns=self.vectorizer.get_feature_names_out())

## Word Level Features

In [6]:
# Dependencies
import numpy as np
from scipy.sparse import csr_matrix
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer


# Feature Engineering
class TextFeatureEngineering:
    """
    A class for implementing various text feature engineering techniques
    
    Attributes:
    -----------
        texts        { list }  : List of preprocessed text documents
        
        max_features  { int }  : Maximum number of features to create
        
        ngram_range  { tuple } : Range of n-grams to consider
    """
    
    def __init__(self, texts: list, max_features: int = None, ngram_range: tuple = (1, 3)) -> None:
        """
        Initialize TextFeatureEngineering with texts and parameters
        
        Arguments:
        ----------
            texts        : List of preprocessed text documents
            
            max_features : Maximum number of features (None for no limit)
            
            ngram_range  : Range of n-grams to consider (min_n, max_n)
            
        Raises:
        -------
            ValueError   : If texts is empty or parameters are invalid
        """
        if not texts:
            raise ValueError("Input texts cannot be empty")
            
        self.texts        = texts
        self.max_features = max_features
        self.ngram_range  = ngram_range
        
        
    def create_binary_bow(self) -> tuple:
        """
        Build presence/absence bag-of-words features for self.texts
        
        The vectorizer is configured with binary = True plus this instance's
        max_features and ngram_range. Any error raised while fitting
        propagates to the caller unchanged.
        
        Returns:
        --------
            { tuple } : Tuple containing: - Fitted CountVectorizer
                                          - Binary document-term matrix
        """
        print("Creating binary bag-of-words features...")

        bow_vectorizer = CountVectorizer(ngram_range  = self.ngram_range,
                                         max_features = self.max_features,
                                         binary       = True)
        binary_matrix  = bow_vectorizer.fit_transform(self.texts)

        print(f"Created {binary_matrix.shape[1]} binary features")
        return bow_vectorizer, binary_matrix
            
            
    def create_count_bow(self) -> tuple:
        """
        Build raw term-count bag-of-words features for self.texts
        
        The vectorizer uses this instance's max_features and ngram_range.
        Any error raised while fitting propagates to the caller unchanged.
        
        Returns:
        --------
            { tuple } : Tuple containing: - Fitted CountVectorizer
                                          - Count document-term matrix
        """
        print("Creating count-based bag-of-words features...")

        count_vectorizer = CountVectorizer(ngram_range  = self.ngram_range,
                                           max_features = self.max_features)
        count_matrix     = count_vectorizer.fit_transform(self.texts)

        print(f"Created {count_matrix.shape[1]} count-based features")
        return count_vectorizer, count_matrix
            
            
    def create_frequency_bow(self) -> tuple:
        """
        Build term-frequency bag-of-words features for self.texts
        
        A TfidfVectorizer is used with IDF weighting switched off
        (use_idf = False); its remaining options other than max_features and
        ngram_range are left at the library defaults. Any error raised while
        fitting propagates to the caller unchanged.
        
        Returns:
        --------
            { tuple } : Tuple containing: - Fitted TfidfVectorizer
                                          - Term frequency document-term matrix
        """
        print("Creating frequency-based bag-of-words features...")

        tf_vectorizer = TfidfVectorizer(ngram_range  = self.ngram_range,
                                        max_features = self.max_features,
                                        use_idf      = False)
        tf_matrix     = tf_vectorizer.fit_transform(self.texts)

        print(f"Created {tf_matrix.shape[1]} frequency-based features")
        return tf_vectorizer, tf_matrix
            
            
    def create_tfidf(self) -> tuple:
        """
        Build TF-IDF features for self.texts
        
        The vectorizer uses this instance's max_features and ngram_range.
        Any error raised while fitting propagates to the caller unchanged.
        
        Returns:
        --------
            { tuple } : Tuple containing: - Fitted TfidfVectorizer
                                          - TF-IDF document-term matrix
        """
        print("Creating TF-IDF features...")

        tfidf_vectorizer = TfidfVectorizer(ngram_range  = self.ngram_range,
                                           max_features = self.max_features)
        tfidf_matrix     = tfidf_vectorizer.fit_transform(self.texts)

        print(f"Created {tfidf_matrix.shape[1]} TF-IDF features")
        return tfidf_vectorizer, tfidf_matrix
            
            
    def create_standardized_tfidf(self) -> tuple:
        """
        Build TF-IDF features for self.texts and rescale them with a StandardScaler
        
        The scaler is created with with_mean = False, i.e. it is asked not to
        centre the data. Only the vectorizer is returned; the fitted scaler is
        discarded. Any error raised while fitting propagates to the caller
        unchanged.
        
        Returns:
        --------
            { tuple } : Tuple containing: - Fitted TfidfVectorizer
                                          - Standardized TF-IDF document-term matrix
        """
        print("Creating Standardized TF-IDF features...")

        tfidf_vectorizer = TfidfVectorizer(ngram_range  = self.ngram_range,
                                           max_features = self.max_features)
        raw_tfidf        = tfidf_vectorizer.fit_transform(self.texts)

        # Scale the TF-IDF columns without mean-centering
        scaled_tfidf     = StandardScaler(with_mean = False).fit_transform(raw_tfidf)

        print(f"Created {scaled_tfidf.shape[1]} standardized TF-IDF features")
        return tfidf_vectorizer, scaled_tfidf
            
            
    def _create_bm25_variant(self, variant: str, k1: float = 1.5, b: float = 0.75, delta: float = 1.0) -> tuple:
        """
        Unified private method to create BM25 variant features.

        Arguments:
        ----------
            variant      : Specify the BM25 variant ("BM25", "BM25F", "BM25L", "BM25+", "BM25T")
            k1           : Term frequency saturation parameter (default: 1.5)
            b            : Length normalization parameter (default: 0.75)
            delta        : Free parameter for certain variants (default: 1.0)

        Returns:
        --------
            { tuple }    : Tuple containing:
                           - Custom transformer for the specified BM25 variant
                           - BM25 variant document-term matrix
        """
        try:
            print(f"Creating {variant} features...")

            class BM25VariantTransformer(BaseEstimator, TransformerMixin):
                def __init__(self, k1=1.5, b=0.75, delta=1.0, variant="BM25", max_features=None):
                    self.k1               = k1
                    self.b                = b
                    self.delta            = delta
                    self.variant          = variant
                    self.max_features     = max_features
                    self.count_vectorizer = CountVectorizer(max_features = self.max_features)

                def fit(self, texts):
                    # Calculate IDF and average document length
                    X                   = self.count_vectorizer.fit_transform(texts)
                    self.avg_doc_length = X.sum(axis=1).mean()
                    n_docs              = len(texts)
                    df                  = np.bincount(X.indices, minlength=X.shape[1])
                    self.idf            = np.log((n_docs - df + 0.5) / (df + 0.5) + 1)
                    return self

                def transform(self, texts):
                    X           = self.count_vectorizer.transform(texts)
                    doc_lengths = X.sum(axis=1).A1
                    rows, cols  = X.nonzero()
                    data        = list()

                    for i, j in zip(rows, cols):
                        tf = X[i, j]

                        if (self.variant == "BM25"):
                            numerator   = tf * (self.k1 + 1)
                            denominator = tf + self.k1 * (1 - self.b + self.b * doc_lengths[i] / self.avg_doc_length)
                            score       = self.idf[j] * numerator / denominator
                        
                        elif (self.variant == "BM25F"):
                            score = self.idf[j] * (tf / (self.k1 + tf))

                        elif (self.variant == "BM25L"):
                            numerator   = tf + self.delta
                            denominator = tf + self.delta + self.k1 * (1 - self.b + self.b * doc_lengths[i] / self.avg_doc_length)
                            score       = self.idf[j] * numerator / denominator
                        
                        elif (self.variant == "BM25+"):
                            numerator   = tf + self.delta
                            denominator = tf + self.k1
                            score       = self.idf[j] * numerator / denominator
                        
                        elif (self.variant == "BM25T"):
                            score = self.idf[j] * (tf * np.log(1 + tf))
                        
                        else:
                            raise ValueError(f"Unknown variant: {self.variant}")
                        
                        data.append(score)

                    return csr_matrix((data, (rows, cols)), shape=X.shape)

                def get_feature_names_out(self):
                    """
                    Return the feature names from the underlying CountVectorizer.
                    """
                    return self.count_vectorizer.get_feature_names_out()

            transformer = BM25VariantTransformer(k1           = k1, 
                                                 b            = b, 
                                                 delta        = delta,
                                                 variant      = variant, 
                                                 max_features = self.max_features)

            features    = transformer.fit_transform(self.texts)
            print(f"Created {features.shape[1]} {variant} features")
            return transformer, features

        except Exception as e:
            raise

    def create_bm25(self, k1: float = 1.5, b: float = 0.75) -> tuple:
        """
        Build classic BM25 features by delegating to the shared BM25 variant builder

        Arguments:
        ----------
            k1 { float } : Term frequency saturation parameter

            b  { float } : Length normalization parameter

        Returns:
        --------
            { tuple }    : Result of _create_bm25_variant for variant "BM25"
                           (fitted transformer, document-term matrix)
        """
        variant_options = {"variant": "BM25", "k1": k1, "b": b}

        return self._create_bm25_variant(**variant_options)


    def create_bm25f(self, k1: float = 1.5) -> tuple:
        """
        Build BM25F features by delegating to the shared BM25 variant builder

        Only k1 is forwarded; every other builder parameter keeps its default.

        Arguments:
        ----------
            k1 { float } : Term frequency saturation parameter

        Returns:
        --------
            { tuple }    : Result of _create_bm25_variant for variant "BM25F"
                           (fitted transformer, document-term matrix)
        """
        build_variant = self._create_bm25_variant

        return build_variant(variant = "BM25F", k1 = k1)


    def create_bm25l(self, k1: float = 1.5, b: float = 0.75, delta: float = 1.0) -> tuple:
        """
        Build BM25L features by delegating to the shared BM25 variant builder

        Arguments:
        ----------
            k1    { float } : Term frequency saturation parameter

            b     { float } : Length normalization parameter

            delta { float } : Additive term applied to the term frequency

        Returns:
        --------
            { tuple }       : Result of _create_bm25_variant for variant "BM25L"
                              (fitted transformer, document-term matrix)
        """
        transformer_and_matrix = self._create_bm25_variant(variant = "BM25L", k1 = k1, b = b, delta = delta)

        return transformer_and_matrix


    def create_bm25_plus(self, k1: float = 1.5, delta: float = 1.0) -> tuple:
        """
        Build BM25+ features by delegating to the shared BM25 variant builder

        Arguments:
        ----------
            k1    { float } : Term frequency saturation parameter

            delta { float } : Additive term applied to the term frequency

        Returns:
        --------
            { tuple }       : Result of _create_bm25_variant for variant "BM25+"
                              (fitted transformer, document-term matrix)
        """
        variant_options = dict(variant = "BM25+", k1 = k1, delta = delta)

        return self._create_bm25_variant(**variant_options)


    def create_bm25t(self, k1: float = 1.5) -> tuple:
        """
        Build BM25T features by delegating to the shared BM25 variant builder

        Arguments:
        ----------
            k1 { float } : Forwarded to the builder. NOTE(review): the "BM25T" scoring branch
                           of the transformer defined in _create_bm25_variant does not read
                           k1, so this value currently has no effect on the scores

        Returns:
        --------
            { tuple }    : Result of _create_bm25_variant for variant "BM25T"
                           (fitted transformer, document-term matrix)
        """
        build_variant = self._create_bm25_variant
        result        = build_variant(variant = "BM25T", k1 = k1)

        return result


    def create_skipgrams(self, k: int = 2) -> tuple:
        """
        Create skipgram features

        Every text in self.texts is split on whitespace and each word is joined with the
        word k + 1 positions after it (k words are skipped in between) as "<first>_<second>".
        The resulting pseudo-documents are vectorized with a CountVectorizer limited to
        self.max_features.

        Arguments:
        ----------
            k { int }  : Skip distance, i.e. number of words skipped between the paired words;
                         must be non-negative

        Raises:
        -------
            ValueError : If k is negative (a negative distance would pair a word with itself
                         or index past the end of the text)

        Returns:
        --------
            { tuple }  : Tuple containing: - Fitted CountVectorizer for skipgrams
                                           - Skipgram document-term matrix
        """
        # Validate first so that a bad distance fails fast with a clear message
        if (k < 0):
            raise ValueError(f"Skip distance k must be non-negative, got {k}")

        print("Creating skipgram features...")

        def generate_skipgrams(text: str) -> str:
            words = text.split()

            # zip stops at the shorter sequence, so texts with fewer than k + 2 words yield no pairs
            return ' '.join(f"{first}_{second}" for first, second in zip(words, words[k + 1:]))

        processed_texts = [generate_skipgrams(text) for text in self.texts]

        vectorizer      = CountVectorizer(max_features = self.max_features)
        features        = vectorizer.fit_transform(processed_texts)

        print(f"Created {features.shape[1]} skipgram features")
        return vectorizer, features
            
            
    def create_positional_ngrams(self) -> tuple:
        """
        Create positional n-gram features

        Each n-gram (sizes taken from self.ngram_range) is prefixed with the index of its
        first word, e.g. "pos0_great_movie", and the tagged tokens are counted with a
        CountVectorizer limited to self.max_features.

        Returns:
        --------
            { tuple } : Tuple containing: - Fitted CountVectorizer for positional n-grams
                                          - Positional n-gram document-term matrix
        """
        print("Creating positional n-gram features...")

        def generate_positional_ngrams(text: str) -> str:
            tokens = text.split()
            total  = len(tokens)
            tagged = list()

            for start in range(total):
                smallest = self.ngram_range[0]
                # An n-gram cannot extend past the last word of the text
                largest  = min(self.ngram_range[1], total - start)

                for size in range(smallest, largest + 1):
                    joined = '_'.join(tokens[start:start + size])
                    tagged.append(f"pos{start}_{joined}")

            return ' '.join(tagged)

        processed_texts = list(map(generate_positional_ngrams, self.texts))

        vectorizer      = CountVectorizer(max_features = self.max_features)
        features        = vectorizer.fit_transform(processed_texts)

        print(f"Created {features.shape[1]} positional n-gram features")
        return vectorizer, features
            
            
    def create_all_features(self) -> dict:
        """
        Run the main feature builders and collect their results

        Builds binary BoW, count BoW, frequency BoW, TF-IDF, classic BM25, skipgram and
        positional n-gram features, in that order, each with its default arguments.
        The other BM25 variants (BM25F, BM25L, BM25+, BM25T) are not included.

        Returns:
        --------
            { dict } : Dictionary mapping feature names to whatever the matching
                       create_* method returns (vectorizer and feature matrix)
        """
        print("Creating all feature types...")

        # Dict literals evaluate entries top to bottom, so the builders run in this order
        features = {
            'binary_bow'        : self.create_binary_bow(),
            'count_bow'         : self.create_count_bow(),
            'frequency_bow'     : self.create_frequency_bow(),
            'tfidf'             : self.create_tfidf(),
            'bm25'              : self.create_bm25(),
            'skipgrams'         : self.create_skipgrams(),
            'positional_ngrams' : self.create_positional_ngrams(),
        }

        print("Created all feature types successfully")
        return features

# Load Dataset

In [7]:
# Read the IMDB reviews CSV; index_col=None means no CSV column is used as the index
imdb_ratings_df = pd.read_csv('../data/IMDB_Dataset.csv', index_col = None)

# Preview the first ten rows
imdb_ratings_df.head(n = 10)

Unnamed: 0,review,sentiment
0,One of the other reviewers has mentioned that ...,positive
1,A wonderful little production. <br /><br />The...,positive
2,I thought this was a wonderful way to spend ti...,positive
3,Basically there's a family where a little boy ...,negative
4,"Petter Mattei's ""Love in the Time of Money"" is...",positive
5,"Probably my all-time favorite movie, a story o...",positive
6,I sure would like to see a resurrection of a u...,positive
7,"This show was an amazing, fresh & innovative i...",negative
8,Encouraged by the positive comments about this...,negative
9,If you like original gut wrenching laughter yo...,positive


# Creating Test Dataset

In [8]:
# Hand-written review snippets (49 entries). Each entry is paired by position with the
# label at the same index in `sentiments`, so keep both lists in the same order.
test_data = ["This movie is speedy enough with plot twists, but hard to understand the connection between plots.",
             "Seriously, this is the best movie I've ever watched! Everything was flawless!",
             "The storyline was okay, but the acting was just not up to the mark.",
             "A complete disaster of a movie. Don't waste your time.",
             "I can't believe how amazing this was. Totally worth it!",
             "The movie had its moments, but overall, it felt like something was missing.",
             "I absolutely loved the cinematography, but the acting was subpar.",
             "The film is an excellent example of how not to make a movie.",
             "It's hard to imagine how anyone could dislike this masterpiece!",
             "The trailer was better than the actual movie. Felt cheated.",
             "A rollercoaster of emotions! Highly recommend watching this.",
             "An average movie with nothing new to offer.",
             "The pacing was terrible, and the climax was predictable.",
             "Wow, just wow. This is how a movie should be made!",
             "A decent watch for a lazy weekend. Not groundbreaking, but enjoyable.",
             "The director has outdone themselves; what a phenomenal movie!",
             "More hype than substance. A complete letdown.",
             "Good visuals, decent music, but lacked a solid script.",
             "A masterpiece in every sense. This will stay with me forever.",
             "Mediocre at best. Not worth the ticket price.",
             "A fresh take on a tired genre. Highly recommend it!",
             "Overrated and boring. Nothing special about it.",
             "This is one of those movies you'll regret missing. A must-watch!",
             "Predictable plot, but the performances were top-notch.",
             "It's a bad movie if you're looking for entertainment.",
             "Can't believe I sat through the entire thing. A waste of time.",
             "Finally, a movie that gets it right. Loved every minute of it!",
             "A forgettable movie with no real impact.",
             "An extraordinary journey that left me speechless. Bravo!",
             "The humor was forced, and the dialogue was cringeworthy.",
             "A solid movie with a gripping narrative. Well done!",
             "The music was fantastic, but the rest of the movie was average.",
             "Ironic how they managed to make something so beautiful look so bland.",
             "An epic conclusion to a fantastic series. Couldn’t have been better!",
             "The movie tries too hard to be funny and fails miserably.",
             "A fresh and engaging story with relatable characters.",
             "All style, no substance. Disappointing.",
             "A breath of fresh air! One of the best movies this year.",
             "The plot was all over the place, but it was fun to watch.",
             "Couldn't make it through the first half. Painful to sit through.",
             "An unexpectedly beautiful film that touched my heart.",
             "Trying to understand why this movie exists is more entertaining than the movie itself.",
             "Every second of this movie was a blessing. Pure cinematic joy.",
             "The lead actor was the only saving grace in an otherwise dull film.",
             "A pretentious attempt at storytelling that falls flat.",
             "I didn’t expect much, but this movie surprised me in the best way.",
             "A series of poorly executed clichés masquerading as a story.",
             "This is not just a movie; it’s an experience. Brilliant!",
             "A slog of a movie with a laughably bad ending.",
            ]


# Sentiment labels for `test_data`, matched by position (49 entries, one per review).
# NOTE(review): a few labels look debatable (e.g. entry 12, "An average movie with nothing
# new to offer.", is marked positive) — confirm they are intentional.
sentiments = ["negative",   
              "positive",   
              "positive",   
              "negative",   
              "positive",   
              "positive",  
              "positive", 
              "negative",  
              "positive",   
              "negative",   
              "positive",   
              "positive",    
              "negative",   
              "positive",   
              "positive",    
              "positive",   
              "negative",   
              "negative",  
              "positive",   
              "negative",   
              "positive",   
              "negative",   
              "positive",  
              "positive",   
              "negative",   
              "negative",  
              "positive",   
              "negative",   
              "positive",   
              "negative",   
              "positive",   
              "positive",  
              "negative", 
              "positive",   
              "negative",   
              "positive",   
              "negative",   
              "positive",   
              "positive",  
              "negative",   
              "positive",   
              "negative",  
              "positive",  
              "positive",    
              "negative",   
              "positive",   
              "negative",  
              "positive",   
              "negative",
             ]


In [9]:
# Column-wise mapping: one column of review texts, one column of their labels
test_data_dict = dict(review = test_data, sentiment = sentiments)

# Wrap the mapping in a DataFrame
test_df        = pd.DataFrame(data = test_data_dict)

# Optional persistence step, left disabled:
# test_df.to_csv(path_or_buf='../data/test_data', index=False)

In [10]:
# Preview the first five rows of the hand-built test set
test_df.head(n = 5)

Unnamed: 0,review,sentiment
0,"This movie is speedy enough with plot twists, ...",negative
1,"Seriously, this is the best movie I've ever wa...",positive
2,"The storyline was okay, but the acting was jus...",positive
3,A complete disaster of a movie. Don't waste yo...,negative
4,I can't believe how amazing this was. Totally ...,positive


# Feature Selector

In [11]:
class TextFeatureSelector:
    """
    A class for implementing various feature selection techniques for text data

    Attributes:
    -----------
        X             { spmatrix } : Feature matrix

        y             { ndarray }  : Target labels

        feature_names { list }     : Names of features

        n_features    { int }      : Number of features to select
    """

    def __init__(self, X: spmatrix, y: np.ndarray, feature_names: list, n_features: int = None) -> None:
        """
        Initialize TextFeatureSelector with feature matrix and labels

        Arguments:
        ----------
            X             : Sparse feature matrix

            y             : Target labels

            feature_names : List of feature names

            n_features    : Number of features to select. None or 0 falls back to 10% of
                            the features, but never fewer than one

        Raises:
        -------
            ValueError    : If inputs are invalid or incompatible
        """
        if (X.shape[0] != len(y)):
            raise ValueError("Number of samples in X and y must match")

        if (X.shape[1] != len(feature_names)):
            raise ValueError("Number of features must match length of feature_names")

        if ((n_features is not None) and (n_features < 0)):
            raise ValueError("n_features must be a non-negative integer")

        self.X             = X
        self.y             = y
        self.feature_names = feature_names

        # With fewer than 10 features int(0.1 * n) is 0, which would make every selector
        # return nothing; clamp the default to at least one feature
        self.n_features    = n_features or max(1, int(0.1 * X.shape[1]))


    @staticmethod
    def _rank_top_k(scores: np.ndarray, k: int) -> np.ndarray:
        """
        Return the indices of the k highest scores in descending score order, ranking NaN last
        """
        sortable = np.asarray(scores, dtype = float)

        # np.argsort places NaN at the end of the ascending order, so a plain reversal would
        # rank NaN scores first; push them to the bottom instead
        sortable = np.where(np.isnan(sortable), -np.inf, sortable)

        return np.argsort(sortable)[::-1][:k]


    def _cv_accuracy(self, estimator, feature_indices: list, cv: int) -> float:
        """
        Return the mean cross-validated accuracy of estimator on the given feature columns
        """
        fold_scores = cross_val_score(estimator,
                                      self.X[:, feature_indices],
                                      self.y,
                                      cv      = cv,
                                      scoring = 'accuracy')

        return np.mean(fold_scores)


    def chi_square_selection(self) -> tuple:
        """
        Perform chi-square feature selection

        Returns:
        --------
            { tuple } : Tuple containing: - Selected feature indices, best score first
                                          - Chi-square scores for all features
        """
        print("Performing chi-square feature selection...")

        # chi2 needs non-negative inputs; the matrix is densified because it is passed
        # to MinMaxScaler as a dense array, so memory grows with samples x features
        X_scaled          = MinMaxScaler().fit_transform(self.X.toarray())

        # Apply chi-square scoring
        selector          = SelectKBest(score_func = chi2,
                                        k          = self.n_features)

        selector.fit(X_scaled, self.y)

        scores            = selector.scores_

        # Rank by score; NaN scores (possible for constant columns) are ranked last
        selected_features = self._rank_top_k(scores, self.n_features)

        print(f"Selected {len(selected_features)} features using chi-square")

        return selected_features, scores


    def information_gain_selection(self) -> tuple:
        """
        Perform information gain feature selection

        Returns:
        --------
            { tuple } : Tuple containing: - Selected feature indices, best score first
                                          - Information gain scores for all features
        """
        print("Performing information gain selection...")

        # Calculate mutual information scores
        selector          = SelectKBest(score_func = mutual_info_classif,
                                        k          = self.n_features)

        selector.fit(self.X, self.y)

        scores            = selector.scores_

        # Rank by score, highest first
        selected_features = self._rank_top_k(scores, self.n_features)

        print(f"Selected {len(selected_features)} features using information gain")

        return selected_features, scores


    def correlation_based_selection(self, threshold: float = 0.8) -> np.ndarray:
        """
        Perform correlation-based feature selection

        For every pair of features whose absolute Pearson correlation exceeds the threshold,
        the feature sharing less mutual information with the target is dropped. If more than
        n_features survive, the n_features with the highest mutual information are kept.

        Arguments:
        ----------
            threshold { float } : Correlation threshold for feature removal

        Returns:
        --------
               { ndarray }      :  Selected feature indices
        """
        print("Performing correlation-based selection...")

        # Convert sparse matrix to dense for correlation calculation
        X_dense     = self.X.toarray()

        # np.corrcoef collapses to a scalar when there is a single feature; keep it 2-D
        corr_matrix = np.atleast_2d(np.corrcoef(X_dense.T))

        # The matrix is symmetric, so the strict upper triangle lists each pair exactly once
        high_corr   = np.triu(np.abs(corr_matrix) > threshold, k = 1)

        to_remove   = set()
        target_mi   = dict()

        def relevance(index):
            # Cache per-feature mutual information with the target; features recur across pairs
            if (index not in target_mi):
                target_mi[index] = mutual_info_score(X_dense[:, index], self.y)

            return target_mi[index]

        for i, j in zip(*np.nonzero(high_corr)):
            if ((i in to_remove) or (j in to_remove)):
                continue

            # Remove the feature with the lower relevance to the target
            if (relevance(i) < relevance(j)):
                to_remove.add(i)

            else:
                to_remove.add(j)

        # Sorted so the returned indices are deterministic rather than in set order
        surviving         = sorted(set(range(self.X.shape[1])) - to_remove)
        selected_features = np.array(surviving, dtype = int)

        # Select top k features if more than n_features remain
        if (len(selected_features) > self.n_features):
            mi_scores         = mutual_info_classif(self.X[:, selected_features], self.y)
            selected_features = selected_features[self._rank_top_k(mi_scores, self.n_features)]

        print(f"Selected {len(selected_features)} features using correlation-based selection")

        return selected_features


    def recursive_feature_elimination(self, estimator = None, cv: int = 5) -> tuple:
        """
        Perform Recursive Feature Elimination with cross-validation

        n_features is passed as RFECV's min_features_to_select, so it is a lower bound:
        the selector may keep more features than n_features.

        Arguments:
        ----------
            estimator  : Classifier to use (default: LogisticRegression)
            cv         : Number of cross-validation folds

        Returns:
        --------
            { tuple }  : Tuple containing: - Selected feature indices
                                           - Feature importance rankings
        """
        print("Performing recursive feature elimination...")

        # Use logistic regression if no estimator provided
        if (estimator is None):
            estimator = LogisticRegression(max_iter = 1000)

        # Perform RFE with cross-validation
        selector = RFECV(estimator              = estimator,
                         min_features_to_select = self.n_features,
                         cv                     = cv,
                         n_jobs                 = -1)

        selector.fit(self.X, self.y)

        # Get selected features and rankings
        selected_features = np.where(selector.support_)[0]
        rankings          = selector.ranking_

        print(f"Selected {len(selected_features)} features using RFE")

        return selected_features, rankings


    def forward_selection(self, estimator = None, cv: int = 5) -> np.ndarray:
        """
        Perform forward feature selection

        Starting from an empty set, each round adds the feature that gives the best mean
        cross-validated accuracy, until n_features are selected. Every round refits the
        estimator once per remaining feature, so the cost grows quickly with feature count.

        Arguments:
        ----------
            estimator : Classifier to use (default: LogisticRegression)

            cv        : Number of cross-validation folds

        Returns:
        --------
            Selected feature indices, in the order they were added
        """
        print("Performing forward selection...")

        if (estimator is None):
            estimator = LogisticRegression(max_iter = 1000)

        selected_features  = list()
        remaining_features = list(range(self.X.shape[1]))

        for _ in tqdm(range(self.n_features)):
            best_score   = -np.inf
            best_feature = None

            # Try adding each remaining feature
            for feature in remaining_features:
                avg_score = self._cv_accuracy(estimator, selected_features + [feature], cv)

                if (avg_score > best_score):
                    best_score   = avg_score
                    best_feature = feature

            if (best_feature is None):
                # No candidates left, or none produced a comparable (non-NaN) score;
                # further rounds could not add anything either
                break

            selected_features.append(best_feature)
            remaining_features.remove(best_feature)

        print(f"Selected {len(selected_features)} features using forward selection")

        return np.array(selected_features)


    def backward_elimination(self, estimator = None, cv: int = 5) -> np.ndarray:
        """
        Perform backward feature elimination

        Starting from all features, each round drops the feature whose removal gives the
        best mean cross-validated accuracy, until n_features remain.

        Arguments:
        ----------
            estimator : Classifier to use (default: LogisticRegression)

            cv        : Number of cross-validation folds

        Raises:
        -------
            RuntimeError : If no candidate subset yields a comparable (non-NaN) score, since
                           the elimination could then never make progress

        Returns:
        --------
            Selected feature indices
        """
        print("Performing backward elimination...")

        if (estimator is None):
            estimator = LogisticRegression(max_iter = 1000)

        remaining_features = list(range(self.X.shape[1]))

        while (len(remaining_features) > self.n_features):
            best_score    = -np.inf
            worst_feature = None

            # Try removing each feature; the removal that scores best marks the worst feature
            for feature in remaining_features:
                kept_features = [f for f in remaining_features if f != feature]
                avg_score     = self._cv_accuracy(estimator, kept_features, cv)

                if (avg_score > best_score):
                    best_score    = avg_score
                    worst_feature = feature

            if (worst_feature is None):
                # Without this guard the while loop would repeat the same round forever
                raise RuntimeError("Backward elimination could not score any candidate subset "
                                   "(all cross-validation scores were NaN)")

            remaining_features.remove(worst_feature)

        print(f"Selected {len(remaining_features)} features using backward elimination")
        return np.array(remaining_features)
            

# Text Preprocessing

In [12]:
class TextPreprocessor:
    """
    A class for preprocessing text data through cleaning, tokenization, and normalization

    Attributes:
    -----------
        lemmatizer : WordNetLemmatizer instance for word lemmatization

        stop_words : Set of stopwords to be removed from text
    """
    # Compiled once for the class; raw strings avoid the invalid "\s" escape warning
    _HTML_TAG_PATTERN   = re.compile(r'<[^>]*>')
    _NON_LETTER_PATTERN = re.compile(r'[^a-zA-Z\s]')

    def __init__(self):
        """
        Initialize the TextPreprocessor with required NLTK resources

        Raises:
        -------
            LookupError : Propagated from NLTK if a required resource is still
                          unavailable after the download attempts
        """
        # Download required NLTK data
        for resource in ('punkt', 'stopwords', 'wordnet', 'punkt_tab'):
            nltk.download(resource, quiet = True)

        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english'))

    @classmethod
    def _strip_markup(cls, text: str) -> str:
        """
        Replace HTML tags with spaces, drop non-letter characters and lowercase the text
        """
        # Tags become a space, not an empty string: otherwise "good.<br /><br />Bad"
        # would fuse into the single token "goodbad"
        text = cls._HTML_TAG_PATTERN.sub(' ', text)

        # Remove special characters and digits
        text = cls._NON_LETTER_PATTERN.sub('', text)

        return text.lower()

    def clean_text(self, text: str) -> str:
        """
        Clean and normalize input text by removing HTML tags, special characters,
        and applying text normalization techniques

        Steps: strip HTML tags and non-letter characters, lowercase, tokenize,
        drop stopwords, lemmatize the remaining tokens, and re-join them with spaces.

        Arguments:
        ----------
            text { str } : Input text to be cleaned

        Raises:
        -------
            ValueError   : If input text is not a string or is empty

        Returns:
        --------
                { str }  : Cleaned and normalized text
        """
        if ((not isinstance(text, str)) or (not text)):
            raise ValueError("Input text must be a non-empty string")

        normalized = self._strip_markup(text)

        # Tokenization
        tokens     = word_tokenize(normalized)

        # Remove stopwords and lemmatize
        tokens     = [self.lemmatizer.lemmatize(token) for token in tokens if token not in self.stop_words]

        return ' '.join(tokens)

In [13]:
# Register tqdm's pandas integration so that Series.progress_apply is available
tqdm.pandas()

# Build the text cleaner
preprocessor = TextPreprocessor()

# Clean every review (with a progress bar) and keep the result next to the raw text
imdb_ratings_df['clean_text'] = imdb_ratings_df['review'].progress_apply(preprocessor.clean_text)

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50000/50000 [00:50<00:00, 989.05it/s]


In [14]:
# Preview the first five rows, now including the clean_text column
imdb_ratings_df.head(n = 5)

Unnamed: 0,review,sentiment,clean_text
0,One of the other reviewers has mentioned that ...,positive,one reviewer mentioned watching oz episode you...
1,A wonderful little production. <br /><br />The...,positive,wonderful little production filming technique ...
2,I thought this was a wonderful way to spend ti...,positive,thought wonderful way spend time hot summer we...
3,Basically there's a family where a little boy ...,negative,basically there family little boy jake think t...
4,"Petter Mattei's ""Love in the Time of Money"" is...",positive,petter matteis love time money visually stunni...


In [15]:
# Feature engineering over the cleaned reviews, capped at 2000 features per representation
word_level = TextFeatureEngineering(imdb_ratings_df['clean_text'].tolist(), max_features = 2000)

# Model Selection

In [18]:
# Candidate classifiers keyed by a short name; each is built with its library defaults
# apart from the SVC kernel choice.
# NOTE(review): the 'LogisticDT' key maps to a plain DecisionTreeClassifier — confirm the name.
models = dict(
    logistic         = LogisticRegression(),
    SVM_poly         = SVC(kernel = 'poly'),
    SVM_rbf          = SVC(kernel = 'rbf'),
    SVM_sig          = SVC(kernel = 'sigmoid'),
    Random_Forest    = RandomForestClassifier(),
    Multi_NaiveBayes = MultinomialNB(),
    Gauss_NaiveBayes = GaussianNB(),
    Gradient_Boost   = GradientBoostingClassifier(),
    AdaBoost         = AdaBoostClassifier(),
    LightGBM         = LGBMClassifier(),
    LogisticDT       = DecisionTreeClassifier(),
    MultiLayerPercep = MLPClassifier(),
)

In [19]:
class ModelSelector:
    """
    A class for selecting the best model for sentiment analysis task

    The supplied data is split into train / test partitions once, at construction time:
    `train_model` fits on the train partition and `evaluate_model` scores on the test partition
    """

    # Every value accepted by `train_model(model_type=...)`; also used to build its error message
    SUPPORTED_MODEL_TYPES = ("logistic_regression",
                             "svm",
                             "random_forest",
                             "naive_bayes",
                             "lightgbm",
                             "logistic_model_tree",
                            )

    def __init__(self, X, y, feature_eng, vectorizers, selected_feature_indices,test_size=0.2, random_state=42, **kwargs):
        """
        Initialize the ModelSelector by splitting the data.

        Arguments:
        ----------
            X                        : Feature matrix (sparse matrix or ndarray)
            
            y                        : Target labels (array-like)
            
            feature_eng              : Instance of TextFeatureEngineering (stored on the instance)
            
            vectorizers              : Sequence of fitted vectorizers; `test_on_unseen_data` reads positions
                                       0, 1 and 2 as the binary, TF-IDF and BM25 vectorizers respectively
            
            selected_feature_indices : Indices of selected features after feature selection
            
            test_size                : Proportion of data to use for testing (default: 0.2)
            
            random_state             : Random seed for reproducibility
            
            kwargs                   : Accepted for call-site compatibility; not used
        """
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X, 
                                                                                y, 
                                                                                test_size    = test_size, 
                                                                                random_state = random_state)
        
        self.feature_eng                                     = feature_eng
        self.vectorizers                                     = vectorizers
        self.selected_feature_indices                        = selected_feature_indices

        
    def train_model(self, model_type:str = "logistic_regression", kernel=None, **kwargs):
        """
        Train a sentiment analysis model on the training partition.

        Arguments:
        ----------
            model_type { str } : Type of model to train, one of: "logistic_regression", "svm", "random_forest",
                                 "naive_bayes", "lightgbm", "logistic_model_tree"
                                 ("logistic_model_tree" builds a DecisionTreeClassifier)
            
            kernel     { str } : Kernel type for SVM (e.g., "linear", "poly", "rbf", "sigmoid"); defaults to
                                 "rbf" and is ignored for every other model type
            
            kwargs             : Additional arguments for the model initialization

        Raises:
        -------
            ValueError         : If model_type is not one of the supported names

        Returns:
        --------
            Trained model
        """
        if (model_type == "logistic_regression"):
            # 1000 iterations is only a default: a caller-supplied max_iter takes precedence
            # instead of colliding with a hard-coded keyword
            kwargs.setdefault("max_iter", 1000)
            model = LogisticRegression(**kwargs)
            
        elif (model_type == "svm"):
            
            if (kernel is None):
                # Default kernel
                kernel = "rbf"  
                
            model = SVC(kernel = kernel, **kwargs)
            
        elif (model_type == "random_forest"):
            model = RandomForestClassifier(**kwargs)
            
        elif (model_type == "naive_bayes"):
            model = MultinomialNB(**kwargs)

        elif (model_type == "lightgbm"):
            model = LGBMClassifier(**kwargs)

        elif (model_type == "logistic_model_tree"):
            model = DecisionTreeClassifier(**kwargs)
        
        else:
            supported = ", ".join(f"'{name}'" for name in self.SUPPORTED_MODEL_TYPES)
            raise ValueError(f"Unsupported model_type '{model_type}'. Choose from: {supported}")

        print(f"Training {model_type}...")
        model.fit(self.X_train, self.y_train)
        
        return model

    def evaluate_model(self, model):
        """
        Evaluate a trained model on the test partition and print the results

        Arguments:
        ----------
            model : Trained model exposing `predict`

        Returns:
        --------
            Dictionary with the keys "accuracy", "classification_report" and "confusion_matrix"
        """
        print("Evaluating model...")
        y_pred   = model.predict(self.X_test)

        accuracy = accuracy_score(self.y_test, y_pred)
        report   = classification_report(self.y_test, y_pred)
        cm       = confusion_matrix(self.y_test, y_pred)

        print(f"Accuracy: {accuracy:.4f}")
        print("Classification Report:")
        print(report)
        print("Confusion Matrix:")
        print(cm)

        return {"accuracy"              : accuracy,
                "classification_report" : report,
                "confusion_matrix"      : cm,
               }

    
    def test_on_unseen_data(self, model, unseen_texts):
        """
        Test the model on unseen data

        The texts are vectorized, the three feature blocks are stacked column-wise, the columns chosen
        during feature selection are kept, and the model predicts on the result. Each text is printed
        together with its prediction.

        Arguments:
        ----------
            model         : Trained model exposing `predict`
            
            unseen_texts  : List of unseen text data; passed to the vectorizers as-is, no cleaning is
                            applied here

        Returns:
        --------
            Predictions for the unseen data
        """
        print("Processing unseen data...")

        # Vectorize with the already-fitted vectorizers (transform only, nothing is re-fitted)
        binary_features          = self.vectorizers[0].transform(unseen_texts)
        tfidf_features           = self.vectorizers[1].transform(unseen_texts)
        bm25_features            = self.vectorizers[2].transform(unseen_texts)

        # Combine features. CSR is requested explicitly: unless every block is already CSR, hstack
        # returns a COO matrix, which cannot be column-indexed in the next step
        unseen_combined_features = hstack([binary_features, tfidf_features, bm25_features], format = "csr")

        # Select features using the indices chosen during feature selection
        unseen_selected_features = unseen_combined_features[:, self.selected_feature_indices]

        # Predict sentiments
        predictions              = model.predict(unseen_selected_features)

        # Print predictions
        print("Predictions on Unseen Data:")
        for text, pred in zip(unseen_texts, predictions):
            print(f"Text: {text}\nPredicted Sentiment: {pred}\n")

        return predictions


In [20]:
# Build the TF-IDF representation; returns the vectorizer together with the feature matrix
vectorizer, matrix = word_level.create_tfidf()

Creating TF-IDF features...
Created 2000 TF-IDF features


In [21]:
# Inspect the vocabulary terms behind the TF-IDF columns
vectorizer.get_feature_names_out()

array(['ability', 'able', 'absolute', ..., 'youve', 'zero', 'zombie'],
      dtype=object)

In [22]:
# Feature selector over the TF-IDF matrix and the sentiment labels, configured with n_features=1000
new_selector = TextFeatureSelector(
    X=matrix,
    y=imdb_ratings_df['sentiment'].values,
    feature_names=vectorizer.get_feature_names_out().tolist(),
    n_features=1000,
)

In [23]:
# Run chi-square feature selection; `chi2_features` is used below to index the vocabulary array
chi2_features, scores = new_selector.chi_square_selection()

Performing chi-square feature selection...
Selected 1000 features using chi-square


In [24]:
# Look up the vocabulary terms of the chi-square-selected features
fnames = vectorizer.get_feature_names_out()[chi2_features]

In [25]:
# Replace the spaces inside every selected feature name with underscores,
# writing the results back into the same array
fnames[:] = [name.replace(' ', '_') for name in fnames]

In [26]:
# Display the selected feature names after the space-to-underscore rewrite
fnames

array(['worst', 'waste', 'bad', 'excellent', 'waste_time', 'one_worst',
       'great', 'worst_movie', 'awful', 'worse', 'terrible', 'nothing',
       'wonderful', 'one_best', 'poor', 'supposed', 'minute', 'best',
       'worst_film', 'boring', 'even', 'loved', 'bad_movie', 'crap',
       'stupid', 'amazing', 'money', 'perfect', 'worst_movie_ever',
       'favorite', 'horrible', 'pointless', 'highly', 'pathetic',
       'unless', 'acting', 'wasted', 'love', 'bad_acting', 'superb',
       'script', 'redeeming', 'lame', 'ridiculous', 'poorly', 'look_like',
       'really_bad', 'plot', 'performance', 'movie_bad', 'beautiful',
       'wonderfully', 'highly_recommend', 'today', 'save', 'cheap',
       'annoying', 'must_see', 'excuse', 'laughable', 'couldnt',
       'touching', 'also', 'enjoyed', 'brilliant', 'fantastic', 'avoid',
       'dull', 'badly', 'garbage', 'bother', 'life', 'powerful', 'could',
       'fails', 'terrific', 'oh', 'attempt', 'wooden', 'predictable',
       'least', 'do

In [27]:
class TextVectorizer:
    """
    Document vectorizer built on a Word2Vec (CBOW) model with phrase detection.

    A text is embedded as the mean of its token vectors. Tokens listed in
    ``feature_names`` have their vector multiplied by ``weight_factor`` before averaging;
    tokens unknown to the model contribute a zero vector.
    """

    def __init__(self, feature_names=None, weight_factor=2.0, vector_size=100, window=5, min_count=1, epochs=10):
        """
        Initialize the TextVectorizer class.

        :param feature_names: Iterable of feature names (tokens) to be weighted more.
        :param weight_factor: Weight multiplier for feature names.
        :param vector_size: Dimensionality of word vectors.
        :param window: Maximum distance between the current and predicted word in CBOW.
        :param min_count: Ignores all words with total frequency lower than this.
        :param epochs: Number of iterations (epochs) over the corpus.
        """
        # `is not None` rather than truthiness, so that a numpy array of names is accepted too
        self.feature_names = set(feature_names) if feature_names is not None else set()
        self.weight_factor = weight_factor
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.epochs = epochs
        self.model = None
        # Phrase models learned in train(); reused so that texts are tokenized the same way
        # at transform time as they were at training time
        self._bigram_phraser = None
        self._trigram_phraser = None

    @staticmethod
    def _split(text):
        """Return *text* as a list of tokens (whitespace split for strings, a copy otherwise)."""
        return text.split() if isinstance(text, str) else list(text)

    def _tokenize(self, text):
        """Tokenize *text* and, once train() has run, merge tokens into the learned phrases."""
        tokens = self._split(text)
        if self._bigram_phraser is not None and self._trigram_phraser is not None:
            tokens = list(self._trigram_phraser[self._bigram_phraser[tokens]])
        return tokens

    def train(self, corpus):
        """
        Train the CBOW model on the given corpus.

        Bigram and trigram phrase models are fitted first; the Word2Vec model is then
        trained for ``self.epochs`` epochs on the phrase-merged sentences.

        :param corpus: List of texts, each either a whitespace-separated string or a list of tokens.
        """
        sentences = [self._split(text) for text in corpus]
        bigram = Phrases(sentences, min_count=5, threshold=10)
        trigram = Phrases(bigram[sentences], threshold=10)

        self._bigram_phraser = Phraser(bigram)
        self._trigram_phraser = Phraser(trigram)

        # Transform sentences to include phrases
        processed_corpus = [self._tokenize(sentence) for sentence in sentences]

        print("Vectorizer training...")

        # Passing `sentences` makes the constructor build the vocabulary and train, so the
        # epoch count is given here. No second train() call on the raw corpus: its items may
        # be plain strings, which Word2Vec would iterate character by character.
        self.model = Word2Vec(
            sentences=processed_corpus,
            vector_size=self.vector_size,
            window=self.window,
            min_count=self.min_count,
            sg=0,  # CBOW model
            epochs=self.epochs,
        )

        print("Vectorizer Training Complete")

    def _get_weighted_vector(self, word):
        """
        Get the weighted vector for a given word.

        :param word: Word for which the vector is to be retrieved.
        :return: The word's vector, scaled by ``weight_factor`` when the word is one of the
                 feature names; a zero vector when the model does not know the word.
        """
        if word not in self.model.wv:
            return np.zeros(self.vector_size)

        vector = self.model.wv[word]
        if word in self.feature_names:
            # Multiply out-of-place: an in-place `*=` would act on the array handed out by
            # the model rather than on a private copy
            return vector * self.weight_factor
        return vector

    def text_to_vector(self, text):
        """
        Convert a text into its vector representation.

        :param text: Whitespace-separated string, or list of words (tokens) in the text.
        :return: Mean of the (weighted) token vectors; a zero vector for an empty text.
        """
        vectors = [self._get_weighted_vector(word) for word in self._tokenize(text)]
        if vectors:
            return np.mean(vectors, axis=0)  # Average the vectors
        return np.zeros(self.vector_size)

    def transform(self, texts):
        """
        Transform a list of texts into their vector representations.

        :param texts: List of texts, each either a whitespace-separated string or a list of tokens.
        :return: List of vector representations of the texts, in input order.
        """
        return [self.text_to_vector(text) for text in texts]



In [28]:
# Embedding vectorizer that gives extra weight to the chi-square-selected feature names
word_vec = TextVectorizer(
    feature_names=fnames.tolist(),
    vector_size=25,
    min_count=3,
    epochs=30,
)

In [29]:
# Fit the embedding model on the cleaned reviews
word_vec.train(imdb_ratings_df['clean_text'].tolist())

# Embed the same cleaned reviews with the fitted model
vectorized_text = word_vec.transform(imdb_ratings_df['clean_text'].tolist())

Vectorizer training...
Vectorizer Training Complete
