## 1. 挂载 Google Drive 并设置工作目录

In [None]:
# Mount Google Drive at /content/drive
drive.mount('/content/drive')

# Create the directory that results are saved into
# (-p: create missing parents and do not fail if it already exists)
!mkdir -p "/content/drive/MyDrive/mixed_precision_results"

## 2. 准备数据集

In [None]:
# Seed corpus: eight short Chinese reviews, each paired with its sentiment
# label (1 = positive review, 0 = negative review).
base_samples = [
    ("这部电影很精彩，演员的表演非常出色", 1),
    ("画面很差，故事情节也很烂", 0),
    ("音乐很动听，节奏感很强", 1),
    ("服务态度恶劣，等了很久才上菜", 0),
    ("风景优美，空气清新，是个度假的好地方", 1),
    ("价格太贵了，性价比很低", 0),
    ("质量不错，用着很舒服", 1),
    ("外观设计很差，做工粗糙", 0),
]

# Repeat the seed corpus 100 times to enlarge the dataset.
texts = [sentence for sentence, _ in base_samples] * 100
labels = [sentiment for _, sentiment in base_samples] * 100

# Assemble the DataFrame with a 'text' column and a 'label' column.
df = pd.DataFrame({'text': texts, 'label': labels})

# Hold out 20% of the rows as the test split, with a fixed random seed.
# NOTE: every sentence appears 100 times, so identical sentences can end up
# in both the training and the test split.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

n_train, n_test = len(train_df), len(test_df)
print(f"训练集大小：{n_train}")
print(f"测试集大小：{n_test}")

## 3. 定义数据集类

In [None]:
class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per item for classification.

    Args:
        texts: Indexable, sized collection of raw texts; each item is passed
            through ``str`` before tokenization.
        labels: Indexable, sized collection of integer class labels aligned
            with ``texts``.
        tokenizer: Callable tokenizer (for example a Hugging Face tokenizer)
            that accepts the keyword arguments used in ``__getitem__`` and
            returns a mapping containing ``input_ids`` and ``attention_mask``
            tensors.
        max_len: Length every encoded sequence is padded or truncated to.

    Raises:
        ValueError: If ``texts`` and ``labels`` have different lengths.
    """

    def __init__(self, texts, labels, tokenizer, max_len=64):
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels must have the same length, "
                f"got {len(texts)} and {len(labels)}"
            )
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the encoded sample at ``idx``.

        Returns:
            dict with the flattened ``input_ids`` and ``attention_mask``
            tensors and a ``labels`` tensor of dtype ``torch.long``.
        """
        text = str(self.texts[idx])
        label = self.labels[idx]

        # Call the tokenizer directly: ``encode_plus`` is deprecated in
        # Transformers in favour of ``__call__``.
        encoding = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        # The encoded tensors are flattened to 1-D so that the DataLoader's
        # batching adds the batch dimension.
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long),
        }

## 4. 定义模型

