In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# 数据集类 - 负责数据加载和预处理
class CustomDataset:
    def __init__(self, x_path, y_path, is_classification=False):
        """
        初始化数据集
        :param x_path: 特征数据CSV文件路径
        :param y_path: 标签数据文件路径
        :param is_classification: 是否为分类任务
        """
        # 加载数据
        self.x_data = pd.read_csv(x_path)
        self.y_data = pd.read_csv(y_path).values.flatten()
        
        # 记录是否为分类任务（影响后续处理）
        self.is_classification = is_classification
        
        # 数据预处理 - 确保所有数据都是数值类型
        self._preprocess_data()
        
        # 转换为PyTorch张量
        self.x_tensor = torch.tensor(self.x_data.values, dtype=torch.float32)
        self.y_tensor = torch.tensor(self.y_data, dtype=torch.float32).view(-1, 1)  # 转换为列向量
        
    def _preprocess_data(self):
        """预处理数据，确保没有object类型，只保留数值类型"""
        # 检查并处理非数值列
        for col in self.x_data.columns:
            if self.x_data[col].dtype == 'object':
                # 尝试转换为数值类型
                try:
                    self.x_data[col] = pd.to_numeric(self.x_data[col])
                except ValueError:
                    # 如果转换失败，则进行独热编码
                    self.x_data = pd.get_dummies(self.x_data, columns=[col], drop_first=True)
        
        # 处理缺失值 - 使用列的平均值填充
        self.x_data = self.x_data.fillna(self.x_data.mean())
        
        # 特征标准化（对线性模型很重要）
        self.x_data = (self.x_data - self.x_data.mean()) / self.x_data.std()
    
    def get_dataloader(self, batch_size=32, shuffle=True):
        """创建数据加载器，用于批量训练"""
        dataset = TensorDataset(self.x_tensor, self.y_tensor)
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
    
    def get_feature_dim(self):
        """返回特征维度"""
        return self.x_data.shape[1]


