In [None]:
import lightgbm as lgb
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from tqdm import tqdm

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Preprocessing: resize to the 224x224 input size used with ResNet, convert to
# a tensor, then normalise each channel (these are the usual ImageNet
# statistics that accompany torchvision's pretrained weights).
preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform = transforms.Compose(preprocessing_steps)

# CIFAR-10 train/test splits (downloaded into ./data when missing).
cifar_kwargs = dict(root='./data', download=True, transform=transform)
train_dataset = torchvision.datasets.CIFAR10(train=True, **cifar_kwargs)
test_dataset = torchvision.datasets.CIFAR10(train=False, **cifar_kwargs)

# No shuffling: the loaders are only used to extract features in dataset order.
loader_kwargs = dict(batch_size=64, shuffle=False)
train_loader = torch.utils.data.DataLoader(train_dataset, **loader_kwargs)
test_loader = torch.utils.data.DataLoader(test_dataset, **loader_kwargs)

# Pretrained ResNet-18 used as a frozen feature extractor: the final
# classification layer is replaced by an identity so the model outputs the
# features that would have been fed to it.
resnet = torchvision.models.resnet18(pretrained=True)
resnet.fc = nn.Identity()
resnet = resnet.to(device).eval()  # both calls return the module itself

# Feature-extraction helper
def extract_features(loader, model, device):
    """Run ``model`` over every batch of ``loader`` and collect outputs and targets.

    Args:
        loader: iterable yielding ``(images, targets)`` tensor pairs.
        model: callable applied to each image batch after it is moved to ``device``.
        device: device the image batches are moved to before the forward pass.

    Returns:
        Tuple ``(features, labels)`` of NumPy arrays obtained by concatenating
        the per-batch model outputs (moved back to the CPU) and the per-batch
        targets along the first axis.
    """
    batch_outputs, batch_targets = [], []
    with torch.no_grad():  # inference only, no autograd graph is recorded
        for images, targets in tqdm(loader, desc="Extracting features"):
            batch_outputs.append(model(images.to(device)).cpu())
            batch_targets.append(targets)
    return torch.cat(batch_outputs).numpy(), torch.cat(batch_targets).numpy()

# Extract features for the training split first, then for the test split.
print("Extracting features using ResNet on GPU...")
(X_train, y_train), (X_test, y_test) = (
    extract_features(split_loader, resnet, device)
    for split_loader in (train_loader, test_loader)
)



In [None]:

# Evaluation helper
def evaluate_model(model, X_test, y_test):
    """Score a fitted estimator on held-out data.

    Args:
        model: object exposing ``predict(X)``.
        X_test: inputs passed to ``model.predict``.
        y_test: ground-truth labels compared against the predictions.

    Returns:
        Dict with keys ``'accuracy'``, ``'precision'``, ``'recall'`` and
        ``'f1'``; the last three are macro-averaged.
    """
    predictions = model.predict(X_test)
    scores = {'accuracy': accuracy_score(y_test, predictions)}
    macro_scorers = (
        ('precision', precision_score),
        ('recall', recall_score),
        ('f1', f1_score),
    )
    for score_name, scorer in macro_scorers:
        scores[score_name] = scorer(y_test, predictions, average='macro')
    return scores

# Per-model metric dictionaries, keyed by model name.
metrics = {}

# --- Logistic regression ---------------------------------------------------
print("Training Logistic Regression...")
log_reg = LogisticRegression(
    max_iter=500,
    multi_class='multinomial',
    solver='lbfgs',
    random_state=42,
).fit(X_train, y_train)  # fit() returns the estimator itself
metrics['LogisticRegression'] = evaluate_model(log_reg, X_test, y_test)

