# Data Preprocessing

In [14]:
import nltk

nltk.data.clear_cache()  # Clear the NLTK data cache
nltk.download('punkt')    # Download the punkt tokenizer package (the log below reports it was already up to date)
# Download the punkt_tab package as well; as the last expression, its return value is echoed as the cell output
nltk.download('punkt_tab')



[nltk_data] Downloading package punkt to C:\Users\Liu
[nltk_data]     Yifan\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to C:\Users\Liu
[nltk_data]     Yifan\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping tokenizers\punkt_tab.zip.


True

In [15]:
from nltk.tokenize import word_tokenize
# Sanity check: tokenize one sample sentence with word_tokenize and print the resulting tokens
text = "This is a test sentence."
tokens = word_tokenize(text)
print(tokens)

['This', 'is', 'a', 'test', 'sentence', '.']


In [17]:
from transformers import BertTokenizer
import pandas as pd




# Load the tweets, keep only the text and sentiment columns, and remap the
# sentiment value 4 to 1 (the positive class).
df = (
    pd.read_csv("training.300000.processed.noemoticon.csv", encoding='latin-1')[['text', 'sentiment']]
    .assign(sentiment=lambda frame: frame['sentiment'].replace(4, 1))
)

# Tokenizer of the pretrained 'bert-base-uncased' checkpoint
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Preprocessing helper: turns raw text into the tokenizer's model-input format
def preprocess_function(examples):
    """Tokenize ``examples['text']`` with the module-level ``tokenizer``.

    The text is truncated and padded to exactly 128 tokens
    (``padding='max_length'``, ``max_length=128``).

    Args:
        examples: Mapping with a ``'text'`` entry holding the text to encode.

    Returns:
        Whatever the tokenizer call returns for that text.
    """
    raw_text = examples['text']
    encoded = tokenizer(raw_text, truncation=True, padding='max_length', max_length=128)
    return encoded

# Tokenize every tweet through preprocess_function and keep only the token ids
df['input_ids'] = df['text'].apply(
    lambda tweet: preprocess_function({'text': tweet})['input_ids']
)

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

# Training
## Transformer

In [6]:
# Step 1: 导入库
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, BertConfig, BertModel
from torch.optim import AdamW

# Step 2: load the dataset, keeping only the tweet text and its sentiment label;
# label 4 is remapped to 1 so that 1 means positive and 0 means negative
file_path = 'training.300000.processed.noemoticon.csv'
df = (
    pd.read_csv(file_path, encoding='latin-1')[['text', 'sentiment']]
    .assign(sentiment=lambda frame: frame['sentiment'].replace(4, 1))
)

# Step 3: hold out 20% of the rows as the validation split (fixed seed)
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df['text'],
    df['sentiment'],
    test_size=0.2,
    random_state=42,
)

# Step 4: tokenize both splits with the BertTokenizer (truncate at 128 tokens,
# pad each split to its longest sequence, also return the attention mask)
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
train_encodings, val_encodings = (
    tokenizer(list(split_texts), truncation=True, padding=True, max_length=128, return_attention_mask=True)
    for split_texts in (train_texts, val_texts)
)

