In [None]:
import os
import glob
import spacy
import random
import warnings
from matplotlib import pyplot as plt
from collections import Counter
import pandas as pd
import seaborn as sns

# seed Python's `random` module so anything drawing from it is repeatable
random.seed(123)
# silence deprecation warnings and future warnings
for warning_category in (DeprecationWarning, FutureWarning):
    warnings.filterwarnings("ignore", category=warning_category)
# load the spaCy pipeline used by the preprocessing helpers below
nlp = spacy.load('en_core_web_lg')

# function to load data
def load_data(base_dir):
    """Read every ``*.txt`` file under ``base_dir/positive`` and ``base_dir/negative``.

    Files are visited in sorted path order within each label so the returned
    lists are identical from run to run (``glob.glob`` gives no ordering
    guarantee of its own).

    Args:
        base_dir: Directory containing the ``positive`` and ``negative``
            sub-folders.

    Returns:
        A ``(data, labels, files)`` tuple of equal-length lists: the file
        contents, the label (1 for ``positive``, 0 for ``negative``) and the
        path each text was read from. All ``positive`` entries come first.
    """
    data = []
    labels = []
    files = []
    for label in ['positive', 'negative']:
        pattern = os.path.join(base_dir, label, '*.txt')
        # glob returns paths in arbitrary, filesystem-dependent order;
        # sorting makes the sample order reproducible
        for filepath in sorted(glob.glob(pattern)):
            with open(filepath, 'r', encoding='utf-8') as file:
                data.append(file.read())
            labels.append(1 if label == 'positive' else 0)
            files.append(filepath)

    return data, labels, files

# delete the contents after "What I've decided and why"
def clean_data(data):
    """Return each text truncated just before the decision heading.

    Everything from the first occurrence of "What I've decided and why"
    onwards is dropped; a text without the heading is returned unchanged.
    """
    marker = "What I've decided and why"
    return [text.partition(marker)[0] for text in data]


## TF-IDF vectorizer and Logistic Regression

In [None]:
# preprocess the texts
def preprocess_texts(texts):
    """Run every text through the module-level spaCy pipeline ``nlp``.

    Uses ``nlp.pipe``, spaCy's batched interface for processing many texts,
    instead of calling ``nlp(text)`` once per document.

    Args:
        texts: Iterable of raw strings.

    Returns:
        A list with one processed document per input text, in input order.
    """
    return list(nlp.pipe(texts))

# function to remove stopwords and punctuation
def remove_stopwords_punctuation(docs):
    """Filter uninformative tokens out of every document.

    A token survives only if it is neither a stop word nor punctuation, its
    text is not a bare newline or one of the titles Mr/Mrs/Miss/Ms, and its
    text is longer than one character.

    Returns:
        A list holding, for each input document, the list of surviving tokens.
    """
    excluded_texts = ('\n', 'Mr', 'Mrs', 'Miss', 'Ms')

    def keep(token):
        # same three filters as before, checked in the same order
        if token.is_stop or token.is_punct:
            return False
        if token.text in excluded_texts:
            return False
        return len(token.text) > 1

    return [[token for token in doc if keep(token)] for doc in docs]

#  lowercase and lemmatise the tokens
def lowercase_and_lemmatise(docs):
    """Replace every token by its lower-cased lemma (``token.lemma_``).

    Returns:
        One list of lemma strings per input document, order preserved.
    """
    return [[token.lemma_.lower() for token in doc] for doc in docs]

# join the tokens back together
def join_tokens(docs):
    """Turn each list of token strings into one space-separated string."""
    joined = []
    for tokens in docs:
        joined.append(' '.join(tokens))
    return joined

# read the raw training and test texts together with their labels and paths
train_data, train_labels, train_files = load_data('data/train')
test_data, test_labels, test_files = load_data('data/test')

# run both splits through the same stages, in this order:
#   1. cut everything after "What I've decided and why"
#   2. parse with spaCy
#   3. drop stopwords, punctuation and other noise tokens
#   4. lowercase and lemmatise
#   5. join the tokens back into one string per document
preprocessing_stages = (
    clean_data,
    preprocess_texts,
    remove_stopwords_punctuation,
    lowercase_and_lemmatise,
    join_tokens,
)
for stage in preprocessing_stages:
    train_data = stage(train_data)
    test_data = stage(test_data)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import make_pipeline

# TF-IDF features feeding a logistic-regression classifier; the grid keys
# below address the steps by the names make_pipeline gives them
tfidf_step = TfidfVectorizer()
classifier_step = LogisticRegression(max_iter=1000)
pipeline = make_pipeline(tfidf_step, classifier_step)

# hyper-parameter values to try, written as <step name>__<parameter>
param_grid = dict(
    tfidfvectorizer__ngram_range=[(1, 1), (1, 2)],
    tfidfvectorizer__max_df=[0.9, 0.95],
    tfidfvectorizer__min_df=[2, 5],
    logisticregression__C=[0.1, 1, 10],
)