# --- LightGBM ----------------------------------------------------------------
print("Training LightGBM...")
lgb_train = lgb.Dataset(X_train, label=y_train)
params = dict(
    objective='multiclass',
    num_class=10,  # CIFAR-10 has 10 classes
    boosting_type='gbdt',
    metric='multi_logloss',
    num_leaves=31,
    learning_rate=0.1,
    feature_fraction=0.9,
    verbose=-1,
    random_state=42,
)
lgb_model = lgb.train(params, lgb_train, num_boost_round=100)
# The booster's predict() output is reduced to a class index per sample by
# taking the argmax over axis 1.
y_pred_lgb = lgb_model.predict(X_test).argmax(axis=1)
metrics['LightGBM'] = {
    'accuracy': accuracy_score(y_test, y_pred_lgb),
    **{
        score_name: score_fn(y_test, y_pred_lgb, average='macro')
        for score_name, score_fn in (
            ('precision', precision_score),
            ('recall', recall_score),
            ('f1', f1_score),
        )
    },
}

# --- Random forest -----------------------------------------------------------
print("Training Random Forest...")
random_forest = RandomForestClassifier(
    n_estimators=100,
    n_jobs=-1,
    random_state=42,
).fit(X_train, y_train)
metrics['RandomForest'] = evaluate_model(random_forest, X_test, y_test)

# Print the collected metrics, one line per model.
print("Metrics for all models:")
for model_name, model_metrics in metrics.items():
    print(f"{model_name}: {model_metrics}")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Bar-chart comparison of the evaluation metrics
def plot_metrics(metrics):
    """Draw one bar chart per metric, comparing every model in ``metrics``.

    Args:
        metrics: mapping ``model name -> dict`` where each inner dict holds the
            keys ``'accuracy'``, ``'precision'``, ``'recall'`` and ``'f1'``.

    Each metric gets its own figure, shown with ``plt.show()``.
    """
    palette = ['#ADD8E6', '#90EE90', '#FFB6C1', '#FFDAB9']  # light pastel colours
    model_names = list(metrics.keys())

    for metric_name in ('accuracy', 'precision', 'recall', 'f1'):
        values = [metrics[name][metric_name] for name in model_names]

        plt.figure(figsize=(10, 6))
        bars = plt.bar(model_names, values, color=palette, alpha=0.8, edgecolor='black')

        # Write the numeric value just above each bar.
        for bar in bars:
            height = bar.get_height()
            x_center = bar.get_x() + bar.get_width()/2
            plt.text(x_center, height + 0.02, f"{height:.2f}",
                     ha='center', va='bottom', fontsize=12, color='black')

        # Titles, axis labels, fixed 0..1 y-range and a dashed horizontal grid.
        plt.title(f"Comparison of {metric_name.capitalize()} Across Models", fontsize=16, fontweight='bold')
        plt.ylabel(metric_name.capitalize(), fontsize=14)
        plt.xlabel("Models", fontsize=14)
        plt.ylim(0, 1)
        plt.grid(axis='y', linestyle='--', alpha=0.7)

        plt.tight_layout()
        plt.show()

# Draw the per-metric comparison charts for the models collected in `metrics`.
plot_metrics(metrics)


In [None]:
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import numpy as np

# K-means clustering of the training features into 10 clusters.
print("Performing K-means clustering...")
kmeans = KMeans(random_state=42, n_clusters=10)
kmeans_labels = kmeans.fit(X_train).labels_  # same labels fit_predict() returns

# Project the training features onto their first two principal components.
print("Reducing dimensions with PCA...")
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_train)

