# In [3]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Embedding, LSTM, Conv1D, MaxPooling1D, GlobalMaxPooling1D
from tensorflow.keras.layers import Dense, Dropout, Bidirectional, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

# In [None]:
# Download necessary NLTK data.
# 'punkt_tab' is the tokenizer resource that word_tokenize() looks up on newer
# NLTK releases; 'punkt' is kept so older installations keep working.
# 'stopwords' and 'wordnet' back stopwords.words() and WordNetLemmatizer.
for resource in ('punkt', 'punkt_tab', 'stopwords', 'wordnet'):
    nltk.download(resource, quiet=True)

# Set random seed for reproducibility (NumPy and TensorFlow generators)
np.random.seed(42)
tf.random.set_seed(42)

class SentimentAnalysisModel:
    """Text classifier wrapper around Keras: cleaning, tokenizing, training, evaluation.

    The class bundles NLTK-based text cleaning, a Keras ``Tokenizer`` fitted on
    the training split, two model builders (bidirectional LSTM and 1D CNN),
    training with early stopping/checkpointing, evaluation with scikit-learn
    reports, plotting helpers, and save/load of the model plus tokenizer.
    """

    def __init__(self, max_features=10000, max_sequence_length=200, embedding_dim=100):
        """Store hyper-parameters and set up the NLTK helpers.

        Args:
            max_features: Maximum number of words in the vocabulary.
            max_sequence_length: Length every token sequence is padded/truncated to.
            embedding_dim: Dimension of the learned word embeddings.
        """
        self.max_features = max_features
        self.max_sequence_length = max_sequence_length
        self.embedding_dim = embedding_dim
        self.tokenizer = None  # set by prepare_data() or load_model()
        self.model = None      # set by build_*_model() or load_model()
        self.history = None    # set by train()
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english'))

    @staticmethod
    def _clean_text(text):
        """Lower-case ``text`` and strip HTML tags, URLs and non-letters.

        ``None`` and float NaN (as produced by pandas for missing cells) yield
        an empty string; any other non-string value is converted with ``str``.
        """
        if text is None or (isinstance(text, float) and text != text):
            return ''
        text = str(text).lower()

        # Replace tags with a space (not ''), otherwise "good<br/>movie"
        # collapses into the single bogus token "goodmovie".
        text = re.sub(r'<.*?>', ' ', text)

        # \b keeps the pattern from eating the tail of ordinary words such as
        # "awwww"; "https..." is already covered by the "http" alternative.
        text = re.sub(r'\b(?:http|www)\S+', ' ', text)

        # Drop everything that is neither a letter nor whitespace.
        return re.sub(r'[^a-zA-Z\s]', '', text)

    @staticmethod
    def _tokenizer_path(model_path):
        """Return the pickle path used for the tokenizer saved next to ``model_path``."""
        return f"{os.path.splitext(model_path)[0]}_tokenizer.pickle"

    def _require_model(self):
        """Raise ``RuntimeError`` if no model has been built or loaded yet."""
        if self.model is None:
            raise RuntimeError(
                "No model available; call build_lstm_model(), build_cnn_model() "
                "or load_model() first."
            )

    def preprocess_text(self, text):
        """Clean and preprocess text data.

        Args:
            text: Raw text (missing values are treated as empty text).

        Returns:
            A single string of space-joined, lemmatized tokens with English
            stop words removed.
        """
        tokens = word_tokenize(self._clean_text(text))
        return ' '.join(
            self.lemmatizer.lemmatize(word)
            for word in tokens
            if word not in self.stop_words
        )

    def prepare_data(self, texts, labels, test_size=0.2, val_size=0.2):
        """Prepare and tokenize data for training.

        Args:
            texts: Iterable of raw texts.
            labels: Labels aligned with ``texts`` (list or array).
            test_size: Fraction of all samples held out for testing.
            val_size: Fraction of all samples held out for validation.

        Returns:
            ``(X_train, y_train), (X_val, y_val), (X_test, y_test)`` where each
            ``X`` is a padded integer array and each ``y`` a NumPy array.

        Raises:
            ValueError: If the fractions are not in (0, 1) or leave no
                training data.
        """
        if not (0 < test_size < 1 and 0 < val_size < 1 and test_size + val_size < 1):
            raise ValueError(
                "test_size and val_size must be fractions in (0, 1) whose sum is below 1"
            )

        processed_texts = [self.preprocess_text(text) for text in texts]
        # Arrays (not lists) so Keras and evaluate() can rely on .shape.
        labels = np.asarray(labels)

        X_train_val, X_test, y_train_val, y_test = train_test_split(
            processed_texts, labels, test_size=test_size, random_state=42
        )
        # val_size is relative to the full data set, so rescale it to the
        # fraction of what is left after removing the test split.
        X_train, X_val, y_train, y_val = train_test_split(
            X_train_val, y_train_val, test_size=val_size / (1 - test_size), random_state=42
        )

        # Fit the vocabulary on the training split only.
        self.tokenizer = Tokenizer(num_words=self.max_features, oov_token="<OOV>")
        self.tokenizer.fit_on_texts(X_train)

        def vectorize(split):
            sequences = self.tokenizer.texts_to_sequences(split)
            return pad_sequences(sequences, maxlen=self.max_sequence_length, padding='post')

        return (
            (vectorize(X_train), y_train),
            (vectorize(X_val), y_val),
            (vectorize(X_test), y_test),
        )

    def _finalize(self, layers, output_dim, is_binary):
        """Append the output layer, compile the model and store it on ``self``.

        An explicit ``Input`` layer fixes the sequence length; this replaces the
        ``input_length`` argument of ``Embedding``, which newer Keras releases
        deprecate.
        """
        model = Sequential(
            [Input(shape=(self.max_sequence_length,))]
            + list(layers)
            + [Dense(output_dim, activation='sigmoid' if is_binary else 'softmax')]
        )
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='binary_crossentropy' if is_binary else 'categorical_crossentropy',
            metrics=['accuracy']
        )
        self.model = model
        return model

    def build_lstm_model(self, output_dim=1, is_binary=True):
        """Build an LSTM-based model for sentiment analysis.

        Args:
            output_dim: Number of output units (1 for binary classification).
            is_binary: If True use sigmoid + binary cross-entropy, otherwise
                softmax + categorical cross-entropy (one-hot labels).

        Returns:
            The compiled Keras model (also stored in ``self.model``).
        """
        return self._finalize(
            [
                Embedding(self.max_features, self.embedding_dim),
                Bidirectional(LSTM(128, return_sequences=True)),
                Bidirectional(LSTM(64, dropout=0.2)),
                Dense(64, activation='relu'),
                BatchNormalization(),
                Dropout(0.5),
            ],
            output_dim,
            is_binary,
        )

    def build_cnn_model(self, output_dim=1, is_binary=True):
        """Build a CNN-based model for sentiment analysis.

        Args:
            output_dim: Number of output units (1 for binary classification).
            is_binary: If True use sigmoid + binary cross-entropy, otherwise
                softmax + categorical cross-entropy (one-hot labels).

        Returns:
            The compiled Keras model (also stored in ``self.model``).
        """
        return self._finalize(
            [
                Embedding(self.max_features, self.embedding_dim),
                Conv1D(128, 5, activation='relu'),
                MaxPooling1D(5),
                Conv1D(128, 5, activation='relu'),
                MaxPooling1D(5),
                GlobalMaxPooling1D(),
                Dense(128, activation='relu'),
                BatchNormalization(),
                Dropout(0.5),
            ],
            output_dim,
            is_binary,
        )

    def train(self, train_data, val_data, epochs=10, batch_size=32,
              checkpoint_path='best_model.h5'):
        """Train the model.

        Args:
            train_data: ``(X_train, y_train)`` tuple.
            val_data: ``(X_val, y_val)`` tuple.
            epochs: Maximum number of epochs.
            batch_size: Mini-batch size.
            checkpoint_path: File the best model (lowest ``val_loss``) is
                written to. Pass distinct paths when training several models
                so they do not overwrite each other's checkpoint.

        Returns:
            The Keras ``History`` object (also stored in ``self.history``).

        Raises:
            RuntimeError: If no model has been built yet.
        """
        self._require_model()
        X_train, y_train = train_data
        X_val, y_val = val_data

        callbacks = [
            # Stop after 3 epochs without val_loss improvement and roll back
            # to the best weights seen.
            EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True),
            ModelCheckpoint(checkpoint_path, save_best_only=True, monitor='val_loss')
        ]

        self.history = self.model.fit(
            X_train, y_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, y_val),
            callbacks=callbacks,
            verbose=1
        )
        return self.history

    def evaluate(self, test_data):
        """Evaluate model on test data.

        Args:
            test_data: ``(X_test, y_test)`` tuple; ``y_test`` may be a list or
                array, with class ids or one-hot rows for multi-class models.

        Returns:
            Dict with keys ``'loss'``, ``'accuracy'``,
            ``'classification_report'`` and ``'confusion_matrix'``.

        Raises:
            RuntimeError: If no model has been built or loaded yet.
        """
        self._require_model()
        X_test, y_test = test_data
        y_test = np.asarray(y_test)  # lists have no .shape/.ndim
        loss, accuracy = self.model.evaluate(X_test, y_test, verbose=0)

        y_pred_prob = self.model.predict(X_test)

        if y_pred_prob.shape[1] == 1:
            # Binary: a single sigmoid unit thresholded at 0.5.
            y_pred = (y_pred_prob > 0.5).astype(int).flatten()
            y_true = y_test.reshape(-1)
        else:
            # Multi-class: most probable class; one-hot labels are reduced
            # to class ids the same way.
            y_pred = np.argmax(y_pred_prob, axis=1)
            y_true = np.argmax(y_test, axis=1) if y_test.ndim > 1 else y_test

        return {
            'loss': loss,
            'accuracy': accuracy,
            'classification_report': classification_report(y_true, y_pred),
            'confusion_matrix': confusion_matrix(y_true, y_pred)
        }

    def predict(self, texts):
        """Make predictions on new text data.

        Args:
            texts: A single string or an iterable of strings.

        Returns:
            The array returned by the model's ``predict`` (one row per text).

        Raises:
            RuntimeError: If the model or tokenizer is missing.
        """
        self._require_model()
        if self.tokenizer is None:
            raise RuntimeError(
                "No tokenizer available; call prepare_data() or load_model() first."
            )
        # A bare string would otherwise be iterated character by character.
        if isinstance(texts, str):
            texts = [texts]

        processed_texts = [self.preprocess_text(text) for text in texts]
        sequences = self.tokenizer.texts_to_sequences(processed_texts)
        padded_sequences = pad_sequences(
            sequences, maxlen=self.max_sequence_length, padding='post'
        )
        return self.model.predict(padded_sequences)

    def plot_training_history(self):
        """Plot training/validation accuracy and loss side by side.

        Raises:
            RuntimeError: If ``train()`` has not been run on this instance.
        """
        if self.history is None:
            raise RuntimeError("No training history available; call train() first.")

        curves = self.history.history
        plt.figure(figsize=(12, 5))
        for position, (key, name) in enumerate(
            [('accuracy', 'Accuracy'), ('loss', 'Loss')], start=1
        ):
            plt.subplot(1, 2, position)
            plt.plot(curves[key], label=f'Train {name}')
            plt.plot(curves[f'val_{key}'], label=f'Validation {name}')
            plt.title(f'Model {name}')
            plt.xlabel('Epoch')
            plt.ylabel(name)
            plt.legend()

        plt.tight_layout()
        plt.show()

    def plot_confusion_matrix(self, cm, classes=None):
        """Plot confusion matrix as an annotated heatmap.

        Args:
            cm: Integer confusion matrix (e.g. from ``evaluate``).
            classes: Optional tick labels; when omitted seaborn's automatic
                tick labels are used.
        """
        # seaborn expects "auto"/bool/list for tick labels, not None.
        tick_labels = 'auto' if classes is None else classes
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=tick_labels, yticklabels=tick_labels)
        plt.title('Confusion Matrix')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.tight_layout()
        plt.show()

    def save_model(self, model_path):
        """Save model and tokenizer.

        The tokenizer is pickled next to the model as
        ``<model_path without extension>_tokenizer.pickle``.

        Raises:
            RuntimeError: If no model has been built or loaded yet.
        """
        import pickle

        self._require_model()
        self.model.save(model_path)

        with open(self._tokenizer_path(model_path), 'wb') as handle:
            pickle.dump(self.tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)

    @classmethod
    def load_model(cls, model_path):
        """Load a saved model and tokenizer.

        Args:
            model_path: Path previously passed to ``save_model``.

        Returns:
            A new instance whose ``model`` and ``tokenizer`` are restored and
            whose ``max_sequence_length``/``max_features`` are taken from
            them where available, so ``predict`` pads inputs the way the
            model expects.
        """
        import pickle

        instance = cls()
        instance.model = tf.keras.models.load_model(model_path)

        # SECURITY: unpickling executes arbitrary code; only load tokenizer
        # files that come from a trusted source.
        with open(cls._tokenizer_path(model_path), 'rb') as handle:
            instance.tokenizer = pickle.load(handle)

        # cls() used default hyper-parameters; recover the real ones.
        input_shape = getattr(instance.model, 'input_shape', None)
        if (isinstance(input_shape, tuple) and len(input_shape) == 2
                and isinstance(input_shape[1], int)):
            instance.max_sequence_length = input_shape[1]
        num_words = getattr(instance.tokenizer, 'num_words', None)
        if num_words:
            instance.max_features = num_words

        return instance


