In [None]:
import pandas as pd
import numpy as np
import nltk
import jieba
from gensim.models import Word2Vec
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Input, Embedding, MultiHeadAttention, Conv1D, GlobalMaxPooling1D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
from sklearn.metrics import roc_auc_score, roc_curve
from wordcloud import WordCloud
import matplotlib.pyplot as plt

In [None]:
# Make sure the NLTK stopword corpus is available locally, then keep the
# English list as a set so membership tests during preprocessing are cheap.
nltk.download('stopwords')
stop_words = {word for word in nltk.corpus.stopwords.words('english')}

# Load both CSV files from the local dataset folder.
df_train, df_test = (
    pd.read_csv(csv_path)
    for csv_path in ("./dataset/train.csv", "./dataset/test.csv")
)

In [None]:
# Basic text cleaning and tokenization with Jieba
def preprocess_text(text):
    """Tokenize one document with jieba and drop stopwords and blank tokens.

    Args:
        text: The raw document. Any value is accepted; it is converted with
            ``str()`` before tokenization.

    Returns:
        list[str]: The surviving tokens in their original order and casing.
    """
    text = str(text)
    tokens = []
    for word in jieba.cut(text):
        # jieba yields the whitespace between words as tokens of their own;
        # they carry no meaning and would otherwise fill every sequence.
        if not word.strip():
            continue
        # The NLTK English stopword list is all lowercase, so compare on the
        # lowercased form; otherwise "The" or "I" would slip through.
        if word.lower() in stop_words:
            continue
        tokens.append(word)
    return tokens

In [None]:
# For each frame: turn missing texts into empty strings, then store the
# preprocess_text output for every row in a new `tokens` column.
for frame in (df_train, df_test):
    frame['text'] = frame['text'].fillna('')
    frame['tokens'] = frame['text'].apply(preprocess_text)

In [None]:
# Embedding corpus: the token lists of both frames stacked into one Series.
all_tokens = pd.concat([df_train['tokens'], df_test['tokens']])

# Train Word2Vec from scratch on that corpus (nothing pre-trained is loaded
# here). min_count=1 keeps every token, including ones that occur only once.
model_w2v = Word2Vec(
    sentences=all_tokens,
    vector_size=100,
    window=5,
    min_count=1,
    workers=4,
)

In [None]:
# Function to convert tokens to vectors, using zero vector for unknown words
def tokens_to_vectors(tokens, model):
    """Stack the embedding of every token into a 2-D array.

    Args:
        tokens: Sequence of token strings for one document.
        model: Object exposing ``wv`` (supports ``in`` and item lookup by
            token) and ``vector_size``, e.g. a trained gensim Word2Vec model.

    Returns:
        np.ndarray: float32 array of shape ``(len(tokens), model.vector_size)``.
        Rows for tokens missing from ``model.wv`` are all zeros. An empty
        token list yields shape ``(0, model.vector_size)``, so the result is
        always 2-D and always the same dtype.
    """
    vectors = np.zeros((len(tokens), model.vector_size), dtype=np.float32)
    for row, word in enumerate(tokens):
        if word in model.wv:
            vectors[row] = model.wv[word]
    return vectors

# Embed every document: each `vectors` cell holds the per-token embedding
# array produced by tokens_to_vectors with the trained Word2Vec model.
df_train['vectors'] = df_train['tokens'].apply(tokens_to_vectors, model=model_w2v)
df_test['vectors'] = df_test['tokens'].apply(tokens_to_vectors, model=model_w2v)

# Longest document (in tokens) across both frames; used as the padded length.
max_seq_length = max(
    frame['vectors'].map(len).max() for frame in (df_train, df_test)
)

# Zero-pad at the end of each document so all samples share one length.
X_train_padded, X_test_padded = (
    pad_sequences(
        frame['vectors'].tolist(),
        maxlen=max_seq_length,
        dtype='float32',
        padding='post',
    )
    for frame in (df_train, df_test)
)