# Scatter plot of the clustering result
def plot_clusters(X, labels, title="K-means Clustering with PCA"):
    """Scatter-plot 2-D points coloured by cluster label.

    Args:
        X: array indexed as ``X[mask][:, 0]`` / ``X[mask][:, 1]``; the first two
            columns are used as plot coordinates.
        labels: array of cluster labels, one per row of ``X``.
        title: figure title.
    """
    plt.figure(figsize=(10, 7))

    # One colour per distinct label, sampled evenly from the "Paired" colormap.
    cluster_ids = np.unique(labels)
    palette = plt.cm.Paired(np.linspace(0, 1, len(cluster_ids)))

    for position, cluster_id in enumerate(cluster_ids):
        members = X[labels == cluster_id]
        plt.scatter(members[:, 0], members[:, 1],
                    s=50, label=f"Cluster {cluster_id}", color=palette[position],
                    alpha=0.7, edgecolor='k')

    # Titles, axis labels, legend and a dashed grid.
    plt.title(title, fontsize=16, fontweight='bold')
    plt.xlabel("Principal Component 1", fontsize=14)
    plt.ylabel("Principal Component 2", fontsize=14)
    plt.legend(title="Clusters", fontsize=10)
    plt.grid(alpha=0.5, linestyle='--')
    plt.tight_layout()
    plt.show()

# Plot the K-means cluster assignments on the 2-D PCA projection.
plot_clusters(X_pca, kmeans_labels)


In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt

# Confusion matrix + classification report helper
def evaluate_and_visualize(y_test, y_pred, model_name):
    """Print a classification report and show the confusion matrix as a heatmap.

    Args:
        y_test: ground-truth labels.
        y_pred: predicted labels.
        model_name: name inserted into the printed header and the plot title.

    The heatmap axes are labelled 0..9 (ten classes).
    """
    cm = confusion_matrix(y_test, y_pred)
    report = classification_report(y_test, y_pred, digits=4)
    print(f"\nClassification Report for {model_name}:\n")
    print(report)

    # Heatmap of the confusion matrix with integer cell annotations.
    tick_labels = range(10)
    plt.figure(figsize=(10, 8))
    sns.heatmap(
        cm,
        annot=True,
        fmt='d',
        cmap="Blues",
        xticklabels=tick_labels,
        yticklabels=tick_labels,
    )
    plt.title(f"Confusion Matrix for {model_name}", fontsize=16)
    plt.xlabel("Predicted Labels", fontsize=14)
    plt.ylabel("True Labels", fontsize=14)
    plt.tight_layout()
    plt.show()

# Logistic regression: confusion matrix and classification report.
# NOTE(review): the predictions are stored in `y_pred_knn` although they come
# from `log_reg`; the name is kept because other cells may reference it.
print("Evaluating log...")
y_pred_knn = log_reg.predict(X_test)
evaluate_and_visualize(y_test, y_pred_knn, "log")

# LightGBM: confusion matrix and classification report (reuses the class
# predictions `y_pred_lgb` computed when the booster was trained).
print("Evaluating LightGBM...")
evaluate_and_visualize(y_test, y_pred_lgb, "LightGBM")

# Random forest: confusion matrix and classification report.
print("Evaluating Random Forest...")
y_pred_rf = random_forest.predict(X_test)
evaluate_and_visualize(y_test, y_pred_rf, "Random Forest")


In [None]:
import optuna
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.preprocessing import StandardScaler
import seaborn as sns
import matplotlib.pyplot as plt

# Standardise the features; the scaler's statistics come from the training
# split only and are then applied to both splits.
scaler = StandardScaler().fit(X_train)
X_train_scaled, X_test_scaled = (
    scaler.transform(split) for split in (X_train, X_test)
)

