In [None]:
import torch

# 检查是否有可用的GPU
print(torch.cuda.is_available())

# 输出当前使用的GPU设备
if torch.cuda.is_available():
    print(torch.cuda.current_device())
    print(torch.cuda.get_device_name(torch.cuda.current_device()))
else:
    print("CUDA is not available.")

import pandas as pd
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, BertModel
import torch
from torch.utils.data import Dataset, DataLoader

# 加载Financial PhraseBank数据集
file_path = 'E:/Haibo_Fang23-24-Dissertation/report/FinancialPhraseBank-v1.0/Sentences_50Agree.txt'

# 加载数据并添加列名
df = pd.read_csv(file_path, delimiter='@', header=None, names=['sentence', 'sentiment'], encoding='ISO-8859-1')

# 去除空白符号
df['sentence'] = df['sentence'].str.strip()
df['sentiment'] = df['sentiment'].str.strip()

# 将情绪标签转换为数字
df['sentiment'] = df['sentiment'].map({'negative': 0, 'neutral': 1, 'positive': 2})

# 划分训练集和测试集
train_texts, test_texts, train_labels, test_labels = train_test_split(df['sentence'].tolist(), df['sentiment'].tolist(), test_size=0.2, random_state=42)

# 自定义数据集类
class FinancialPhraseBankDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 加载预训练的BERT模型和tokenizer
model_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(model_name)
bert_model = BertModel.from_pretrained(model_name).to(device)

# 创建数据集和数据加载器
train_dataset = FinancialPhraseBankDataset(train_texts, train_labels, tokenizer, max_len=512)
test_dataset = FinancialPhraseBankDataset(test_texts, test_labels, tokenizer, max_len=512)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)


In [None]:
import torch.nn as nn

class BertLSTMModel(nn.Module):
    def __init__(self, bert_model, hidden_dim, output_dim, n_layers, bidirectional, dropout):
        super(BertLSTMModel, self).__init__()
        self.bert = bert_model
        self.lstm = nn.LSTM(bert_model.config.hidden_size, 
                            hidden_dim, 
                            num_layers=n_layers, 
                            bidirectional=bidirectional, 
                            batch_first=True, 
                            dropout=dropout)
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input_ids, attention_mask):
        with torch.no_grad():
            embedded = self.bert(input_ids, attention_mask=attention_mask)[0]
        lstm_out, (hidden, cell) = self.lstm(embedded)
        if self.lstm.bidirectional:
            hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1))
        else:
            hidden = self.dropout(hidden[-1,:,:])
        output = self.fc(hidden)
        return output


In [None]:
import torch.optim as optim
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Best parameters found by Optuna
best_params = {'hidden_dim': 413, 'n_layers': 2, 'bidirectional': True, 'dropout': 0.38384259206641963, 'learning_rate': 4.042723942202167e-05}

# Define model with best parameters
model = BertLSTMModel(bert_model, best_params['hidden_dim'], output_dim=3, best_params['n_layers'], 
                      best_params['bidirectional'], best_params['dropout']).to(device)
optimizer = optim.Adam(model.parameters(), lr=best_params['learning_rate'])
criterion = nn.CrossEntropyLoss().to(device)

# Training function
def train_model(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    for batch in dataloader:
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        outputs = model(input_ids, attention_mask)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)

# Evaluation function
def evaluate_model(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    predictions, true_labels = [], []
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1).flatten()
            predictions.extend(preds.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())
    return total_loss / len(dataloader), predictions, true_labels

# Training and evaluation
n_epochs = 3
for epoch in range(n_epochs):
    train_loss = train_model(model, train_loader, optimizer, criterion, device)
    valid_loss, predictions, true_labels = evaluate_model(model, test_loader, criterion, device)
    accuracy = accuracy_score(true_labels, predictions)
    print(f'Epoch {epoch+1}/{n_epochs}')
    print(f'Train Loss: {train_loss:.4f}, Valid Loss: {valid_loss:.4f}, Accuracy: {accuracy:.4f}')

# Print classification report
print(classification_report(true_labels, predictions, target_names=['negative', 'neutral', 'positive']))


In [None]:
import matplotlib.pyplot as plt

# Example loss data
train_losses = [0.7674, 0.5498, 0.4802]  # Replace with your actual training loss data
valid_losses = [0.5746, 0.5328, 0.5025]  # Replace with your actual validation loss data

# Plot loss curves
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Train Loss')
plt.plot(valid_losses, label='Valid Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Loss')
plt.show()