# Example usage for binary sentiment classification (IMDb dataset)
def run_binary_sentiment_example():
    """Train, evaluate, plot and save an LSTM and a CNN model on IMDb reviews.

    The IMDb data ships as integer sequences; they are decoded back to text so
    the full preprocessing/tokenizing pipeline of ``SentimentAnalysisModel`` is
    exercised. Both models are trained on the same prepared splits and saved
    to ``imdb_lstm_model.h5`` and ``imdb_cnn_model.h5``.
    """
    from tensorflow.keras.datasets import imdb

    (X_train, y_train), (X_test, y_test) = imdb.load_data(num_words=10000)

    # Convert back to text (IMDb dataset loads as sequences already)
    word_index = imdb.get_word_index()
    reverse_word_index = {index: word for word, index in word_index.items()}

    def sequence_to_text(sequence):
        # Sequence ids are shifted by 3 relative to get_word_index() because
        # the low ids are reserved, so subtract 3 and skip ids <= 3.
        return ' '.join(reverse_word_index.get(i - 3, '?') for i in sequence if i > 3)

    X_train_texts = [sequence_to_text(seq) for seq in X_train]
    X_test_texts = [sequence_to_text(seq) for seq in X_test]

    def evaluate_plot_save(model, header, save_path):
        """Print metrics for ``model``, show its plots and save it."""
        results = model.evaluate(test_data)
        print(header)
        print(f"Loss: {results['loss']:.4f}")
        print(f"Accuracy: {results['accuracy']:.4f}")
        print("\nClassification Report:")
        print(results['classification_report'])

        model.plot_training_history()
        model.plot_confusion_matrix(results['confusion_matrix'], ['Negative', 'Positive'])
        model.save_model(save_path)

    # Create and train LSTM model
    sentiment_model = SentimentAnalysisModel(max_features=10000)
    (train_data, val_data, test_data) = sentiment_model.prepare_data(
        X_train_texts + X_test_texts[:1000],  # Using part of test set to have more training data
        np.concatenate([y_train, y_test[:1000]])
    )

    sentiment_model.build_lstm_model()
    # summary() prints the table itself and returns None, so it is not
    # wrapped in print() (that would emit a stray "None" line).
    sentiment_model.model.summary()

    sentiment_model.train(train_data, val_data, epochs=5)
    evaluate_plot_save(sentiment_model, "Evaluation Results:", 'imdb_lstm_model.h5')

    # Try CNN model for comparison
    cnn_model = SentimentAnalysisModel(max_features=10000)
    # Reuse the prepared data: the splits were encoded with this tokenizer,
    # so the CNN wrapper must carry the same one for predict()/save_model().
    cnn_model.tokenizer = sentiment_model.tokenizer
    cnn_model.build_cnn_model()
    cnn_model.model.summary()

    cnn_model.train(train_data, val_data, epochs=5)
    evaluate_plot_save(cnn_model, "\nCNN Model Evaluation Results:", 'imdb_cnn_model.h5')