# Step 5: PyTorch dataset that pairs tokenizer encodings with their labels
class TweetDataset(Dataset):
    """Map-style dataset over tokenizer encodings and their labels.

    Args:
        encodings: Mapping from field name (e.g. ``'input_ids'``) to a
            sequence holding one entry per example.
        labels: Iterable with one label per example, in the same order as
            the encodings. A list, NumPy array or pandas Series is accepted.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        # Materialize the labels positionally. Indexing a pandas Series with an
        # integer is label-based, so a Series coming out of a shuffled split
        # would otherwise return the wrong label or raise KeyError.
        self.labels = list(labels)

    def __len__(self):
        """Return the number of examples (one per label)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``idx``-th example as a dict of tensors.

        The dict has one tensor per encoding field plus a ``'labels'`` tensor.
        """
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

# Wrap each split's encodings and labels in a TweetDataset
train_dataset, val_dataset = (
    TweetDataset(split_encodings, split_labels.tolist())
    for split_encodings, split_labels in (
        (train_encodings, train_labels),
        (val_encodings, val_labels),
    )
)

# Step 6: batch both datasets; only the training batches are shuffled
val_loader = DataLoader(val_dataset, batch_size=32)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Step 7: Transformer classifier built on a BertModel created from a config only
class CustomBERTModel(nn.Module):
    """BERT encoder followed by dropout and a two-class linear head.

    The encoder is created with ``BertModel(config)``, i.e. from the
    configuration alone rather than from a pretrained checkpoint.

    Args:
        config: Configuration for ``BertModel``; its ``hidden_size`` sets the
            input width of the classification layer.
    """

    def __init__(self, config):
        super().__init__()
        self.bert = BertModel(config)
        self.dropout = nn.Dropout(0.3)  # regularization before the classifier
        self.classifier = nn.Linear(config.hidden_size, 2)  # binary classification

    def forward(self, input_ids, attention_mask=None):
        """Return the classifier logits for a batch of token ids."""
        encoder_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Element 1 of the encoder output is used as the sequence-level
        # (pooled) representation that feeds the classification head.
        pooled = encoder_outputs[1]
        return self.classifier(self.dropout(pooled))

# Custom encoder configuration
config = BertConfig(
    vocab_size=30522,       # vocabulary size
    hidden_size=512,        # hidden size
    num_hidden_layers=6,    # number of Transformer layers
    num_attention_heads=8,  # number of attention heads
    intermediate_size=1024, # feed-forward intermediate size
    max_position_embeddings=512  # maximum sequence length
)

# Instantiate the custom Transformer model and place it on the GPU when one is
# available, otherwise on the CPU (a hard-coded "cuda" raises on CPU-only hosts).
# The move happens before the optimizer is built, as torch.optim recommends.
model = CustomBERTModel(config)
model.to("cuda" if torch.cuda.is_available() else "cpu")

# Step 8: loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = AdamW(model.parameters(), lr=2e-5)

# Step 9: train the model and validate at the end of every epoch.
# Batches follow the model: they are sent to the device its parameters live on
# instead of a hard-coded "cuda", so the loop works wherever the model is placed.
device = next(model.parameters()).device

for epoch in range(3):  # train for 3 epochs
    model.train()  # training mode
    total_train_loss = 0

    # Training phase
    for batch in train_loader:
        optimizer.zero_grad()  # reset gradients from the previous step
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        # Forward pass
        outputs = model(input_ids, attention_mask=attention_mask)
        loss = criterion(outputs, labels)
        total_train_loss += loss.item()

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

    print(f'Epoch {epoch + 1}, Training Loss: {total_train_loss / len(train_loader)}')

    # Validation phase
    model.eval()  # evaluation mode
    total_val_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():  # no gradients are needed for evaluation
        for batch in val_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            # Forward pass
            outputs = model(input_ids, attention_mask=attention_mask)
            loss = criterion(outputs, labels)
            total_val_loss += loss.item()

            # Accuracy bookkeeping: the predicted class is the arg-max logit
            _, predicted = torch.max(outputs, dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_accuracy = correct / total
    print(f'Epoch {epoch + 1}, Validation Loss: {total_val_loss / len(val_loader)}, Validation Accuracy: {val_accuracy * 100:.2f}%')

# # Step 10: 进行推理
# def predict_sentiment(text):
#     inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
#     outputs = model(inputs['input_ids'])
#     _, prediction = torch.max(outputs, dim=1)
#     return 'Positive' if prediction.item() == 1 else 'Negative'

# # 测试推理
# print(predict_sentiment("I love this product!"))


KeyboardInterrupt: 

# Hyperparameter tuning (grid search with cross-validation)


In [1]:
import torch
import numpy as np
from torch import nn
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import GridSearchCV, KFold, train_test_split
from sklearn.metrics import make_scorer, accuracy_score
from transformers import BertTokenizer, BertConfig, BertModel
from torch.optim import AdamW
import pandas as pd

# Step 1: custom dataset pairing tokenizer encodings with labels
class TweetDataset(Dataset):
    """Map-style dataset that serves one tokenized tweet and its label per index.

    Args:
        encodings: Mapping from field name (e.g. ``'input_ids'``) to a
            sequence with one entry per example.
        labels: Iterable with one label per example, aligned with the
            encodings. Lists, NumPy arrays and pandas Series all work.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        # Store the labels as a plain list so that ``self.labels[idx]`` is
        # always positional. An integer lookup on a pandas Series is
        # label-based and breaks once the index has been shuffled by a split.
        self.labels = list(labels)

    def __len__(self):
        """Return how many examples the dataset holds."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Build the ``idx``-th example: one tensor per encoding field plus ``'labels'``."""
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