# 模型类 - 包含线性回归和逻辑回归
class MLModel(nn.Module):
    def __init__(self, input_dim, is_classification=False):
        """
        初始化模型
        :param input_dim: 输入特征维度
        :param is_classification: 是否为分类任务（True为逻辑回归，False为线性回归）
        """
        super(MLModel, self).__init__()
        # 定义线性层
        self.linear = nn.Linear(input_dim, 1)
        # 分类任务使用sigmoid激活函数
        self.is_classification = is_classification
        self.sigmoid = nn.Sigmoid()
        
        # 记录训练历史
        self.train_losses = []
        self.val_losses = []
    
    def forward(self, x):
        """前向传播"""
        out = self.linear(x)
        if self.is_classification:
            out = self.sigmoid(out)
        return out
    
    def fit(self, train_loader, val_loader=None, epochs=100, lr=0.01):
        """
        训练模型
        :param train_loader: 训练数据加载器
        :param val_loader: 验证数据加载器（可选）
        :param epochs: 训练轮数
        :param lr: 学习率
        """
        # 根据任务类型选择损失函数
        if self.is_classification:
            criterion = nn.BCELoss()  # 二元交叉熵损失（分类）
        else:
            criterion = nn.MSELoss()  # 均方误差损失（回归）
        
        # 优化器
        optimizer = optim.SGD(self.parameters(), lr=lr)
        
        # 训练循环
        for epoch in range(epochs):
            self.train()  # 设置为训练模式
            train_loss = 0.0
            
            for inputs, labels in train_loader:
                # 清零梯度
                optimizer.zero_grad()
                
                # 前向传播
                outputs = self(inputs)
                loss = criterion(outputs, labels)
                
                # 反向传播和优化
                loss.backward()
                optimizer.step()
                
                train_loss += loss.item() * inputs.size(0)
            
            # 计算平均训练损失
            train_loss /= len(train_loader.dataset)
            self.train_losses.append(train_loss)
            
            # 验证（如果提供了验证集）
            if val_loader:
                self.eval()  # 设置为评估模式
                val_loss = 0.0
                with torch.no_grad():  # 不计算梯度
                    for inputs, labels in val_loader:
                        outputs = self(inputs)
                        loss = criterion(outputs, labels)
                        val_loss += loss.item() * inputs.size(0)
                
                val_loss /= len(val_loader.dataset)
                self.val_losses.append(val_loss)
                
                # 每10轮打印一次信息
                if (epoch + 1) % 10 == 0:
                    print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
            else:
                if (epoch + 1) % 10 == 0:
                    print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}')
    
    def predict(self, x):
        """
        预测
        :param x: 输入特征（可以是numpy数组或DataFrame）
        :return: 预测结果
        """
        self.eval()  # 设置为评估模式
        
        # 转换输入为张量
        if isinstance(x, pd.DataFrame) or isinstance(x, np.ndarray):
            x_tensor = torch.tensor(x, dtype=torch.float32)
        elif isinstance(x, torch.Tensor):
            x_tensor = x
        else:
            raise TypeError("输入必须是DataFrame、numpy数组或PyTorch张量")
        
        # 预测
        with torch.no_grad():
            outputs = self(x_tensor)
            
            # 分类任务返回类别（0或1）
            if self.is_classification:
                return (outputs >= 0.5).float()
            # 回归任务直接返回数值
            else:
                return outputs.numpy()
    
    def evaluate(self, test_loader):
        """
        评估模型性能
        :param test_loader: 测试数据加载器
        :return: 评估指标字典
        """
        self.eval()  # 设置为评估模式
        all_preds = []
        all_labels = []
        
        with torch.no_grad():
            for inputs, labels in test_loader:
                outputs = self(inputs)
                if self.is_classification:
                    preds = (outputs >= 0.5).float()
                else:
                    preds = outputs
                
                all_preds.extend(preds.numpy())
                all_labels.extend(labels.numpy())
        
        # 转换为numpy数组
        all_preds = np.array(all_preds)
        all_labels = np.array(all_labels)
        
        # 计算评估指标
        metrics = {}
        
        if self.is_classification:
            # 分类任务指标
            accuracy = np.mean(all_preds == all_labels)
            precision = np.sum((all_preds == 1) & (all_labels == 1)) / np.sum(all_preds == 1) if np.sum(all_preds == 1) > 0 else 0
            recall = np.sum((all_preds == 1) & (all_labels == 1)) / np.sum(all_labels == 1) if np.sum(all_labels == 1) > 0 else 0
            f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
            
            metrics['accuracy'] = accuracy
            metrics['precision'] = precision
            metrics['recall'] = recall
            metrics['f1'] = f1
            print(f"分类准确率: {accuracy:.4f}")
            print(f"精确率: {precision:.4f}")
            print(f"召回率: {recall:.4f}")
            print(f"F1分数: {f1:.4f}")
        else:
            # 回归任务指标
            mse = np.mean((all_preds - all_labels) ** 2)
            rmse = np.sqrt(mse)
            mae = np.mean(np.abs(all_preds - all_labels))
            r2 = 1 - (np.sum((all_labels - all_preds) ** 2) / np.sum((all_labels - np.mean(all_labels)) ** 2))
            
            metrics['mse'] = mse
            metrics['rmse'] = rmse
            metrics['mae'] = mae
            metrics['r2'] = r2
            print(f"均方误差 (MSE): {mse:.4f}")
            print(f"均方根误差 (RMSE): {rmse:.4f}")
            print(f"平均绝对误差 (MAE): {mae:.4f}")
            print(f"R² 分数: {r2:.4f}")
        
        return metrics
    
    def plot_training_curve(self):
        """绘制训练损失曲线"""
        plt.figure(figsize=(10, 6))
        plt.plot(range(1, len(self.train_losses)+1), self.train_losses, label='训练损失')
        if self.val_losses:
            plt.plot(range(1, len(self.val_losses)+1), self.val_losses, label='验证损失')
        plt.xlabel('轮次')
        plt.ylabel('损失')
        plt.title('训练和验证损失曲线')
        plt.legend()
        plt.grid(True)
        plt.show()