# Optuna objective for tuning logistic regression
def objective(trial):
    """Return the validation accuracy of one logistic-regression configuration.

    The score is computed on a hold-out carved out of the *training* data
    (``X_train_scaled`` / ``y_train``), not on the test set: choosing
    hyperparameters by test accuracy would leak the test set into model
    selection and make the final test report optimistic.

    Args:
        trial: Optuna trial used to sample ``penalty``, ``C`` and ``max_iter``.

    Returns:
        Accuracy on the 20% validation hold-out (a value to maximise).
    """
    # Search space.
    # NOTE(review): recent scikit-learn releases expect ``penalty=None``
    # instead of the string 'none' — confirm against the installed version.
    penalty = trial.suggest_categorical('penalty', ['l2', 'none'])  # regularisation type
    C = trial.suggest_float('C', 0.01, 10.0, log=True)  # inverse regularisation strength
    max_iter = trial.suggest_int('max_iter', 500, 2000)  # solver iteration cap

    # Fixed seed: every trial is scored on the same stratified hold-out, so
    # trials are directly comparable.
    X_fit, X_val, y_fit, y_val = train_test_split(
        X_train_scaled,
        y_train,
        test_size=0.2,
        random_state=42,
        stratify=y_train,
    )

    model = LogisticRegression(
        penalty=penalty,
        C=C,
        max_iter=max_iter,
        random_state=42,
        solver='lbfgs',  # supports the multinomial (multi-class) loss
        multi_class='multinomial'
    )
    model.fit(X_fit, y_fit)

    # Validation accuracy is the optimisation target.
    return accuracy_score(y_val, model.predict(X_val))

# Run the hyperparameter search (10 trials, maximising the objective).
print("Starting Optuna parameter tuning...")
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=10)

# Best parameters and the objective value they achieved.
best_params, best_accuracy = study.best_params, study.best_value
print("\nBest parameters found by Optuna:")
print(best_params)
print(f"Best accuracy: {best_accuracy:.4f}")

# Refit logistic regression on the scaled training data with the best
# parameters, then predict the scaled test split.
print("Retraining Logistic Regression with best parameters...")
tuned_kwargs = {key: best_params[key] for key in ('penalty', 'C', 'max_iter')}
best_model = LogisticRegression(
    random_state=42,
    solver='lbfgs',
    multi_class='multinomial',
    **tuned_kwargs,
)
best_model.fit(X_train_scaled, y_train)
y_pred_best = best_model.predict(X_test_scaled)

# Classification report for the tuned model.
print("\nBest Logistic Regression Classification Report:\n")
print(classification_report(y_test, y_pred_best, digits=4))

# Confusion-matrix heatmap
def visualize_confusion_matrix(y_true, y_pred, model_name="Best Logistic Regression", class_names=None):
    """Show the confusion matrix of ``y_pred`` against ``y_true`` as a heatmap.

    The tick labels were previously hard-coded to ``range(5)``, which labels
    only the first five rows/columns when the matrix is larger (this notebook
    calls it with the 10-class CIFAR-10 predictions). The labels are now
    derived from the data.

    Args:
        y_true: ground-truth labels.
        y_pred: predicted labels.
        model_name: name inserted into the plot title.
        class_names: optional tick labels, one per class in sorted label order.
            Defaults to the sorted labels that occur in ``y_true`` or ``y_pred``.
    """
    # Fix the row/column order explicitly so the tick labels always line up
    # with the matrix.
    labels = sorted(set(y_true) | set(y_pred))
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    tick_labels = labels if class_names is None else list(class_names)

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap="Blues", xticklabels=tick_labels, yticklabels=tick_labels)
    plt.title(f"Confusion Matrix for {model_name}", fontsize=16)
    plt.xlabel("Predicted Labels", fontsize=14)
    plt.ylabel("True Labels", fontsize=14)
    plt.tight_layout()
    plt.show()

# Plot the confusion matrix of the tuned logistic-regression model on the test split.
visualize_confusion_matrix(y_test, y_pred_best, model_name="Best Logistic Regression")


In [None]:
import os
import re
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import CountVectorizer

# Download the NLTK resources used below: tokenizer models for `word_tokenize`
# and the English stop-word list.
import nltk
nltk.download('punkt')
# Newer NLTK releases load the tokenizer models from 'punkt_tab' instead of
# 'punkt'; without it `word_tokenize` raises LookupError there. On older
# releases the extra download just reports that the package is unknown.
nltk.download('punkt_tab')
nltk.download('stopwords')