# Step 2: load the dataset and preprocess it — keep only the text and the
# sentiment label, remapping label 4 to 1 (positive)
file_path = 'training.300000.processed.noemoticon.csv'
df = (
    pd.read_csv(file_path, encoding='latin-1')[['text', 'sentiment']]
    .assign(sentiment=lambda frame: frame['sentiment'].replace(4, 1))
)

# Hold out 20% of the rows as the validation split (fixed seed)
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df['text'],
    df['sentiment'],
    test_size=0.2,
    random_state=42,
)

# Step 3: instantiate the BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Step 4: custom PyTorch model with a BERT backbone
class CustomBERTModel(nn.Module):
    """Two-class classifier: BertModel backbone, dropout, then a linear layer.

    Args:
        config: Configuration passed to ``BertModel``; ``config.hidden_size``
            is the input width of the linear classifier.
    """

    def __init__(self, config):
        super().__init__()
        self.bert = BertModel(config)
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(config.hidden_size, 2)  # binary classification

    def forward(self, input_ids, attention_mask=None):
        """Compute the class logits for a batch of token ids."""
        backbone_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # The second element of the backbone output is the pooled,
        # sequence-level vector used for classification.
        sentence_vector = self.dropout(backbone_out[1])
        return self.classifier(sentence_vector)

# Step 5: training/inference wrapper around CustomBERTModel
class CustomTorchModel:
    """Owns a ``CustomBERTModel`` together with its optimizer and loss.

    Args:
        config: Configuration forwarded to ``CustomBERTModel``.
        learning_rate: Learning rate for the AdamW optimizer.
        batch_size: Batch size used for both training and prediction.
        device: Device the model and every batch are moved to.
        num_epochs: Number of passes over the training data in ``fit``.
            Defaults to 3.
    """

    def __init__(self, config, learning_rate, batch_size, device, num_epochs=3):
        self.config = config
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.device = device
        self.num_epochs = num_epochs
        self.model = CustomBERTModel(config).to(self.device)
        self.optimizer = AdamW(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.CrossEntropyLoss()

    def _make_loader(self, texts, labels, shuffle):
        """Tokenize ``texts`` with the global ``tokenizer`` and wrap them in a DataLoader."""
        encodings = tokenizer(list(texts), truncation=True, padding=True, max_length=128, return_attention_mask=True)
        dataset = TweetDataset(encodings, labels)
        return DataLoader(dataset, batch_size=self.batch_size, shuffle=shuffle)

    def fit(self, X_train, y_train):
        """Train the model for ``num_epochs`` epochs.

        Args:
            X_train: Iterable of raw text strings.
            y_train: Class labels aligned with ``X_train``. Objects exposing
                ``tolist()`` (pandas Series, NumPy arrays) and plain
                sequences are both accepted.

        Returns:
            ``self``.
        """
        labels = y_train.tolist() if hasattr(y_train, 'tolist') else list(y_train)
        train_loader = self._make_loader(X_train, labels, shuffle=True)

        self.model.train()
        for _ in range(self.num_epochs):
            for batch in train_loader:
                self.optimizer.zero_grad()
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                labels_batch = batch['labels'].to(self.device)

                outputs = self.model(input_ids, attention_mask=attention_mask)
                loss = self.criterion(outputs, labels_batch)

                loss.backward()
                self.optimizer.step()

        return self

    def predict(self, X):
        """Predict a class index for every text in ``X``.

        Args:
            X: Iterable of raw text strings.

        Returns:
            NumPy array with one predicted class index per input text.
        """
        texts = list(X)
        # TweetDataset needs one label per example; these zeros are
        # placeholders and are never read during prediction.
        val_loader = self._make_loader(texts, [0] * len(texts), shuffle=False)

        self.model.eval()
        predictions = []
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)

                outputs = self.model(input_ids, attention_mask=attention_mask)
                _, preds = torch.max(outputs, dim=1)
                predictions.extend(preds.cpu().numpy())

        return np.array(predictions)