# 可视化训练前后差异的函数
def plot_predictions(y_true, y_pred, is_classification=False, feature=None, feature_name="特征"):
    """
    绘制真实值与预测值的对比图
    :param y_true: 真实标签
    :param y_pred: 预测结果
    :param is_classification: 是否为分类任务
    :param feature: 用于散点图的特征（可选）
    :param feature_name: 特征名称
    """
    plt.figure(figsize=(12, 5))
    
    if feature is not None:
        # 绘制特征与目标值的散点图（训练前）
        plt.subplot(1, 2, 1)
        plt.scatter(feature, y_true, alpha=0.5)
        plt.xlabel(feature_name)
        plt.ylabel('真实值')
        plt.title('训练前: ' + feature_name + ' vs 真实值')
        plt.grid(True)
        
        # 绘制特征与预测值的散点图（训练后）
        plt.subplot(1, 2, 2)
        plt.scatter(feature, y_pred, alpha=0.5, color='orange')
        plt.xlabel(feature_name)
        plt.ylabel('预测值')
        plt.title('训练后: ' + feature_name + ' vs 预测值')
        plt.grid(True)
    else:
        # 如果没有特征，直接对比真实值和预测值
        plt.scatter(range(len(y_true)), y_true, alpha=0.5, label='真实值')
        plt.scatter(range(len(y_pred)), y_pred, alpha=0.5, color='orange', label='预测值')
        plt.xlabel('样本索引')
        plt.ylabel('值')
        plt.title('真实值 vs 预测值')
        plt.legend()
        plt.grid(True)
    
    plt.tight_layout()
    plt.show()

    # 对于回归任务，额外绘制预测值 vs 真实值的散点图
    if not is_classification:
        plt.figure(figsize=(8, 6))
        plt.scatter(y_true, y_pred, alpha=0.5)
        plt.plot([min(y_true), max(y_true)], [min(y_true), max(y_true)], 'r--')  # 理想线
        plt.xlabel('真实值')
        plt.ylabel('预测值')
        plt.title('预测值 vs 真实值')
        plt.grid(True)
        plt.show()


In [None]:
# 假设你已经有了预处理好的X和y数据文件

# 1. 线性回归示例（加州房价数据集）
# 创建数据集实例
housing_dataset = CustomDataset(
    x_path='input.csv', 
    y_path='output.csv',
    is_classification=False
)

# 获取数据加载器
train_loader = housing_dataset.get_dataloader(batch_size=64)

# 创建模型
housing_model = MLModel(
    input_dim=housing_dataset.get_feature_dim(),
    is_classification=False
)

# 训练模型
housing_model.fit(train_loader, epochs=100, lr=0.001)

# 绘制训练曲线
housing_model.plot_training_curve()

# 评估模型
metrics = housing_model.evaluate(train_loader)

# 获取预测结果
y_pred = housing_model.predict(housing_dataset.x_data)
y_true = housing_dataset.y_data

# 绘制训练前后的差异（使用第一个特征进行可视化）
first_feature = housing_dataset.x_data.iloc[:, 0].values
plot_predictions(
    y_true, 
    y_pred, 
    is_classification=False,
    feature=first_feature,
    feature_name="特征1"
)


# 2. 逻辑回归示例（泰坦尼克号数据集）
# 创建数据集实例
titanic_dataset = CustomDataset(
    x_path='titanic_x.csv', 
    y_path='titanic_y.csv',
    is_classification=True
)

# 获取数据加载器
train_loader = titanic_dataset.get_dataloader(batch_size=32)

# 创建模型
titanic_model = MLModel(
    input_dim=titanic_dataset.get_feature_dim(),
    is_classification=True
)

# 训练模型
titanic_model.fit(train_loader, epochs=150, lr=0.01)

# 绘制训练曲线
titanic_model.plot_training_curve()

# 评估模型
metrics = titanic_model.evaluate(train_loader)

# 获取预测结果
y_pred = housing_model.predict(titanic_dataset.x_data)
y_true = titanic_dataset.y_data

# 绘制训练前后的差异
first_feature = titanic_dataset.x_data.iloc[:, 0].values
plot_predictions(
    y_true, 
    y_pred, 
    is_classification=True,
    feature=first_feature,
    feature_name="特征1"
)


AssertionError: Size mismatch between tensors