In [None]:
!pip install tensorflow nltk seaborn nlpaug

In [None]:
import nltk
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('punkt')

import numpy as np
import pandas as pd
import re
import string
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout, GlobalMaxPool1D, Bidirectional, SpatialDropout1D
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, LearningRateScheduler
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score, precision_score, recall_score
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.layers import Layer, Conv1D, MaxPooling1D, LayerNormalization
from tensorflow.keras.optimizers import AdamW
import tensorflow.keras.backend as K

In [None]:
from google.colab import files

# Opens the Colab upload dialog; the CSV read below is expected to be among
# the uploaded files.
uploaded = files.upload()

# The file is read without a header row, so the two columns are named
# explicitly right after loading.
df = pd.read_csv(
    "all-data.csv",
    encoding="ISO-8859-1",
    header=None,
)
df.columns = ["label", "text"]
df.head()

In [None]:
# Shared NLTK resources used by preprocess_text().
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))


def preprocess_text(text):
    """Normalise one raw sentence into a space-joined string of lemmas.

    The text is lowercased, then a fixed sequence of regex substitutions is
    applied (URL removal, punctuation replacement, collapsing runs of ``!``
    and ``?`` into single spaced tokens, digit removal). The result is split
    on whitespace, English stopwords are dropped, and every remaining token
    is lemmatized with the WordNet lemmatizer's default settings.

    Args:
        text: The input string.

    Returns:
        The cleaned tokens joined by single spaces.
    """
    # (pattern, replacement) pairs, applied strictly in this order.
    substitutions = (
        # Remove URLs
        (r'https?://\S+|www\.\S+', ''),
        # Replace most punctuation with a space; `!` and `?` are not in the
        # class. Inside the class, `,-.` is a range that covers `,`, `-`, `.`.
        (r'[#$%&\()*+,-./:<=>@\\^_`{|}~\[\]]', ' '),
        # Collapse repeated exclamation / question marks into one token each
        (r'!+', ' ! '),
        (r'\?+', ' ? '),
        # Remove numbers
        (r'\d+', ''),
    )

    cleaned = text.lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)

    # Stopword filtering happens on the raw token, before lemmatization.
    kept_tokens = (token for token in cleaned.split() if token not in stop_words)
    return ' '.join(map(lemmatizer.lemmatize, kept_tokens))

In [None]:
# Clean every sentence; astype(str) makes sure preprocess_text always
# receives a string.
df["clean_text"] = df["text"].astype(str).apply(preprocess_text)

# Keep an untouched copy of the string labels the first time this cell runs.
# Without it, re-running the cell would try to map the already-encoded
# integers, find no match in label_mapping, and relabel every row as neutral.
if "label_raw" not in df.columns:
    df["label_raw"] = df["label"]

# Check data distribution
print(df["label_raw"].value_counts())

# Encode labels
label_mapping = {"negative": 0, "neutral": 1, "positive": 2}
encoded_labels = df["label_raw"].astype(str).str.strip().map(label_mapping)

# Unknown labels still fall back to neutral (the original behaviour), but the
# fallback is now reported instead of happening silently.
n_unmapped = int(encoded_labels.isna().sum())
if n_unmapped:
    print(
        f"Warning: {n_unmapped} row(s) have a label outside "
        f"{sorted(label_mapping)}; treating them as neutral."
    )
df["label"] = encoded_labels.fillna(label_mapping["neutral"]).astype(int)

In [None]:
# Vocabulary size passed to the tokenizer and padded/truncated sequence length.
max_features = 10000
maxlen = 100

# NOTE(review): the tokenizer is fit on the whole corpus before the
# train/test split below, so test-set words contribute to the word index.
tokenizer = Tokenizer(oov_token="<OOV>", num_words=max_features)
clean_corpus = df["clean_text"]
tokenizer.fit_on_texts(clean_corpus)