# Step 6: scikit-learn style wrapper so the model can be used with GridSearchCV
class SklearnWrappedModel:
    """Estimator-style adapter exposing get_params/set_params/fit/predict.

    Args:
        num_hidden_layers: Number of Transformer layers in the encoder config.
        num_attention_heads: Number of attention heads per layer.
        learning_rate: Learning rate handed to the training wrapper.
        batch_size: Batch size handed to the training wrapper.
    """

    def __init__(self, num_hidden_layers=6, num_attention_heads=8, learning_rate=2e-5, batch_size=32):
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        # Use the GPU when one is available, otherwise fall back to the CPU
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.config = BertConfig(
            vocab_size=30522,
            hidden_size=512,
            num_hidden_layers=num_hidden_layers,
            num_attention_heads=num_attention_heads,
            intermediate_size=1024,
            max_position_embeddings=512
        )

    def get_params(self, deep=True):
        """Return the tunable hyperparameters as a dict (``deep`` is accepted but unused)."""
        return {
            'num_hidden_layers': self.num_hidden_layers,
            'num_attention_heads': self.num_attention_heads,
            'learning_rate': self.learning_rate,
            'batch_size': self.batch_size
        }

    def set_params(self, **params):
        """Update hyperparameters and keep ``self.config`` in sync.

        Raises:
            ValueError: If a key is not one of the names returned by
                ``get_params``. Without this check a misspelled grid key
                would be stored as an unused attribute and the search would
                silently run with the default value.

        Returns:
            ``self``.
        """
        unknown = sorted(set(params) - set(self.get_params(deep=False)))
        if unknown:
            raise ValueError(
                f"Invalid parameter(s) {unknown} for {type(self).__name__}; "
                f"valid parameters are {sorted(self.get_params(deep=False))}."
            )
        for key, value in params.items():
            setattr(self, key, value)
        # The config was built in __init__, so mirror the architecture settings into it
        self.config.num_hidden_layers = self.num_hidden_layers
        self.config.num_attention_heads = self.num_attention_heads
        return self

    def fit(self, X, y):
        """Build a fresh ``CustomTorchModel`` from the current settings and train it.

        Returns:
            ``self``, following the scikit-learn estimator convention.
        """
        self.model = CustomTorchModel(self.config, self.learning_rate, self.batch_size, self.device)
        self.model.fit(X, y)
        return self

    def predict(self, X):
        """Return the trained model's predictions for ``X``."""
        return self.model.predict(X)

# Step 7: hyperparameter space explored by the grid search
param_grid = dict(
    learning_rate=[1e-5, 2e-5, 5e-5],
    batch_size=[16, 32],
    num_hidden_layers=[4, 6, 8],
    num_attention_heads=[4, 8],
)

# Step 8: five-fold cross-validation (shuffled, fixed seed) driving the grid search
cv = KFold(n_splits=5, shuffle=True, random_state=42)

# NOTE(review): n_jobs=5 runs five fits in parallel, and each fit builds its own
# model on the wrapper's device — confirm that device has room for all of them.
accuracy_scorer = make_scorer(accuracy_score)
grid_search = GridSearchCV(
    SklearnWrappedModel(),
    param_grid,
    scoring=accuracy_scorer,
    cv=cv,
    n_jobs=5,  # parallel workers
)

# Step 9: run the cross-validated grid search on the training split
grid_search.fit(train_texts, train_labels)

# Report the best parameter combination and its cross-validation score
print(f"Best Parameters: {grid_search.best_params_}")
print(f"Best CV Accuracy: {grid_search.best_score_}")

# Evaluate the best estimator on the held-out validation split
best_model = grid_search.best_estimator_
preds = best_model.predict(val_texts)
val_accuracy = accuracy_score(val_labels, preds)
print(f"Validation Accuracy: {val_accuracy * 100:.2f}%")


KeyboardInterrupt: 