In [None]:
# Integer class id for each sentiment string. Series.map leaves NaN for any
# value that is not a key of this dict.
label_mapping = {'negative': 0, 'positive': 1, 'neutral': 2}

df_train['label'], df_test['label'] = (
    frame['sentiment'].map(label_mapping) for frame in (df_train, df_test)
)

# Plain arrays of the encoded labels for the model.
y_train, y_test = (frame['label'].values for frame in (df_train, df_test))

In [None]:
# Split dataset into training, validation, and test sets
# 60% of the padded training data stays as the training set; the remaining 40%
# is halved into validation and test sets (20% each).
# NOTE(review): this cell rebinds X_train_padded / y_train from their own
# previous values, so running it a second time splits the already-reduced
# training set again. Re-run the padding and label cells first if needed.
# NOTE(review): X_test_padded / y_test built from df_test earlier are replaced
# here by a slice of the training data; from this point on nothing evaluates
# on test.csv. Confirm that this is intended.
X_train_padded, X_temp, y_train, y_temp = train_test_split(X_train_padded, y_train, test_size=0.4, random_state=42)
X_val_padded, X_test_padded, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

In [None]:
from tensorflow.keras.layers import MultiHeadAttention
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Conv1D, GlobalMaxPooling1D, Concatenate, Dropout, BatchNormalization
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam

In [None]:
# Model: self-attention over the pre-computed Word2Vec sequences, followed by
# three parallel convolution branches (TextCNN) and a dense classifier head.
# The network consumes embedded sequences directly, so there is no Embedding
# layer; each sample is (max_seq_length, embedding_dim).
embedding_dim = model_w2v.vector_size
input_layer = Input(shape=(max_seq_length, embedding_dim))
# Self-attention: query, key and value are all the input sequence.
# NOTE(review): no attention mask is passed, so zero-padded positions take
# part in attention like real tokens.
attention_out = MultiHeadAttention(num_heads=2, key_dim=embedding_dim, value_dim=embedding_dim)(query=input_layer, key=input_layer, value=input_layer)

# TextCNN
# Three branches with kernel sizes 3, 4 and 5, each reduced to one vector by
# global max pooling over the sequence axis.
conv1 = Conv1D(filters=100, kernel_size=3, activation='relu')(attention_out)
pool1 = GlobalMaxPooling1D()(conv1)

conv2 = Conv1D(filters=100, kernel_size=4, activation='relu')(attention_out)
pool2 = GlobalMaxPooling1D()(conv2)

conv3 = Conv1D(filters=100, kernel_size=5, activation='relu')(attention_out)
pool3 = GlobalMaxPooling1D()(conv3)

# Join the three pooled feature vectors into one (3 * 100 features).
concatenated = Concatenate()([pool1, pool2, pool3])

dropout_layer = Dropout(0.5)(concatenated)

# Fully connected layer with batch normalization
dense = Dense(units=256, activation='relu', kernel_regularizer=l2(0.01))(dropout_layer)
batch_norm = BatchNormalization()(dense)

# Output layer
# Three softmax units, one per class id in label_mapping.
output_layer = Dense(units=3, activation='softmax')(batch_norm)

# Define and compile the model
# The sparse loss is used because the targets are integer class ids, not one-hot.
model = Model(inputs=input_layer, outputs=output_layer)
optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Train the model
# `history` keeps the per-epoch metrics for the training and validation sets.
history = model.fit(X_train_padded, y_train, epochs=50, batch_size=32, validation_data=(X_val_padded, y_val))

In [None]:
plt.figure(figsize=(12, 5))