# Dataset loading
def load_bbc_dataset(base_path):
    """Read the BBC news corpus from one sub-folder per category.

    Args:
        base_path: directory containing the folders ``business``,
            ``entertainment``, ``politics``, ``sport`` and ``tech``.

    Returns:
        Tuple ``(texts, labels)``: the raw file contents and the matching
        integer category ids (0..4 in the order listed above).

    Raises:
        FileNotFoundError: if one of the category folders does not exist
            (raised by ``os.listdir``).
    """
    texts, labels = [], []
    label_map = {"business": 0, "entertainment": 1, "politics": 2, "sport": 3, "tech": 4}
    for label, label_id in label_map.items():
        folder = os.path.join(base_path, label)
        # os.listdir() order is arbitrary; sorting makes the document order,
        # and therefore the seeded train/test split downstream, reproducible.
        for file_name in sorted(os.listdir(folder)):
            file_path = os.path.join(folder, file_name)
            if not os.path.isfile(file_path):
                # Skip sub-directories and other non-file entries instead of
                # crashing in open().
                continue
            with open(file_path, 'r', encoding='latin1') as f:
                texts.append(f.read())
            labels.append(label_id)
    return texts, labels

# Text preprocessing
def preprocess_text(texts):
    """Lower-case, strip punctuation, tokenise and drop English stop words.

    Args:
        texts: iterable of raw document strings.

    Returns:
        List with one cleaned string per input document; the remaining tokens
        are joined by single spaces.
    """
    stop_words = set(stopwords.words('english'))

    def _clean(document):
        # Remove every character that is neither a word character nor whitespace.
        stripped = re.sub(r'[^\w\s]', '', document.lower())
        kept = (token for token in word_tokenize(stripped) if token not in stop_words)
        return ' '.join(kept)

    return [_clean(document) for document in texts]

# Custom dataset class
class BBCDataset(Dataset):
    """Dataset of vectorised documents and their integer labels.

    Args:
        texts: documents passed to ``vectorizer.transform``.
        labels: one label per document.
        vectorizer: fitted object whose ``transform(texts)`` result exposes
            ``toarray()``; the dense array is stored in ``self.texts``.
    """

    def __init__(self, texts, labels, vectorizer):
        document_term_matrix = vectorizer.transform(texts)
        self.texts = document_term_matrix.toarray()
        self.labels = np.array(labels)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # Features as float32, target as int64 (torch.long).
        features = torch.tensor(self.texts[idx], dtype=torch.float32)
        target = torch.tensor(self.labels[idx], dtype=torch.long)
        return features, target