# Integer-encode each sentence, then pad / truncate at the end to `maxlen`.
sequences = tokenizer.texts_to_sequences(clean_corpus)
X = pad_sequences(
    sequences,
    maxlen=maxlen,
    truncating='post',
    padding='post',
)
y = df["label"].values

In [None]:
# Stratified 80/20 split with a fixed seed so the partition is repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

In [None]:
!wget https://nlp.stanford.edu/data/glove.6B.zip
!unzip glove.6B.zip

In [None]:
# Width of the GloVe vectors read below (must match the file that is opened).
embedding_dim = 300

# word -> float32 vector, parsed from whitespace-separated lines whose first
# field is the token and whose remaining fields are the coefficients.
embedding_index = {}
with open("glove.6B.300d.txt", encoding="utf-8") as glove_file:
    for raw_line in glove_file:
        fields = raw_line.split()
        embedding_index[fields[0]] = np.asarray(fields[1:], dtype="float32")

# Row i holds the vector for the word with tokenizer index i; rows for words
# missing from GloVe (and any row never assigned) stay all-zero.
embedding_matrix = np.zeros((max_features, embedding_dim))

for token, token_id in tokenizer.word_index.items():
    # word_index covers every word seen during fitting; ids at or beyond
    # max_features have no row in the matrix.
    if token_id >= max_features:
        continue
    glove_vector = embedding_index.get(token)
    if glove_vector is not None:
        embedding_matrix[token_id] = glove_vector