# One panel per metric, drawn left to right. Each row lists: subplot position,
# training key, validation key, the two line labels, panel title, y-axis
# label and legend location.
history_panels = (
    (1, 'accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy',
     'Training and Validation Accuracy', 'Accuracy', 'lower right'),
    (2, 'loss', 'val_loss', 'Training Loss', 'Validation Loss',
     'Training and Validation Loss', 'Loss', 'upper right'),
)
for (position, train_key, val_key, train_label, val_label,
     panel_title, y_label, legend_loc) in history_panels:
    plt.subplot(1, 2, position)
    plt.plot(history.history[train_key], label=train_label, color='#6c8ebf')
    plt.plot(history.history[val_key], label=val_label, color='#82B366')
    plt.title(panel_title)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    # The explicit legend entries take precedence over the line labels.
    plt.legend(['Train', 'Validation'], loc=legend_loc)

plt.tight_layout()
plt.show()

In [None]:
from sklearn.preprocessing import LabelBinarizer
from itertools import cycle
from sklearn.metrics import roc_auc_score, roc_curve, auc

# One-hot encode the integer test labels the first time this cell runs. The
# later evaluation cells index y_test by column and take its argmax, so they
# rely on this rebinding; the dimensionality guard stops a re-run from
# encoding twice.
if y_test.ndim == 1:
    label_binarizer = LabelBinarizer()
    y_test = label_binarizer.fit_transform(y_test)

# Class id -> display name, used for legends and tick labels.
label_map = {0: 'negative', 1: 'positive', 2: 'neutral'}

# Class probabilities for the test samples and the overall ROC-AUC.
y_prob = model.predict(X_test_padded)
roc_auc_ovr = roc_auc_score(y_test, y_prob, multi_class='ovr')

print(f"ROC-AUC Score: {roc_auc_ovr:0.2f}")

n_classes = y_test.shape[1]

# Per-class ROC curve (that class against the rest) and its area.
fpr, tpr, roc_auc = {}, {}, {}
for class_idx in range(n_classes):
    class_fpr, class_tpr, _ = roc_curve(y_test[:, class_idx], y_prob[:, class_idx])
    fpr[class_idx] = class_fpr
    tpr[class_idx] = class_tpr
    roc_auc[class_idx] = auc(class_fpr, class_tpr)

plt.figure(figsize=(8, 6))

colors = cycle(['#6c8ebf', '#B3B3B3', '#82B366'])
for class_idx, color in zip(range(n_classes), colors):
    curve_label = '{0} (AUC = {1:0.2f})'.format(label_map[class_idx], roc_auc[class_idx])
    plt.plot(fpr[class_idx], tpr[class_idx], color=color, lw=2, label=curve_label)

# Diagonal reference line.
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC-AUC curve')
plt.legend(loc="best")
plt.grid(True)
plt.show()

In [None]:
# Predicted class id = index of the largest probability per sample.
y_pred = model.predict(X_test_padded).argmax(axis=1)

# y_test is one-hot at this point, so its argmax recovers the true class id.
conf_matrix = confusion_matrix(y_test.argmax(axis=1), y_pred)

# The same class names label both axes of the heatmap.
class_names = [label_map[class_idx] for class_idx in range(n_classes)]

plt.figure(figsize=(8, 6))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='d',
    cmap='Greens',
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()

In [None]:
from sklearn.preprocessing import LabelBinarizer
from itertools import cycle
from sklearn.metrics import precision_recall_curve, average_precision_score

# Per-class precision-recall curve and average precision, computed from the
# one-hot test labels and the predicted probabilities of the same column.
precision, recall, average_precision = {}, {}, {}
for class_idx in range(n_classes):
    class_truth = y_test[:, class_idx]
    class_scores = y_prob[:, class_idx]
    precision[class_idx], recall[class_idx], _ = precision_recall_curve(class_truth, class_scores)
    average_precision[class_idx] = average_precision_score(class_truth, class_scores)

plt.figure(figsize=(8, 6))

colors = cycle(['#6c8ebf', '#B3B3B3', '#82B366'])
for class_idx, color in zip(range(n_classes), colors):
    curve_label = '{0} (AP = {1:0.2f})'.format(label_map[class_idx], average_precision[class_idx])
    plt.plot(recall[class_idx], precision[class_idx], color=color, lw=2, label=curve_label)

plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc="best")
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.grid(True)
plt.show()

In [None]:
from sklearn.metrics import classification_report

y_pred = np.argmax(model.predict(X_test_padded), axis=1)

# classification_report needs class ids on both sides. The ROC cell rebinds
# y_test to a one-hot matrix, and passing that next to 1-D predictions raises
# a "mix of multilabel-indicator and multiclass targets" error, so collapse
# it back to class ids when it is 2-D.
y_test_labels = np.argmax(y_test, axis=1) if np.ndim(y_test) > 1 else y_test

# Passing `labels` pins the row order to label_map, so target_names always
# lines up even if a class is absent from this test slice.
print(classification_report(
    y_test_labels,
    y_pred,
    labels=list(label_map.keys()),
    target_names=list(label_map.values()),
))

In [None]:
from wordcloud import WordCloud
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import nltk

nltk.download('punkt')
# NOTE(review): recent NLTK releases load the Punkt model for word_tokenize
# from the 'punkt_tab' resource instead of 'punkt'. Fetching both keeps the
# cell working on either; on older releases this call only reports that the
# package is unknown.
nltk.download('punkt_tab')


def _selected_texts(sentiment):
    """Return the non-missing `selected_text` values of df_train for one sentiment."""
    texts = df_train.loc[df_train['sentiment'] == sentiment, 'selected_text']
    # Missing values would otherwise be stringified and show up as the word
    # 'nan' in the word clouds.
    return texts.dropna()


def _lowercase_words(texts):
    """Tokenize every text with NLTK and return one flat list of lowercased tokens."""
    return [word.lower() for text in texts for word in word_tokenize(str(text))]


def _without_stopwords(words, stopword_set):
    """Return `words` minus every entry contained in `stopword_set`."""
    return [word for word in words if word not in stopword_set]


posText = _selected_texts('positive')
negText = _selected_texts('negative')
neuText = _selected_texts('neutral')

posWord = _lowercase_words(posText)
negWord = _lowercase_words(negText)
neuWord = _lowercase_words(neuText)

stop_words = set(stopwords.words('english'))
positive = _without_stopwords(posWord, stop_words)
negative = _without_stopwords(negWord, stop_words)
neutral = _without_stopwords(neuWord, stop_words)

def plot_word_cloud(words, title):
    """Draw a word cloud built from `words` and show it under `title`.

    Args:
        words: Iterable of word strings; they are joined with single spaces
            and handed to WordCloud as one text.
        title: Title displayed above the figure.
    """
    text = ' '.join(words)
    cloud = WordCloud(
        width=800,
        height=400,
        background_color='white',
        colormap='summer',
    )
    cloud.generate(text)

    plt.figure(figsize=(12, 8))
    plt.imshow(cloud, interpolation='bilinear')
    plt.title(title)
    plt.axis('off')
    plt.show()


plot_word_cloud(positive, 'Positive Word Cloud')

In [None]:
# Word cloud of the stopword-filtered words taken from negative samples.
plot_word_cloud(words=negative, title='Negative Word Cloud')

In [None]:
# Word cloud of the stopword-filtered words taken from neutral samples.
plot_word_cloud(words=neutral, title='Neutral Word Cloud')

In [None]:
# `attention_out` is the attention layer's OUTPUT sequence (one embedding-sized
# vector per position), not its attention probabilities, so averaging it over
# axis 0 ranks embedding features rather than tokens. Re-apply the trained
# layer with return_attention_scores=True to get the real score tensor.
# The side model gets its own name so the trained classifier bound to `model`
# is not replaced by a two-output model.
mha_layer = next(layer for layer in model.layers if isinstance(layer, MultiHeadAttention))
_, attention_scores = mha_layer(
    query=input_layer,
    key=input_layer,
    value=input_layer,
    return_attention_scores=True,
)
attention_model = Model(inputs=input_layer, outputs=[output_layer, attention_scores])