# exhaustive search over the grid with 5-fold cross-validation
grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    cv=5,
    n_jobs=-1,
    verbose=1,
)
grid_search.fit(train_data, train_labels)

# best estimator found by the search
best_model = grid_search.best_estimator_

# show the winning parameter combination
print(grid_search.best_params_)


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# function to plot the most important unigrams and bigrams
def plot_top_coefficients(model, train_data, top_n=20):
    """Bar-plot the features with the most negative and most positive coefficients.

    Args:
        model: Fitted pipeline exposing ``named_steps['tfidfvectorizer']`` and
            ``named_steps['logisticregression']``.
        train_data: Unused; kept so existing calls keep working. The
            vectorizer is deliberately NOT re-fitted here: re-fitting mutates
            the trained pipeline in place and, on any data other than the
            original training set, misaligns the vocabulary with the
            coefficients that were already learned.
        top_n: Number of features to draw on each side. Clamped to half the
            number of coefficients so no feature is drawn twice.
    """
    vectorizer = model.named_steps['tfidfvectorizer']
    log_reg = model.named_steps['logisticregression']

    # coefficients and feature names as learned during training
    coefficients = log_reg.coef_.flatten()
    feature_names = np.asarray(vectorizer.get_feature_names_out())

    n_features = coefficients.size
    top_n = max(0, min(top_n, n_features // 2))

    # sort once; lowest coefficients first, highest last
    order = np.argsort(coefficients)
    top_negative_coefficients = order[:top_n]
    # explicit start index: order[-0:] would select everything when top_n == 0
    top_positive_coefficients = order[n_features - top_n:]
    top_coefficients = np.hstack([top_negative_coefficients, top_positive_coefficients])

    # plot the most important unigrams and bigrams
    positions = np.arange(len(top_coefficients))
    colors = ['lightcoral' if c < 0 else 'lightblue' for c in coefficients[top_coefficients]]
    plt.figure(figsize=(15, 5))
    plt.bar(positions, coefficients[top_coefficients], color=colors)
    plt.xticks(positions, feature_names[top_coefficients], rotation=60, ha='right')
    # top_n features are drawn per side, so the title reports top_n (not top_n // 2)
    plt.title(f'top {top_n} positive and negative unigrams and bigrams')
    plt.show()

# show the 20 strongest features per class for the tuned pipeline
plot_top_coefficients(model=best_model, train_data=train_data, top_n=20)


In [25]:
from sklearn.metrics import accuracy_score, precision_score
from sklearn.metrics import recall_score, f1_score
from sklearn.metrics import confusion_matrix

# Evaluate model and return metrics
def evaluate_model(model, test_data, test_labels, data_type='test'):
    """Score ``model`` on a labelled data set, print a report and return it.

    Args:
        model: Object with a ``predict`` method accepting ``test_data``.
        test_data: Inputs to predict on.
        test_labels: True labels for ``test_data``.
        data_type: Name of the split, used only in the printed header.

    Returns:
        Dict with keys ``accuracy``, ``precision``, ``recall``, ``f1_score``
        and ``confusion_matrix``.
    """
    predicted = model.predict(test_data)

    results = {
        'accuracy': accuracy_score(test_labels, predicted),
        'precision': precision_score(test_labels, predicted),
        'recall': recall_score(test_labels, predicted),
        'f1_score': f1_score(test_labels, predicted),
        'confusion_matrix': confusion_matrix(test_labels, predicted),
    }

    report_lines = [
        f'{data_type} data metrics:',
        f"Accuracy: {results['accuracy']:.4f}",
        f"Precision: {results['precision']:.4f}",
        f"Recall: {results['recall']:.4f}",
        f"F1 Score: {results['f1_score']:.4f}",
        f"Confusion Matrix: \n{results['confusion_matrix']}",
    ]
    for line in report_lines:
        print(line)

    return results

# score the tuned pipeline on the training split, then on the test split
train_metrics = evaluate_model(
    model=best_model, test_data=train_data, test_labels=train_labels, data_type='Train'
)
print('-' * 50)
test_metrics = evaluate_model(
    model=best_model, test_data=test_data, test_labels=test_labels, data_type='Test'
)

Train data metrics:
Accuracy: 0.9011
Precision: 0.8937
Recall: 0.9091
F1 Score: 0.9013
Confusion Matrix: 
[[368  44]
 [ 37 370]]
--------------------------------------------------
Test data metrics:
Accuracy: 0.7561
Precision: 0.7407
Recall: 0.7843
F1 Score: 0.7619
Confusion Matrix: 
[[75 28]
 [22 80]]


## BERT Model (Sentence-BERT embeddings + LSTM classifier)

In [None]:
# load_data and clean_data are the helpers defined in the first cell.
# The raw texts are read again because train_data / test_data were replaced
# by their preprocessed versions in the TF-IDF section.
train_data, train_labels, train_files = load_data('data/train')
test_data, test_labels, test_files = load_data('data/test')

# keep only the part before "What I've decided and why"
train_data, test_data = clean_data(train_data), clean_data(test_data)

from sentence_transformers import SentenceTransformer

# load the pretrained Sentence-BERT model
sbert_model = SentenceTransformer('all-MiniLM-L6-v2')

# embed the training and test texts as tensors, one embedding per document
encode_options = dict(convert_to_tensor=True, show_progress_bar=True)
train_embeddings = sbert_model.encode(train_data, **encode_options)
test_embeddings = sbert_model.encode(test_data, **encode_options)


In [34]:
import os
import glob
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sentence_transformers import SentenceTransformer


class TextDataset(Dataset):
    """Dataset pairing each embedding with the label at the same index.

    Args:
        embeddings: Indexable collection of embeddings.
        labels: Indexable collection of labels, one per embedding.

    Raises:
        ValueError: If ``embeddings`` and ``labels`` differ in length. Without
            this check a mismatch would either go unnoticed (extra labels are
            never read) or only fail with an IndexError part-way through an
            epoch.
    """

    def __init__(self, embeddings, labels):
        if len(embeddings) != len(labels):
            raise ValueError(
                f'embeddings and labels must have the same length, '
                f'got {len(embeddings)} and {len(labels)}'
            )
        self.embeddings = embeddings
        self.labels = labels

    def __len__(self):
        """Number of (embedding, label) pairs."""
        return len(self.embeddings)

    def __getitem__(self, idx):
        """Return the ``(embedding, label)`` pair stored at ``idx``."""
        return self.embeddings[idx], self.labels[idx]

# wrap the embeddings and labels of each split in a dataset
train_dataset = TextDataset(embeddings=train_embeddings, labels=train_labels)
test_dataset = TextDataset(embeddings=test_embeddings, labels=test_labels)

# batch both splits; only the training batches are shuffled
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

class LSTMClassifier(nn.Module):
    """LSTM followed by dropout and a linear layer that outputs class logits.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of output logits.
        num_layers: Number of stacked LSTM layers.
        dropout_prob: Dropout probability, used between stacked LSTM layers
            and on the last time step's output before the linear layer.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, dropout_prob):
        super().__init__()
        # nn.LSTM applies dropout only between stacked layers and warns when a
        # non-zero value is given for a single layer, so pass 0 in that case
        lstm_dropout = dropout_prob if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=lstm_dropout)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x):
        """Map a batch-first input ``(batch, seq_len, input_dim)`` to logits.

        Returns:
            Tensor of shape ``(batch, output_dim)`` computed from the LSTM
            output at the last time step.
        """
        # no explicit (h0, c0): nn.LSTM starts from zero states by default and
        # creates them to match the input, unlike hand-built float32 zeros
        out, _ = self.lstm(x)
        out = self.dropout(out[:, -1, :])
        out = self.fc(out)
        return out

# model hyper-parameters
input_dim = train_embeddings.shape[1]
hidden_dim = 128
output_dim = 2  # two output logits (binary classification assumed)
num_layers = 2
dropout_prob = 0.5  # dropout probability

# Seed PyTorch so weight initialisation, dropout and batch shuffling are
# repeatable; the notebook otherwise only seeds Python's `random` module,
# which PyTorch does not draw from. Same value as the `random` seed.
torch.manual_seed(123)

# instantiate the model and move it to the GPU when one is available
model = LSTMClassifier(input_dim, hidden_dim, output_dim, num_layers, dropout_prob)
model = model.to('cuda' if torch.cuda.is_available() else 'cpu')

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-6)

num_epochs = 1000

# Batches must live on the same device as the model. The labels come from a
# plain Python list and are collated on the CPU, so without the explicit move
# below the loss computation fails whenever the model sits on a GPU.
device = next(model.parameters()).device

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for embeddings, labels in train_loader:
        embeddings = embeddings.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        # each embedding is fed to the LSTM as a sequence of length 1
        outputs = model(embeddings.unsqueeze(1))
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    # report the mean batch loss every 100 epochs
    if (epoch+1) % 100 == 0:
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader)}')

# switch to inference mode (disables dropout) and measure test accuracy
model.eval()
# keep batches on the model's device; labels are collated on the CPU and
# would otherwise be compared against predictions living on the GPU
device = next(model.parameters()).device
correct = 0
total = 0
with torch.no_grad():
    for embeddings, labels in test_loader:
        embeddings = embeddings.to(device)
        labels = labels.to(device)
        outputs = model(embeddings.unsqueeze(1))
        # index of the largest logit is the predicted class; the legacy
        # `.data` accessor is unnecessary inside torch.no_grad()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Test Accuracy: {100 * correct / total:.2f}%')