# Example usage for multi-class emotion detection (assuming GoEmotions dataset)
def run_multiclass_emotion_example():
    """Train and evaluate an LSTM emotion classifier on a GoEmotions-style CSV.

    Expects ``goemotions.csv`` in the working directory with a ``text`` column
    and one 0/1 column per emotion listed in ``emotion_labels`` (assumed
    layout - adapt ``load_goemotions`` to the actual file). Prints a hint and
    returns if the file is missing.
    """
    # This is a placeholder - you would need to download the GoEmotions dataset
    # https://github.com/google-research/google-research/tree/master/goemotions

    emotion_labels = ['joy', 'sadness', 'anger', 'fear', 'surprise', 'disgust']

    # Example function to load GoEmotions dataset - adapt this to actual data format
    def load_goemotions(file_path):
        df = pd.read_csv(file_path)
        # Rows without text cannot be preprocessed.
        df = df.dropna(subset=['text'])
        texts = df['text'].astype(str).values

        # Extract emotion columns, assuming one-hot encoded format
        emotions = df[emotion_labels].values

        # The model below is single-label (softmax + argmax), so keep only
        # rows with exactly one of the selected emotions set. Rows with none
        # would otherwise be scored as class 0 by argmax.
        single_label = emotions.sum(axis=1) == 1
        return texts[single_label], emotions[single_label]

    # Only the dataset load is guarded, so an unrelated FileNotFoundError
    # later on is not misreported as a missing dataset.
    try:
        # Try to load the dataset - replace with actual path
        texts, emotions = load_goemotions('goemotions.csv')
    except FileNotFoundError:
        print("GoEmotions dataset not found. Please download it and specify the correct path.")
        print("You can find the dataset at: https://github.com/google-research/google-research/tree/master/goemotions")
        return

    # Create and train model for multi-class classification
    emotion_model = SentimentAnalysisModel(max_features=15000)
    (train_data, val_data, test_data) = emotion_model.prepare_data(texts, emotions)

    # Number of emotion classes
    num_emotions = emotions.shape[1]

    # Build LSTM model for multi-class
    emotion_model.build_lstm_model(output_dim=num_emotions, is_binary=False)
    # summary() prints itself and returns None; print() would add "None".
    emotion_model.model.summary()

    # Train model
    emotion_model.train(train_data, val_data, epochs=10)

    # Evaluate
    results = emotion_model.evaluate(test_data)
    print("Emotion Detection Results:")
    print(f"Loss: {results['loss']:.4f}")
    print(f"Accuracy: {results['accuracy']:.4f}")
    print("\nClassification Report:")
    print(results['classification_report'])

    # Plot results
    emotion_model.plot_training_history()
    emotion_model.plot_confusion_matrix(results['confusion_matrix'], emotion_labels)

    # Save model
    emotion_model.save_model('emotion_lstm_model.h5')


if __name__ == "__main__":
    # Script entry point: only the IMDb example runs by default.
    print("Running binary sentiment analysis example (IMDb)...")
    run_binary_sentiment_example()

    # The emotion example needs a locally downloaded dataset, so it stays
    # opt-in; tell the user how to enable it.
    follow_up_notes = (
        "\nTo run the multi-class emotion example, first download the GoEmotions dataset.",
        "Then uncomment the call to run_multiclass_emotion_example() in the main block.",
    )
    for note in follow_up_notes:
        print(note)
    # run_multiclass_emotion_example()