In [None]:
class TextClassifier(nn.Module):
    """BERT encoder followed by dropout and a linear classification head.

    Args:
        n_classes: Number of output classes (width of the final linear layer).
    """

    def __init__(self, n_classes):
        super().__init__()
        # Pretrained 'bert-base-chinese' weights serve as the text encoder.
        self.bert = BertModel.from_pretrained('bert-base-chinese')
        self.drop = nn.Dropout(p=0.3)
        hidden_dim = self.bert.config.hidden_size
        self.fc = nn.Linear(hidden_dim, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return the classification logits for a batch of encoded texts."""
        encoder_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Element 1 of the encoder output is taken as the pooled representation.
        pooled = encoder_out[1]
        logits = self.fc(self.drop(pooled))
        return logits

## 5. 训练函数（支持混合精度）

In [None]:
def train_model(model, train_loader, optimizer, criterion, device, use_amp=False, epochs=3):
    """Train ``model`` and collect loss, accuracy, timing and memory statistics.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns per-class scores.
        train_loader: Iterable of batches; each batch is a mapping with the
            keys ``'input_ids'``, ``'attention_mask'`` and ``'labels'``.
        optimizer: Optimizer updating ``model``'s parameters.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batch tensors are moved to.
        use_amp: When true, run the forward pass under ``autocast`` and scale
            the loss with ``GradScaler``.
        epochs: Number of passes over ``train_loader``.

    Returns:
        Tuple ``(train_losses, train_accs, training_times, memory_usage)``:
        per-epoch mean loss, per-epoch accuracy in percent, per-epoch wall
        time in seconds, and one ``torch.cuda.memory_allocated()`` sample in
        MB per training step (across all epochs).

    Raises:
        ValueError: If ``train_loader`` yields no batches during an epoch.
    """
    model.train()
    scaler = GradScaler() if use_amp else None
    train_losses = []
    train_accs = []
    training_times = []
    memory_usage = []

    for epoch in range(epochs):
        total_loss = 0.0
        correct = 0
        total = 0
        num_batches = 0
        # Index of this epoch's first memory sample, so the epoch summary
        # averages only the current epoch instead of every epoch so far.
        epoch_mem_start = len(memory_usage)
        epoch_start_time = time.perf_counter()

        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch + 1}/{epochs}')
        for batch in progress_bar:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()

            if use_amp:
                # Mixed precision: forward under autocast, then backward and
                # optimizer step through the gradient scaler.
                with autocast():
                    outputs = model(input_ids, attention_mask)
                    loss = criterion(outputs, labels)

                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
            else:
                outputs = model(input_ids, attention_mask)
                loss = criterion(outputs, labels)

                loss.backward()
                optimizer.step()

            # Read the scalar loss once and reuse it for both the running
            # total and the progress bar.
            batch_loss = loss.item()
            total_loss += batch_loss
            num_batches += 1

            predicted = outputs.detach().argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            # Memory currently allocated by CUDA tensors, in MB.
            # NOTE(review): this is sampled after the optimizer step, so it is
            # not the peak reached during forward/backward; consider
            # torch.cuda.max_memory_allocated() if peak usage is what matters.
            memory_usage.append(torch.cuda.memory_allocated() / 1024**2)

            progress_bar.set_postfix({
                'loss': batch_loss,
                'memory': f'{memory_usage[-1]:.1f}MB'
            })

        epoch_time = time.perf_counter() - epoch_start_time

        if num_batches == 0:
            raise ValueError(
                "train_loader produced no batches; cannot compute epoch statistics"
            )

        training_times.append(epoch_time)

        # Counting batches avoids requiring ``len(train_loader)``.
        epoch_loss = total_loss / num_batches
        epoch_acc = 100 * correct / total
        train_losses.append(epoch_loss)
        train_accs.append(epoch_acc)

        epoch_memory = memory_usage[epoch_mem_start:]

        print(f'Epoch {epoch + 1}/{epochs}:')
        print(f'Average Loss: {epoch_loss:.4f}')
        print(f'Accuracy: {epoch_acc:.2f}%')
        print(f'Time: {epoch_time:.2f}s')
        print(f'Average Memory Usage: {sum(epoch_memory)/len(epoch_memory):.1f}MB\n')

    return train_losses, train_accs, training_times, memory_usage

## 6. 比较 FP32 和 FP16 训练

In [None]:
# Hyperparameters
MAX_LEN = 64
BATCH_SIZE = 32
EPOCHS = 3
LEARNING_RATE = 2e-5

# Initialise the tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')

# Build the training dataset and its data loader
train_dataset = TextDataset(
    texts=train_df.text.values,
    labels=train_df.label.values,
    tokenizer=tokenizer,
    max_len=MAX_LEN
)

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True
)

# Select the device: CUDA when available, otherwise CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Train the FP32 model
print("Training with FP32...")
model_fp32 = TextClassifier(n_classes=2).to(device)
optimizer_fp32 = AdamW(model_fp32.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

results_fp32 = train_model(
    model_fp32,
    train_loader,
    optimizer_fp32,
    criterion,
    device,
    use_amp=False,
    epochs=EPOCHS
)

# Release the FP32 run's GPU memory before the mixed-precision run so the two
# memory measurements are comparable:
#   * the model is parked on the CPU instead of being deleted, because the
#     inference comparison later in the notebook still uses `model_fp32`;
#   * the optimizer is deleted, since it references the parameters and holds
#     its own per-parameter state, which would otherwise stay allocated;
#   * garbage is collected first so that empty_cache() can actually return
#     the freed blocks.
model_fp32.zero_grad(set_to_none=True)
model_fp32.to('cpu')
del optimizer_fp32
gc.collect()
torch.cuda.empty_cache()

# Train the mixed-precision (FP16) model
print("\nTraining with Mixed Precision (FP16)...")
model_fp16 = TextClassifier(n_classes=2).to(device)
optimizer_fp16 = AdamW(model_fp16.parameters(), lr=LEARNING_RATE)

results_fp16 = train_model(
    model_fp16,
    train_loader,
    optimizer_fp16,
    criterion,
    device,
    use_amp=True,
    epochs=EPOCHS
)

# Bring the FP32 model back onto the target device so it can be used for the
# inference comparison alongside the FP16 model.
model_fp32.to(device)

## 7. 比较结果

In [None]:
def plot_comparison(results_fp32, results_fp16,
                    save_path='/content/drive/MyDrive/mixed_precision_results/comparison.png'):
    """Plot FP32 vs. FP16 training statistics in a 2x2 grid and save the figure.

    Args:
        results_fp32: Sequence ``(losses, accuracies, epoch_times,
            memory_samples)`` for the FP32 run.
        results_fp16: Same layout for the mixed-precision run.
        save_path: File the figure is written to; its parent directory is
            created when missing. Pass ``None`` to skip saving.
    """
    from pathlib import Path

    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

    def _plot_per_epoch(axis, series, label):
        # Number epochs from 1 so the x axis matches the "Epoch k/N" training log.
        axis.plot(range(1, len(series) + 1), series, label=label)

    # Loss per epoch
    _plot_per_epoch(ax1, results_fp32[0], 'FP32')
    _plot_per_epoch(ax1, results_fp16[0], 'FP16')
    ax1.set_title('Training Loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.legend()

    # Accuracy per epoch
    _plot_per_epoch(ax2, results_fp32[1], 'FP32')
    _plot_per_epoch(ax2, results_fp16[1], 'FP16')
    ax2.set_title('Training Accuracy')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Accuracy (%)')
    ax2.legend()

    # Total training time
    ax3.bar(['FP32', 'FP16'],
            [sum(results_fp32[2]), sum(results_fp16[2])],
            alpha=0.8)
    ax3.set_title('Total Training Time')
    ax3.set_ylabel('Time (seconds)')

    # Memory usage distribution. The tick labels are set explicitly instead of
    # through boxplot's `labels=` keyword, which newer Matplotlib releases
    # deprecate in favour of `tick_labels=`; this form works with both.
    ax4.boxplot([results_fp32[3], results_fp16[3]])
    ax4.set_xticks([1, 2])
    ax4.set_xticklabels(['FP32', 'FP16'])
    ax4.set_title('Memory Usage Distribution')
    ax4.set_ylabel('Memory (MB)')

    plt.tight_layout()
    if save_path is not None:
        out_file = Path(save_path)
        # Make sure the target directory exists before writing the file.
        out_file.parent.mkdir(parents=True, exist_ok=True)
        fig.savefig(out_file)
    plt.show()

plot_comparison(results_fp32, results_fp16)


def _mean(values):
    """Return the arithmetic mean of ``values``, or 0.0 when it is empty."""
    return sum(values) / len(values) if values else 0.0


def _format_reduction(baseline, candidate):
    """Format how far ``candidate`` is below ``baseline`` as a percentage.

    Returns "N/A" when ``baseline`` is zero (for example when no CUDA memory
    was recorded because training ran on the CPU), instead of dividing by zero.
    """
    if not baseline:
        return "N/A"
    return f"{(baseline - candidate) / baseline * 100:.1f}%"


# Print detailed statistics for both runs
print("\nDetailed Statistics:")
print("FP32:")
print(f"Average training time per epoch: {_mean(results_fp32[2]):.2f}s")
print(f"Average memory usage: {_mean(results_fp32[3]):.1f}MB")
print(f"Final accuracy: {results_fp32[1][-1]:.2f}%")

print("\nFP16:")
print(f"Average training time per epoch: {_mean(results_fp16[2]):.2f}s")
print(f"Average memory usage: {_mean(results_fp16[3]):.1f}MB")
print(f"Final accuracy: {results_fp16[1][-1]:.2f}%")

print("\nPerformance Improvement:")
# Time is compared on totals; memory is compared on per-step averages so the
# figure does not depend on both runs having recorded the same number of samples.
time_improvement = _format_reduction(sum(results_fp32[2]), sum(results_fp16[2]))
memory_improvement = _format_reduction(_mean(results_fp32[3]), _mean(results_fp16[3]))
print(f"Time reduction: {time_improvement}")
print(f"Memory reduction: {memory_improvement}")

## 8. 推理性能比较

In [None]:
def inference_test(model, tokenizer, text, device, use_amp=False, max_len=None):
    """Run one forward pass on ``text`` and report prediction, timing and memory.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns per-class scores with the classes on dimension 1.
        tokenizer: Callable tokenizer returning a mapping with ``input_ids``
            and ``attention_mask`` tensors.
        text: Text to classify.
        device: Device the encoded inputs are moved to.
        use_amp: When true, run the forward pass under ``autocast``.
        max_len: Padding/truncation length; defaults to the module-level
            ``MAX_LEN`` when omitted.

    Returns:
        Tuple ``(prediction, confidence, inference_time, memory_used)``:
        predicted class index, its softmax probability, forward-pass wall
        time in seconds, and CUDA memory allocated on ``device`` in MB
        (0.0 when ``device`` is not a CUDA device).
    """
    from contextlib import nullcontext

    if max_len is None:
        max_len = MAX_LEN

    model.eval()
    # Call the tokenizer directly: ``encode_plus`` is deprecated in
    # Transformers in favour of ``__call__``.
    encoding = tokenizer(
        text,
        add_special_tokens=True,
        max_length=max_len,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )

    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    on_cuda = torch.device(device).type == 'cuda'
    amp_context = autocast() if use_amp else nullcontext()

    # CUDA work is queued asynchronously, so wait for the device before
    # starting and before stopping the timer; otherwise the measurement can
    # end while the forward pass is still running.
    if on_cuda:
        torch.cuda.synchronize(device)
    start_time = time.perf_counter()
    with torch.no_grad(), amp_context:
        outputs = model(input_ids, attention_mask)
    if on_cuda:
        torch.cuda.synchronize(device)
    inference_time = time.perf_counter() - start_time

    memory_used = torch.cuda.memory_allocated(device) / 1024**2 if on_cuda else 0.0

    # Convert to float32 before softmax so that half-precision outputs do not
    # reduce the precision of the reported confidence.
    probabilities = torch.softmax(outputs.float(), dim=1)
    prediction = torch.argmax(probabilities, dim=1).item()
    confidence = probabilities[0][prediction].item()

    return prediction, confidence, inference_time, memory_used

# Sample sentences used for the side-by-side inference comparison.
test_texts = [
    "这家餐厅的菜品非常美味，服务也很周到",
    "产品质量太差了，一点都不耐用",
    "这部电影剧情紧凑，演技在线"
]

# FP32 model: plain forward pass.
print("FP32 Inference:")
for text in test_texts:
    fp32_result = inference_test(model_fp32, tokenizer, text, device, use_amp=False)
    pred, conf, time_fp32, mem_fp32 = fp32_result
    # One print call with a newline separator emits the same five lines.
    print(
        f"\n文本: {text}",
        f"预测: {'正面' if pred == 1 else '负面'}",
        f"置信度: {conf:.2%}",
        f"推理时间: {time_fp32*1000:.2f}ms",
        f"内存使用: {mem_fp32:.1f}MB",
        sep="\n",
    )

# Mixed-precision model: forward pass under autocast.
print("\nFP16 Inference:")
for text in test_texts:
    fp16_result = inference_test(model_fp16, tokenizer, text, device, use_amp=True)
    pred, conf, time_fp16, mem_fp16 = fp16_result
    print(
        f"\n文本: {text}",
        f"预测: {'正面' if pred == 1 else '负面'}",
        f"置信度: {conf:.2%}",
        f"推理时间: {time_fp16*1000:.2f}ms",
        f"内存使用: {mem_fp16:.1f}MB",
        sep="\n",
    )