# Neural network model
class TextClassificationModel(nn.Module):
    """One-hidden-layer MLP classifier: Linear -> ReLU -> Linear.

    ``forward`` returns raw class scores (logits). The notebook trains this
    model with ``nn.CrossEntropyLoss``, which applies log-softmax internally;
    feeding it already-softmaxed outputs squashes the gradients and keeps the
    loss from approaching zero. Use :meth:`predict_proba` when probabilities
    are needed. The predicted class (argmax) is the same for logits and
    probabilities.

    Args:
        input_dim: number of input features.
        hidden_dim: width of the hidden layer.
        num_classes: number of output classes.
    """

    def __init__(self, input_dim, hidden_dim, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, num_classes)
        # Kept as an attribute for predict_proba(); not applied in forward().
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return logits of shape ``(batch, num_classes)``."""
        return self.fc2(self.relu(self.fc1(x)))

    def predict_proba(self, x):
        """Return class probabilities (softmax over the class dimension)."""
        return self.softmax(self.forward(x))

# Load the raw documents.
base_path = "./"  # replace with the root directory of the BBC dataset
texts, labels = load_bbc_dataset(base_path)

# Clean the text.
texts = preprocess_text(texts)

# Split BEFORE fitting the vectorizer so the vocabulary is learned from the
# training documents only; fitting on all texts leaks test-set word statistics
# into the feature space.
# NOTE: this cell rebinds X_train/X_test/y_train/y_test, the datasets, the
# loaders and `device` that the CIFAR-10 cells above also define.
X_train, X_test, y_train, y_test = train_test_split(texts, labels, test_size=0.2, random_state=42)

# Bag-of-words features restricted to the 5000 most frequent training words.
vectorizer = CountVectorizer(max_features=5000)
vectorizer.fit(X_train)
# Actual feature count: equals 5000 unless the corpus has fewer distinct words,
# in which case a hard-coded 5000 would not match the model's input layer.
input_dim = len(vectorizer.vocabulary_)

# Datasets and loaders.
train_dataset = BBCDataset(X_train, y_train, vectorizer)
test_dataset = BBCDataset(X_test, y_test, vectorizer)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Model, loss and optimizer.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TextClassificationModel(input_dim=input_dim, hidden_dim=128, num_classes=5).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop that records train/test loss and accuracy per epoch
def train_model_with_loss(model, train_loader, test_loader, num_epochs=10):
    """Train ``model`` and evaluate it on ``test_loader`` after every epoch.

    Uses the notebook-level ``optimizer``, ``criterion`` and ``device``.

    Args:
        model: network to train.
        train_loader: sized iterable of ``(inputs, targets)`` training batches.
        test_loader: sized iterable of ``(inputs, targets)`` evaluation batches.
        num_epochs: number of passes over ``train_loader``.

    Returns:
        Tuple ``(train_losses, test_losses, train_accuracies, test_accuracies)``
        with one entry per epoch. Losses are the mean of the per-batch losses;
        accuracies are correct predictions divided by samples seen.
    """
    train_losses, test_losses = [], []
    train_accuracies, test_accuracies = [], []

    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0
        n_correct = 0
        n_seen = 0
        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            optimizer.zero_grad()
            batch_out = model(batch_x)
            batch_loss = criterion(batch_out, batch_y)
            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()
            n_seen += batch_y.size(0)
            n_correct += (batch_out.max(1)[1] == batch_y).sum().item()

        train_loss = running_loss / len(train_loader)
        train_accuracy = n_correct / n_seen
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        # ---- evaluation pass (no gradient tracking) ----
        model.eval()
        running_loss = 0
        n_correct = 0
        n_seen = 0
        with torch.no_grad():
            for batch_x, batch_y in test_loader:
                batch_x = batch_x.to(device)
                batch_y = batch_y.to(device)
                batch_out = model(batch_x)
                running_loss += criterion(batch_out, batch_y).item()
                n_seen += batch_y.size(0)
                n_correct += (batch_out.max(1)[1] == batch_y).sum().item()

        test_loss = running_loss / len(test_loader)
        test_accuracy = n_correct / n_seen
        test_losses.append(test_loss)
        test_accuracies.append(test_accuracy)

        print(f"Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Train Acc: {train_accuracy:.4f}, Test Acc: {test_accuracy:.4f}")

    return train_losses, test_losses, train_accuracies, test_accuracies

# 训练模型


# Model evaluation
# NOTE(review): this redefines `evaluate_model`, shadowing the earlier
# scikit-learn helper of the same name that takes (model, X_test, y_test).
def evaluate_model(model, loader):
    """Return the accuracy of ``model`` over all batches of ``loader``.

    Uses the notebook-level ``device``.

    Args:
        model: network to evaluate; it is switched to eval mode.
        loader: iterable of ``(inputs, targets)`` batches.

    Returns:
        Fraction of samples whose highest-scoring class equals the target.
    """
    model.eval()
    hits = 0
    seen = 0
    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)
            predicted = model(batch_x).max(1)[1]
            seen += batch_y.size(0)
            hits += (predicted == batch_y).sum().item()
    return hits / seen

# Train the baseline model for a fixed number of epochs and keep the
# per-epoch loss/accuracy curves for plotting.
num_epochs = 10
train_losses, test_losses, train_accuracies, test_accuracies = train_model_with_loss(model, train_loader, test_loader, num_epochs)
# Loss and accuracy curves
def plot_metrics_with_test(train_losses, test_losses, train_accuracies, test_accuracies):
    """Plot train/test loss (left panel) and accuracy (right panel) per epoch.

    Args:
        train_losses: per-epoch training losses.
        test_losses: per-epoch test losses.
        train_accuracies: per-epoch training accuracies.
        test_accuracies: per-epoch test accuracies.
    """
    # (title, y-label, [(series, legend label, colour), ...]) for each panel.
    panels = (
        ("Loss Curve", "Loss", (
            (train_losses, "Train Loss", 'blue'),
            (test_losses, "Test Loss", 'orange'),
        )),
        ("Accuracy Curve", "Accuracy", (
            (train_accuracies, "Train Accuracy", 'green'),
            (test_accuracies, "Test Accuracy", 'red'),
        )),
    )

    plt.figure(figsize=(12, 5))
    for position, (panel_title, y_label, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for series, legend_label, colour in curves:
            plt.plot(series, label=legend_label, color=colour)
        plt.title(panel_title)
        plt.xlabel("Epoch")
        plt.ylabel(y_label)
        plt.legend()
        plt.grid(alpha=0.5)

    plt.tight_layout()
    plt.show()

# Plot the baseline model's per-epoch loss and accuracy curves.
plot_metrics_with_test(train_losses, test_losses, train_accuracies, test_accuracies)

# Confusion matrix and classification report
def confusion_and_report(model, loader):
    """Print a classification report and plot the confusion matrix for ``model``.

    Uses the notebook-level ``device``. Classes are reported under the five
    BBC category names.

    Args:
        model: network to evaluate; it is switched to eval mode.
        loader: iterable of ``(inputs, targets)`` batches.
    """
    category_names = ["business", "entertainment", "politics", "sport", "tech"]

    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)
            predicted = model(batch_x).max(1)[1]
            all_preds += list(predicted.cpu().numpy())
            all_labels += list(batch_y.cpu().numpy())

    cm = confusion_matrix(all_labels, all_preds)
    print("\nClassification Report:\n")
    print(classification_report(all_labels, all_preds, digits=4, target_names=category_names))

    # Heatmap of the confusion matrix with category names on both axes.
    plt.figure(figsize=(8, 6))
    sns.heatmap(
        cm,
        annot=True,
        fmt='d',
        cmap="Blues",
        xticklabels=category_names,
        yticklabels=category_names,
    )
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted Labels")
    plt.ylabel("True Labels")
    plt.tight_layout()
    plt.show()

# Report and plot the baseline model's performance on the test loader.
confusion_and_report(model, test_loader)


In [None]:
# Improved text classifier with batch normalisation and dropout
class OptimizedTextClassificationModel(nn.Module):
    """Two-hidden-layer MLP: (Linear -> BatchNorm -> ReLU -> Dropout) x 2 -> Linear.

    ``forward`` returns raw class scores (logits). The notebook trains this
    model with ``nn.CrossEntropyLoss``, which applies log-softmax internally,
    so applying Softmax in ``forward`` would normalise twice and flatten the
    gradients. Use :meth:`predict_proba` when probabilities are needed. The
    predicted class (argmax) is the same for logits and probabilities.

    Args:
        input_dim: number of input features.
        hidden_dim1: width of the first hidden layer.
        hidden_dim2: width of the second hidden layer.
        num_classes: number of output classes.
    """

    def __init__(self, input_dim, hidden_dim1, hidden_dim2, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim1)
        self.bn1 = nn.BatchNorm1d(hidden_dim1)
        self.dropout1 = nn.Dropout(0.5)

        self.fc2 = nn.Linear(hidden_dim1, hidden_dim2)
        self.bn2 = nn.BatchNorm1d(hidden_dim2)
        self.dropout2 = nn.Dropout(0.5)

        self.fc3 = nn.Linear(hidden_dim2, num_classes)
        # Kept as an attribute for predict_proba(); not applied in forward().
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return logits of shape ``(batch, num_classes)``."""
        # torch.relu avoids constructing a new nn.ReLU module on every call.
        x = self.dropout1(torch.relu(self.bn1(self.fc1(x))))
        x = self.dropout2(torch.relu(self.bn2(self.fc2(x))))
        return self.fc3(x)

    def predict_proba(self, x):
        """Return class probabilities (softmax over the class dimension)."""
        return self.softmax(self.forward(x))