# Only the first test sample is inspected, so only that sample is run: the
# score tensor is (batch, heads, query_pos, key_pos) and grows quadratically
# with the sequence length.
sample_input = X_test_padded[:1]
prediction, attention_weights = attention_model.predict(sample_input)
attention_sample = attention_weights[0]  # (heads, query_pos, key_pos)

# Attention each position receives, averaged over heads and query positions.
attention_avg = np.mean(attention_sample, axis=(0, 1))

# Post-padding rows are all-zero vectors; push them to the bottom of the
# ranking so only real tokens can be selected.
# NOTE(review): a real token whose embedding is exactly zero would be treated
# as padding here.
is_real_token = np.any(sample_input[0] != 0, axis=-1)
attention_avg = np.where(is_real_token, attention_avg, -np.inf)

# Positions of the five most-attended tokens (ascending by score).
top_k_indices = np.argsort(attention_avg)[-5:]
def highlight_keywords(text_tokens, top_indices):
    """Join tokens into one string, wrapping selected positions in ``**``.

    Args:
        text_tokens: Iterable of tokens for one document.
        top_indices: Container of positions to emphasise; membership is
            checked with ``in``.

    Returns:
        str: Every token followed by a single space (so a non-empty result
        ends with a space); tokens at selected positions are wrapped in
        ``**``. An empty string when there are no tokens.
    """
    pieces = [
        "**{}** ".format(token) if position in top_indices else "{} ".format(token)
        for position, token in enumerate(text_tokens)
    ]
    return "".join(pieces)

# highlight_keywords expects the token list of ONE document, while
# df_train['tokens'] is the whole column. The sample analysed above is
# X_test_padded[0], which comes from the shuffled split of the training frame.
# Replaying that split on row positions (same sizes and random_state as the
# split cell) recovers which df_train row it is.
# Keep these arguments in sync with the split cell.
all_rows = np.arange(len(df_train))
_, temp_rows = train_test_split(all_rows, test_size=0.4, random_state=42)
_, test_rows = train_test_split(temp_rows, test_size=0.5, random_state=42)
sample_tokens = df_train['tokens'].iloc[test_rows[0]]

# Guard against silent misalignment (e.g. the split cell was run twice): the
# leading rows of the padded sample must be this document's embeddings.
if len(sample_tokens):
    assert np.allclose(
        X_test_padded[0][:len(sample_tokens)],
        tokens_to_vectors(sample_tokens, model_w2v),
    ), "X_test_padded[0] does not match the recovered df_train row"

highlighted_text = highlight_keywords(sample_tokens, top_k_indices)
print(highlighted_text)

In [None]:
# Word list per sentiment. The names positive_words / negative_words /
# neutral_words are never defined in this notebook, so referencing them raises
# NameError on a fresh kernel. The stopword-filtered lists built for the word
# clouds are used instead.
# NOTE(review): confirm these are the intended lexicons.
sentiment_lexicon = {
    "positive": positive,
    "negative": negative,
    "neutral": neutral
}

def get_average_vector(words, model):
    """Average the embeddings of the words that the model knows.

    Args:
        words: Iterable of word strings.
        model: Object exposing ``wv`` (supports ``in`` and item lookup by
            word) and ``vector_size``.

    Returns:
        np.ndarray: Element-wise mean of the vectors found in ``model.wv``,
        or a zero vector of length ``model.vector_size`` when none of the
        words is known.
    """
    embeddings = model.wv
    known_vectors = []
    for word in words:
        if word in embeddings:
            known_vectors.append(embeddings[word])

    # Nothing to average: fall back to the zero vector.
    if not known_vectors:
        return np.zeros(model.vector_size)
    return np.mean(known_vectors, axis=0)

# Mean Word2Vec embedding of each sentiment's word list, keyed by sentiment.
average_vectors = dict(
    (sentiment, get_average_vector(words, model_w2v))
    for sentiment, words in sentiment_lexicon.items()
)