In [None]:
class Attention(Layer):
    """Attention pooling that collapses axis 1 of its input.

    A score is computed for every position as ``tanh(x . W + b)``, the scores
    are softmax-normalised along axis 1, and the input is summed along axis 1
    using those weights.

    Assumes the input is laid out as (batch, steps, features), which is what
    the preceding ``Bidirectional(LSTM(..., return_sequences=True))`` layer in
    this notebook provides -- TODO confirm for any other use.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        """Create the projection vector ``W`` and the scalar bias ``b``."""
        feature_dim = input_shape[-1]
        self.W = self.add_weight(
            name="att_weight",
            shape=(feature_dim, 1),
            initializer="normal",
        )
        self.b = self.add_weight(
            name="att_bias",
            shape=(1,),
            initializer="zeros",
        )
        super().build(input_shape)

    def call(self, x):
        """Return the attention-weighted sum of ``x`` over axis 1."""
        scores = K.tanh(K.dot(x, self.W) + self.b)
        attention_weights = K.softmax(scores, axis=1)
        return K.sum(x * attention_weights, axis=1)

    def compute_output_shape(self, input_shape):
        """Output keeps the first and last input dimensions."""
        first_dim, last_dim = input_shape[0], input_shape[-1]
        return (first_dim, last_dim)

In [None]:
from nlpaug.augmenter.word import SynonymAug

def augment_data(texts, labels, augment_percentage=0.3, augmenter=None):
    """Create augmented copies of a random subset of ``texts``.

    Each text is selected independently: it is augmented when a fresh draw
    from ``np.random.random()`` is below ``augment_percentage``.

    Args:
        texts: Iterable of strings to augment.
        labels: Iterable of labels aligned with ``texts``.
        augment_percentage: Per-text selection probability.
        augmenter: Optional object exposing ``augment(text)``. When omitted,
            ``SynonymAug(aug_src='wordnet')`` is used, as before.

    Returns:
        ``(augmented_texts, augmented_labels)`` -- two lists of equal length
        holding only the newly generated samples (the originals are not
        included).
    """
    if augmenter is None:
        augmenter = SynonymAug(aug_src='wordnet')

    augmented_texts = []
    augmented_labels = []
    skipped = 0

    for text, label in zip(texts, labels):
        if np.random.random() >= augment_percentage:
            continue

        try:
            result = augmenter.augment(text)
        except Exception:
            # Best effort: one text that fails to augment must not abort the
            # whole pass. `Exception` (rather than a bare except) lets
            # KeyboardInterrupt / SystemExit propagate.
            skipped += 1
            continue

        # Accept either a list of strings or a plain string, so indexing
        # never silently returns just the first character of a string.
        if isinstance(result, (list, tuple)):
            result = result[0] if result else None
        if not result:
            skipped += 1
            continue

        augmented_texts.append(result)
        augmented_labels.append(label)

    if skipped:
        print(f"augment_data: skipped {skipped} text(s) that could not be augmented")

    return augmented_texts, augmented_labels

In [None]:
# Rebuild the training sentences from their id sequences. The padding id (0)
# is stripped first: it has no entry in the tokenizer's index, and with an
# oov_token configured sequences_to_texts renders such ids as the OOV token,
# which would turn every padded position into an "<OOV>" word in the text
# handed to the augmenter (and into OOV ids after re-tokenizing).
# All rows are converted in a single call instead of one call per row.
train_texts = tokenizer.sequences_to_texts([seq[seq != 0] for seq in X_train])
aug_texts, aug_labels = augment_data(train_texts, y_train)

# Tokenize and pad augmented data
aug_sequences = tokenizer.texts_to_sequences(aug_texts)
X_aug = pad_sequences(aug_sequences, maxlen=maxlen, padding='post', truncating='post')

# Combine with original training data. The label dtype is pinned so that an
# empty augmentation result cannot promote the label array to float.
X_train_combined = np.vstack([X_train, X_aug])
y_train_combined = np.concatenate([y_train, np.asarray(aug_labels, dtype=y_train.dtype)])

In [None]:
# 'balanced' class weights computed on the combined (original + augmented)
# training labels, keyed by position in the sorted unique-label array.
class_weights = compute_class_weight(
    'balanced',
    classes=np.unique(y_train_combined),
    y=y_train_combined,
)
class_weight_dict = {position: weight for position, weight in enumerate(class_weights)}
print("Class weights:", class_weight_dict)

In [None]:
def lr_schedule(epoch):
    """Step-decay learning rate for a given epoch index.

    Args:
        epoch: Epoch index.

    Returns:
        ``3e-4`` while ``epoch <= 8``, half of that for ``9 <= epoch <= 15``,
        and one tenth of it once ``epoch > 15``.
    """
    base_lr = 3e-4
    if epoch <= 8:
        return base_lr
    decay_factor = 0.1 if epoch > 15 else 0.5
    return base_lr * decay_factor

In [None]:
# Conv1D -> BiLSTM -> attention pooling classifier on frozen GloVe vectors.
model = Sequential([
    # Declaring the input up front builds the model immediately, so
    # model.summary() can report concrete shapes and parameter counts.
    tf.keras.Input(shape=(maxlen,), dtype="int32"),
    # The pretrained matrix is supplied through embeddings_initializer: the
    # legacy `weights=[...]` constructor kwarg is not accepted by every Keras
    # release, while a Constant initializer is.
    Embedding(
        input_dim=max_features,
        output_dim=embedding_dim,
        embeddings_initializer=tf.keras.initializers.Constant(embedding_matrix),
        trainable=False,
    ),
    SpatialDropout1D(0.2),
    Conv1D(128, kernel_size=3, padding='same', activation="relu"),
    MaxPooling1D(pool_size=2),
    # return_sequences=True keeps the per-step outputs that Attention pools.
    Bidirectional(LSTM(128, return_sequences=True)),
    Attention(),
    LayerNormalization(),
    Dropout(0.3),
    Dense(64, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001)),
    Dropout(0.2),
    # One output unit per sentiment class (negative / neutral / positive).
    Dense(3, activation='softmax')
])

In [None]:
# Integer class labels are used directly, hence the sparse cross-entropy loss.
optimizer = AdamW(weight_decay=1e-5, learning_rate=3e-4)
model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

model.summary()

In [None]:
# Stop after 5 epochs without val_loss improvement and roll back to the best
# weights seen.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)

# Cut the learning rate to 30% after 3 stagnant epochs of val_loss.
# NOTE(review): a LearningRateScheduler sets the learning rate again at the
# start of every epoch, so when both callbacks are passed to the same fit()
# call the reductions made here do not carry over to the next epoch.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.3,
    patience=3,
    verbose=1,
)

# Fixed step-decay schedule defined by lr_schedule().
lr_scheduler = LearningRateScheduler(lr_schedule)

In [None]:
# Train on the original + augmented training data with balanced class weights.
#
# Only one learning-rate controller is passed to fit(): LearningRateScheduler
# sets the learning rate at the start of every epoch, which overwrote any
# reduction ReduceLROnPlateau made at the end of the previous one. Leaving
# `reduce_lr` out keeps the schedule that was actually in effect and removes
# the misleading "reducing learning rate" log lines.
#
# NOTE(review): validation_data is the test split, so early stopping and
# best-weight restoration are selected on the same data that is evaluated
# afterwards; the reported test metrics are therefore optimistic. A separate
# validation split should be carved out before augmentation.
history = model.fit(
    X_train_combined,
    y_train_combined,
    epochs=30,
    batch_size=32,
    validation_data=(X_test, y_test),
    callbacks=[early_stopping, lr_scheduler],
    class_weight=class_weight_dict
)

In [None]:
def evaluate_model(model, X_test, y_test):
    """Print summary metrics for ``model`` on a labelled set.

    Predicted classes are the argmax over axis 1 of ``model.predict(X_test)``.
    Accuracy plus weighted-average F1, precision and recall are printed with
    four decimals, followed by the per-class classification report.

    Args:
        model: Object exposing ``predict(X)``.
        X_test: Inputs passed to ``model.predict``.
        y_test: True class ids.

    Returns:
        The array of predicted class ids.
    """
    y_pred = np.argmax(model.predict(X_test), axis=1)

    # All scores are computed before anything is printed.
    scores = {
        "Accuracy": accuracy_score(y_test, y_pred),
        "F1 Score": f1_score(y_test, y_pred, average='weighted'),
        "Precision": precision_score(y_test, y_pred, average='weighted'),
        "Recall": recall_score(y_test, y_pred, average='weighted'),
    }
    for metric_name, metric_value in scores.items():
        print(f"{metric_name}: {metric_value:.4f}")

    print("\nClassification Report:")
    report = classification_report(
        y_test,
        y_pred,
        target_names=["negative", "neutral", "positive"],
    )
    print(report)

    return y_pred

def plot_confusion_matrix(y_true, y_pred):
    """Draw the confusion matrix of ``y_pred`` against ``y_true`` as a heatmap.

    Rows are labelled "Actual" and columns "Predicted"; both axes use the
    tick labels negative / neutral / positive.
    """
    class_names = ["negative", "neutral", "positive"]
    matrix = confusion_matrix(y_true, y_pred)

    plt.figure(figsize=(8, 6))
    sns.heatmap(
        matrix,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()

In [None]:
# Score the trained model on the held-out split and show its confusion matrix.
y_pred = evaluate_model(model, X_test, y_test)
plot_confusion_matrix(y_test, y_pred)

# Plot training history: accuracy on the left, loss on the right, each with
# the training curve drawn first and the validation curve second.
history_panels = [
    (1, 'accuracy', 'Model Accuracy', 'Accuracy'),
    (2, 'loss', 'Model Loss', 'Loss'),
]

plt.figure(figsize=(12, 4))
for panel_position, metric_key, panel_title, y_axis_label in history_panels:
    plt.subplot(1, 2, panel_position)
    plt.plot(history.history[metric_key])
    plt.plot(history.history['val_' + metric_key])
    plt.title(panel_title)
    plt.ylabel(y_axis_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
plt.tight_layout()
plt.show()