# Instantiate the improved model. The input width is taken from the fitted
# vectorizer so it always matches the feature vectors produced by BBCDataset
# (a hard-coded 5000 breaks when the vocabulary has fewer distinct words).
model = OptimizedTextClassificationModel(
    input_dim=len(vectorizer.vocabulary_),
    hidden_dim1=256,
    hidden_dim2=128,
    num_classes=5,
).to(device)

# Optimizer and learning-rate schedule: the rate is halved every 5 epochs.
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

# Training loop with a learning-rate scheduler
def train_model_with_scheduler(model, train_loader, test_loader, num_epochs=10):
    """Train ``model``, evaluate after every epoch, then step the LR scheduler.

    Uses the notebook-level ``optimizer``, ``criterion``, ``scheduler`` and
    ``device``.

    Args:
        model: network to train.
        train_loader: sized iterable of ``(inputs, targets)`` training batches.
        test_loader: sized iterable of ``(inputs, targets)`` evaluation batches.
        num_epochs: number of passes over ``train_loader``.

    Returns:
        Tuple ``(train_losses, test_losses, train_accuracies, test_accuracies)``
        with one entry per epoch. Losses are the mean of the per-batch losses;
        accuracies are correct predictions divided by samples seen.
    """
    train_losses, test_losses = [], []
    train_accuracies, test_accuracies = [], []

    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        loss_sum = 0
        hits = 0
        seen = 0
        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()
            scores = model(inputs)
            step_loss = criterion(scores, targets)
            step_loss.backward()
            optimizer.step()

            loss_sum += step_loss.item()
            seen += targets.size(0)
            hits += (scores.max(1)[1] == targets).sum().item()

        train_loss = loss_sum / len(train_loader)
        train_accuracy = hits / seen
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        # ---- evaluation pass (no gradient tracking) ----
        model.eval()
        loss_sum = 0
        hits = 0
        seen = 0
        with torch.no_grad():
            for inputs, targets in test_loader:
                inputs = inputs.to(device)
                targets = targets.to(device)
                scores = model(inputs)
                loss_sum += criterion(scores, targets).item()
                seen += targets.size(0)
                hits += (scores.max(1)[1] == targets).sum().item()

        test_loss = loss_sum / len(test_loader)
        test_accuracy = hits / seen
        test_losses.append(test_loss)
        test_accuracies.append(test_accuracy)

        # Advance the learning-rate schedule once per epoch.
        scheduler.step()

        print(f"Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Train Acc: {train_accuracy:.4f}, Test Acc: {test_accuracy:.4f}")

    return train_losses, test_losses, train_accuracies, test_accuracies

# Train the optimized model for 100 epochs.
# NOTE(review): with StepLR(step_size=5, gamma=0.5) the learning rate is halved
# 20 times over 100 epochs (0.001 * 0.5**20), so late epochs barely update the
# weights — confirm the epoch count is intended.
print("Training optimized model...")
train_losses, test_losses, train_accuracies, test_accuracies = train_model_with_scheduler(model, train_loader, test_loader, 100)

# Plot the loss and accuracy curves.
plot_metrics_with_test(train_losses, test_losses, train_accuracies, test_accuracies)

# Confusion matrix and classification report on the test loader.
confusion_and_report(model, test_loader)
