In [1]:
import os
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from tqdm import tqdm


定义训练会用到的几个计算函数

In [2]:
def get_segmentation_results(predictions):
    segmentation_results = torch.argmax(predictions, dim=1)
    return segmentation_results

def get_predictions(output):
    bs,c,h,w = output.size()
    values, indices = output.cpu().max(1)
    indices = indices.view(bs,h,w) # bs x h x w
    return indices

def calculate_miou(predictions, labels, num_classes):
    miou_list = []
    for i in range(predictions.shape[0]):
        pred = predictions[i]
        label = labels[i]
        confusion = confusion_matrix(label.flatten(), pred.flatten(), labels=range(num_classes))
        TP = np.diag(confusion)
        FP = np.sum(confusion, axis=0) - TP
        FN = np.sum(confusion, axis=1) - TP
        union = TP + FP + FN
        iou = TP / union
        miou = np.nanmean(iou)
        miou_list.append(miou)
    return np.mean(miou_list)

def extract_middle(images, size):
    _,w,h = images.shape
    # 计算起始和结束索引
    start = (w - size) // 2
    end = start + size

    # 提取中间的200x200区域
    middle_images = images[:, start:end, start:end]

    return middle_images

通过随机搜索算法自动寻找图像中最佳的4个区域坐标，使得从这些区域提取的特征能够训练出性能最好的分割模型。
原始PNG → 随机搜索 → 提取4个区域 → 计算差分 → (2,300,300)数据 → 训练分割模型

In [5]:


class CustomNPYDataset(Dataset):
    def __init__(self, data_dir, label_dir, transform=None):
        """
        初始化数据集
        
        参数:
        data_dir: 存放(2,300,300)格式NPY数据的文件夹路径
        label_dir: 存放分割掩码标签NPY文件的文件夹路径
        transform: 用于数据增强和预处理的转换操作
        """
        self.data_dir = data_dir
        self.label_dir = label_dir
        self.transform = transform
        
        # 获取数据文件名列表（.npy文件）
        self.data_files = [f for f in os.listdir(data_dir) if f.endswith('.npy')]
        # 获取标签文件名列表（.npy文件）
        self.label_files = [f for f in os.listdir(label_dir) if f.endswith('.npy')]
        
        # 确保数据和标签文件数量一致且文件名对应
        self.data_files.sort()
        self.label_files.sort()
        
        if len(self.data_files) != len(self.label_files):
            print(f"警告: 数据文件数量({len(self.data_files)})与标签文件数量({len(self.label_files)})不一致")
        
        # 只保留有对应标签的数据文件
        self.valid_indices = []
        for i, data_file in enumerate(self.data_files):
            # 假设数据和标签文件名相同或可以对应
            label_file = data_file  # 或者根据您的命名规则调整
            
            if label_file in self.label_files:
                self.valid_indices.append(i)
            else:
                print(f"警告: 数据文件 {data_file} 没有对应的标签文件")
        
        print(f"有效数据-标签对数量: {len(self.valid_indices)}")

    def pad(self, M):
        bs,c,H,W = M.size()
        # 计算需要补零的宽度和高度
        pad_height = np.abs((H - W) // 2)  # 向上取整
        pad_width = np.abs((W - H) // 2)  # 向上取整
        # 如果高度小于宽度，我们需要在高度上补零
        if H < W:
            M = F.pad(M, (0, 0, pad_width, pad_width), 'constant', 0)
        # 如果宽度小于高度，我们需要在宽度上补零
        elif W < H:
            M = F.pad(M, (pad_height, pad_height, 0, 0), 'constant', 0)
        return M
    
    def __len__(self):
        """
        返回数据集中有效样本的数量
        """
        return len(self.valid_indices)

    def __getitem__(self, idx):
        """
        返回一个样本的数据和标签
        """
        # 获取有效索引
        valid_idx = self.valid_indices[idx]
        
        # 获取文件名
        data_file = self.data_files[valid_idx]
        label_file = data_file  # 假设标签文件名与数据文件名相同
        
        # 构建完整路径
        data_path = os.path.join(self.data_dir, data_file)
        label_path = os.path.join(self.label_dir, label_file)
        
        # 加载数据 (形状应为 (2, 300, 300))
        data = np.load(data_path)
        
        # 加载标签 (形状应为 (300, 300) 或类似)
        label = np.load(label_path)
        label = torch.from_numpy(label)
        label = self.pad(label.unsqueeze(0).unsqueeze(0)).squeeze(0).squeeze(0)
        label = F.interpolate(label.unsqueeze(0).unsqueeze(0), size=(300, 300), mode='nearest').squeeze(0).squeeze(0)
        # 转换为PyTorch张量
        data_tensor = torch.from_numpy(data).float()
        label_tensor = label.long()
        
        # 确保数据形状正确
        if data_tensor.dim() == 2:
            # 如果是单通道，添加通道维度
            data_tensor = data_tensor.unsqueeze(0)
        
        # 确保标签形状正确
        if label_tensor.dim() == 3 and label_tensor.shape[0] == 1:
            # 如果标签有单通道维度，移除它
            label_tensor = label_tensor.squeeze(0)
        
        # 应用转换操作
        if self.transform:
            data_tensor = self.transform(data_tensor)
        
        return data_tensor, label_tensor


class depthwise_separable_conv(nn.Module):
    """MobileNet V1风格的深度可分离卷积块"""
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1):
        super().__init__()
        # 深度卷积
        self.depthwise = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            groups=in_channels,  # 关键参数：分组数=输入通道数
            bias=False
        )
        # 逐点卷积
        self.pointwise = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=False
        )
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        x = F.relu(self.bn1(self.depthwise(x)))  # 深度卷积+BN+ReLU
        x = F.relu(self.bn2(self.pointwise(x)))  # 逐点卷积+BN+ReLU
        return x

class double_conv2d_bn(nn.Module):
    """双深度可分离卷积块（替换原始双标准卷积）"""
    def __init__(self, in_channels, out_channels, kernel_size=3, strides=1, padding=1):
        super().__init__()
        # 第一个深度可分离卷积
        self.conv1 = depthwise_separable_conv(
            in_channels, 
            out_channels,
            kernel_size=kernel_size,
            stride=strides,
            padding=padding
        )
        # 第二个深度可分离卷积
        self.conv2 = depthwise_separable_conv(
            out_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=strides,
            padding=padding
        )

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        return x

class deconv2d_bn(nn.Module):
    """反卷积块（保持原结构不变）"""
    def __init__(self, in_channels, out_channels, kernel_size=2, strides=2):
        super().__init__()
        self.conv1 = nn.ConvTranspose2d(
            in_channels, out_channels,
            kernel_size=kernel_size,
            stride=strides, bias=True
        )
        self.bn1 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        return F.relu(self.bn1(self.conv1(x)))


class NewUnet(nn.Module):
    def __init__(self):
        super(NewUnet, self).__init__()
        # self.mycustomlayer = MyCustomLayer()
        # self.layer1_conv = double_conv2d_bn(1, 12)
        self.pointwise = nn.Conv2d(
            2,
            12,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=False
        )
        self.layer2_conv = double_conv2d_bn(12, 24)
        self.layer3_conv = double_conv2d_bn(24, 12)
        self.layer4_conv = nn.Conv2d(12,3,kernel_size=3,
                                     stride=1,padding=1,bias=True)
        
        self.deconv1 = deconv2d_bn(24, 12)
        
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, x):
        # conv1, PSF, aft_conv = self.mycustomlayer(x)          #用于训练MetaUnet,后接conv1_2 = self.pointwise(conv1)
        #conv1_2 = self.layer1_conv(x)                          #用于训练传统U-Net 
        conv1_2 = self.pointwise(x)
        pool1 = F.max_pool2d(conv1_2, 2)
        
        conv2 = self.layer2_conv(pool1)
        convt1 = self.deconv1(conv2)
        concat1 = torch.cat([convt1, conv1_2], dim=1)
        conv3 = self.layer3_conv(concat1)
        
        conv4 = self.layer4_conv(conv3)
        outp = self.sigmoid(conv4)
        return outp

# ===============================
# 自定义训练和评估函数
# ===============================

class CustomSegmentationTrainer:
    def __init__(self, model, device='cuda' if torch.cuda.is_available() else 'cpu'):
        self.model = model
        self.device = device
        self.model.to(device)
        
    def train_epoch(self, dataloader, optimizer, criterion):
        """训练一个epoch"""
        self.model.train()
        running_loss = 0.0
        
        for batch_idx, (data, target) in enumerate(dataloader):
            data, target = data.to(self.device), target.to(self.device)
            
            optimizer.zero_grad()
            output = self.model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            
        return running_loss / len(dataloader)
    
    def evaluate(self, dataloader, criterion):
        """评估模型"""
        self.model.eval()
        running_loss = 0.0
        total_pixels = 0
        correct_pixels = 0
        
        with torch.no_grad():
            for data, target in dataloader:
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                loss = criterion(output, target)
                running_loss += loss.item()
                
                # 计算像素准确率
                pred = output.argmax(dim=1)
                correct_pixels += (pred == target).sum().item()
                total_pixels += target.numel()
        
        accuracy = correct_pixels / total_pixels
        return running_loss / len(dataloader), accuracy


    
def calculate_miou(predictions, labels, num_classes):
    miou_list = []
    for i in range(predictions.shape[0]):
        pred = predictions[i]
        label = labels[i]
        confusion = confusion_matrix(label.flatten(), pred.flatten(), labels=range(num_classes))
        TP = np.diag(confusion)
        FP = np.sum(confusion, axis=0) - TP
        FN = np.sum(confusion, axis=1) - TP
        union = TP + FP + FN
        iou = TP / union
        miou = np.nanmean(iou)
        miou_list.append(miou)
    return np.mean(miou_list)

# ===============================
# 使用分割模型的评估函数
# ===============================

def evaluate_regions_with_segmentation_model(image_paths, labels, regions, model_type='segmentation', 
                                           test_size=0.2, model_params=None, num_epochs=5):
    """
    使用分割模型评估region精度
    
    参数:
    image_paths: 图像路径列表
    labels: 标签列表（这里labels参数可能不需要，因为标签从文件加载）
    regions: 区域坐标
    model_type: 模型类型
    test_size: 测试集比例
    model_params: 模型参数
    num_epochs: 训练轮数
    """
    # 创建临时目录来保存提取的数据
    import tempfile
    import shutil
    
    temp_dir = tempfile.mkdtemp()
    data_temp_dir = os.path.join(temp_dir, 'data')
    label_temp_dir = os.path.join(temp_dir, 'labels')
    os.makedirs(data_temp_dir)
    os.makedirs(label_temp_dir)
    
    try:
        # 提取所有图像的区域并保存为npy文件
        print("提取区域数据...")
        for i, img_path in enumerate(tqdm(image_paths)):
            result_array = extract_four_regions(img_path, regions=regions)
            if result_array is not None:
                # 保存数据
                data_path = os.path.join(data_temp_dir, f'data_{i:06d}.npy')
                np.save(data_path, result_array)
                
                # 这里需要处理标签 - 假设labels包含标签文件路径
                # 根据你的数据格式调整
                if i < len(labels):
                    # 复制标签文件到临时目录
                    label_src = labels[i] if isinstance(labels[i], str) else None
                    if label_src and os.path.exists(label_src):
                        label_dst = os.path.join(label_temp_dir, f'data_{i:06d}.npy')
                        shutil.copy2(label_src, label_dst)
        
        # 创建数据集
        dataset = CustomNPYDataset(data_temp_dir, label_temp_dir)
        
        if len(dataset) == 0:
            return 0
        
        # 分割训练测试集
        dataset_size = len(dataset)
        train_size = int((1 - test_size) * dataset_size)
        test_size = dataset_size - train_size
        train_dataset, test_dataset = torch.utils.data.random_split(
            dataset, [train_size, test_size], 
            generator=torch.Generator().manual_seed(42)
        )
        
        # 创建数据加载器
        train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # 初始化模型
        model = NewUnet()  
        trainer = CustomSegmentationTrainer(model)
        
        # 定义优化器和损失函数
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        class_weights = torch.tensor([1.0, 4.0, 20.0], dtype=torch.float32)  # 示例权重，根据类别频率调整
        class_weights = class_weights.to(device)  # 将权重移动到设备上
        criterion = nn.CrossEntropyLoss(weight=class_weights)  # 使用加权交叉熵损失
        # 训练模型
        print("训练分割模型...")
        for epoch in range(num_epochs):
            train_loss = trainer.train_epoch(train_loader, optimizer, criterion)
            print(f"Epoch {epoch+1}/{num_epochs}, Loss: {train_loss:.4f}")
        
        # 评估模型
        test_loss, test_accuracy = trainer.evaluate(test_loader, criterion)
        miou_list=[]
        val_loss=0.0
        with torch.no_grad():
            for data in test_loader:
                inputs, labels = data
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                predictions = get_segmentation_results(outputs)
                predictions = predictions.cpu().numpy()
                labels = labels.cpu().numpy()
                miou = calculate_miou(predictions, labels, num_classes=3)
                miou_list.append(miou)
                val_loss += loss.item()
                mean_miou = np.mean(miou_list)
        
        print(f"测试准确率: {test_accuracy:.4f}, 自定义指标: {mean_miou:.4f}")
        
        return mean_miou,model
        
    finally:
        # 清理临时文件
        shutil.rmtree(temp_dir)

# ===============================
# 数据加载和处理函数（保持不变）
# ===============================

def load_data_from_folder(data_folder):
    """从文件夹加载图像和标签"""
    image_paths = []
    label_paths = []
    
    if not os.path.exists(data_folder):
        raise ValueError(f"数据文件夹不存在: {data_folder}")
    
    png_files = [f for f in os.listdir(data_folder) if f.lower().endswith('.png')]
    
    print(f"找到 {len(png_files)} 个PNG文件")
    
    for png_file in sorted(png_files):
        img_path = os.path.join(data_folder, png_file)
        image_paths.append(img_path)
        
        base_name = os.path.splitext(png_file)[0]
        npy_file = base_name + '.npy'
        label_path = os.path.join(data_folder, npy_file)
        
        if os.path.exists(label_path):
            label_paths.append(label_path)
        else:
            print(f"警告: 找不到标签文件 {label_path}")
            label_paths.append(None)
    
    valid_indices = [i for i, label_path in enumerate(label_paths) if label_path is not None]
    image_paths = [image_paths[i] for i in valid_indices]
    label_paths = [label_paths[i] for i in valid_indices]
    
    print(f"有效数据对: {len(image_paths)}")
    
    return image_paths, label_paths

def load_labels(label_paths):
    """从NPY文件加载标签"""
    labels = []
    
    for label_path in tqdm(label_paths, desc="加载标签"):
        try:
            label = np.load(label_path)
            if isinstance(label, np.ndarray) and label.size > 1:
                label = label.item() if label.size == 1 else label[0]
            labels.append(label)
        except Exception as e:
            print(f"加载标签失败 {label_path}: {e}")
            labels.append(None)
    
    valid_indices = [i for i, label in enumerate(labels) if label is not None]
    valid_labels = [labels[i] for i in valid_indices]
    
    return valid_labels, valid_indices

# ===============================
# 使用分割模型的随机搜索函数
# ===============================

def efficient_random_search_with_segmentation_model(data_folder, base_regions, n_iter=10, max_offset=100, 
                                                   model_type='segmentation', num_epochs=3):
    """
    使用分割模型进行高效随机搜索
    """
    # 从文件夹加载数据
    image_paths, label_paths = load_data_from_folder(data_folder)
    
    if len(image_paths) == 0:
        raise ValueError("没有有效的数据对可用于训练")
    
    print(f"最终有效数据: {len(image_paths)} 个图像-标签对")
    
    best_metric_score = 0
    best_regions = base_regions.copy()
    history = []
    
    for i in tqdm(range(n_iter), desc="随机搜索"):
        # 生成随机偏移
        new_regions = []
        for j, (x1, y1, x2, y2) in enumerate(base_regions):
            dx1 = np.random.randint(-max_offset, max_offset+1)
            dy1 = np.random.randint(-max_offset, max_offset+1)
            dx2 = np.random.randint(-max_offset, max_offset+1)
            dy2 = np.random.randint(-max_offset, max_offset+1)
            
            new_region = (
                max(0, x1 + dx1),
                max(0, y1 + dy1),
                x2 + dx2,
                y2 + dy2
            )
            new_regions.append(new_region)
        
        # 使用分割模型评估
        metric_score,model = evaluate_regions_with_segmentation_model(
            image_paths, label_paths, new_regions, 
            model_type=model_type, num_epochs=num_epochs
        )
        history.append((new_regions, metric_score))
        
        if metric_score > best_metric_score:
            best_metric_score = metric_score
            best_regions = new_regions.copy()
            print(f"迭代 {i}: 新最佳指标得分 {best_metric_score:.4f}")
            torch.save(model, './best_model.pth')
    return best_regions, best_metric_score, history

# ===============================
# 区域提取函数（保持不变）
# ===============================

def extract_four_regions(image_path, output_path=None, regions=None):
    """从PNG图像中截取四个区域并返回result_array"""
    try:
        img = Image.open(image_path)
    except Exception as e:
        print(f"无法打开图像: {e}")
        return None
    
    img_array = np.array(img)
    
    if regions is None:
        regions = [
            (300,3550,1800,5050),
    (100,300,1600,1800),
    (3500,3350,5000,4850),
    (3400,150,4900,1650),
        ]
    
    if len(regions) != 4:
        print("需要提供四个区域坐标")
        return None
    
    extracted_regions = []
    for i, (x1, y1, x2, y2) in enumerate(regions):
        if (x1 < 0 or y1 < 0 or x2 > img_array.shape[1] or y2 > img_array.shape[0]):
            print(f"区域 {i+1} 超出图像范围")
            continue
        
        region = img_array[y1:y2, x1:x2]

        pad_width=0
        top_pad = np.tile(region[0:1, :], (pad_width, 1))
        bottom_pad = np.tile(region[-1:, :], (pad_width, 1))
        region_padded = np.vstack([top_pad, region, bottom_pad])
        
        left_pad = np.tile(region_padded[:, 0:1], (1, pad_width))
        right_pad = np.tile(region_padded[:, -1:], (1, pad_width))
        region_padded = np.hstack([left_pad, region_padded, right_pad])

        if region_padded.shape[0] != 300 or region_padded.shape[1] != 300:
            region_img = Image.fromarray(region_padded)
            region_img = region_img.resize((300, 300))
            region = np.array(region_img)
        
        extracted_regions.append(region)
    
    if len(extracted_regions) == 4:
        array = np.stack(extracted_regions, axis=0).astype(np.float32)
        diff1 = array[0]- array[2]
        diff2 = array[1]-array[3]
        
        result_array = np.stack([diff1, diff2], axis=0)
        
        if output_path:
            np.save(output_path, result_array)
            print(f"已保存到: {output_path}")
        
        return result_array
    else:
        print("无法提取四个有效区域")
        return None

def extract_regions_from_array(img_array, regions):
    """从已加载的图像数组中提取区域"""
    extracted_regions = []
    for i, (x1, y1, x2, y2) in enumerate(regions):
        if (x1 < 0 or y1 < 0 or x2 > img_array.shape[1] or y2 > img_array.shape[0]):
            return None
        
        region = img_array[y1:y2, x1:x2]

        pad_width=0
        top_pad = np.tile(region[0:1, :], (pad_width, 1))
        bottom_pad = np.tile(region[-1:, :], (pad_width, 1))
        region_padded = np.vstack([top_pad, region, bottom_pad])
        
        left_pad = np.tile(region_padded[:, 0:1], (1, pad_width))
        right_pad = np.tile(region_padded[:, -1:], (1, pad_width))
        region_padded = np.hstack([left_pad, region_padded, right_pad])

        if region_padded.shape[0] != 300 or region_padded.shape[1] != 300:
            region_img = Image.fromarray(region_padded)
            region_img = region_img.resize((300, 300))
            region = np.array(region_img)
        
        extracted_regions.append(region)
    
    if len(extracted_regions) == 4:
        array = np.stack(extracted_regions, axis=0).astype(np.float32)
        diff1 = array[0]- array[2]
        diff2 = array[1]-array[3]
        
        result_array = np.stack([diff1, diff2], axis=0)
        
        return result_array
    else:
        return None

# ===============================
# 主函数
# ===============================

def main():
    # 数据文件夹路径
    data_folder = r"C:\Users\86155\Desktop\20250709\twolayer3\processed"
    
    # 基础region坐标
    base_regions = [
       (300,3550,1800,5050),
    (100,300,1600,1800),
    (3500,3350,5000,4850),
    (3400,150,4900,1650),
    ]
   
    

    print("初始region:", base_regions)
    
    # 使用分割模型进行搜索
    print("开始使用分割模型进行随机搜索...")
    best_regions, best_metric_score, history = efficient_random_search_with_segmentation_model(
        data_folder, base_regions, n_iter=50, max_offset=50, num_epochs=50
    )
    # max(offset)= 70
    print(f"最佳结果: 指标得分={best_metric_score:.4f}")
    print(f"最佳region: {best_regions}")
    
    # 保存最佳结果
    np.save("best_regions_segmentation.npy", np.array(best_regions))
    print("最佳region已保存到 best_regions_segmentation.npy")

if __name__ == "__main__":
    main()

初始region: [(300, 3550, 1800, 5050), (100, 300, 1600, 1800), (3500, 3350, 5000, 4850), (3400, 150, 4900, 1650)]
开始使用分割模型进行随机搜索...
找到 806 个PNG文件
有效数据对: 806
最终有效数据: 806 个图像-标签对


随机搜索:   0%|          | 0/50 [00:00<?, ?it/s]

提取区域数据...


 73%|███████▎  | 591/806 [02:37<00:57,  3.74it/s]
随机搜索:   0%|          | 0/50 [02:39<?, ?it/s]


KeyboardInterrupt: 

In [None]:
class UnetS1(nn.Module):
    def __init__(self):
        super(UnetS1, self).__init__()
        # self.mycustomlayer = MyCustomLayer()
        self.layer1_conv = double_conv2d_bn(1, 12)
        self.pointwise = nn.Conv2d(
            2,
            12,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=False
        )
        self.layer2_conv = double_conv2d_bn(12, 24)
        self.layer3_conv = double_conv2d_bn(24, 12)
        # self.layer3_conv = double_conv2d_bn(12, 12)
        self.layer4_conv = nn.Conv2d(12,3,kernel_size=3,
                                     stride=1,padding=1,bias=True)
        
        self.deconv1 = deconv2d_bn(24, 12)
        
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, x):      
        conv1_2 = self.layer1_conv(x)
        pool1 = F.max_pool2d(conv1_2, 2)
        
        conv2 = self.layer2_conv(pool1)
        
        convt1 = self.deconv1(conv2)
        concat1 = torch.cat([convt1, conv1_2], dim=1)
        conv3 = self.layer3_conv(concat1)
        
        conv4 = self.layer4_conv(conv3)
        outp = self.sigmoid(conv4)
        return outp
model = UnetS1()
inp = torch.rand(10, 1, 300, 300)
outp = model(inp)
print("输出形状:", outp.shape)

输出形状: torch.Size([10, 3, 300, 300])


数据准备，为图像数据集创建坐标标签，适用于需要空间位置信息的计算机视觉任务。

In [4]:
import pandas as pd
import os

# 生成所有坐标点（按顺序）
coordinates = []
for x in range(0, 31):  # -30 到 30
    for y in range(0, 31):  # -30 到 30
        x_pos=2*(x-15)
        y_pos=2*(y-15)
        coordinates.append((x_pos, y_pos))

print(f"总坐标点数: {len(coordinates)}")  # 61 * 61 = 3721

# 创建映射关系（假设坐标和图片是按相同顺序排列的）
data = []
for i, (x, y) in enumerate(coordinates, 1):
    img_name = f"frame_{i}.png"  # frame_1.png, frame_2.png, ..., frame_961.png
    img_path = os.path.join(r'C:\Users\86155\Desktop\20250709\dataset_final\train\data_ori', img_name)
    
    data.append({
        'image_id': i,
        'image_path': img_path,
        'x_coord': x,
        'y_coord': y,
        'coord_label': f"({x},{y})"
    })

# 创建DataFrame并保存
df = pd.DataFrame(data)
df.to_csv('coordinate_mapping.csv', index=False)
print("坐标映射文件创建完成")
print(df.head())
print("...")
print(df.tail())

总坐标点数: 961
坐标映射文件创建完成
   image_id                                         image_path  x_coord  \
0         1  C:\Users\86155\Desktop\20250709\dataset_final\...      -30   
1         2  C:\Users\86155\Desktop\20250709\dataset_final\...      -30   
2         3  C:\Users\86155\Desktop\20250709\dataset_final\...      -30   
3         4  C:\Users\86155\Desktop\20250709\dataset_final\...      -30   
4         5  C:\Users\86155\Desktop\20250709\dataset_final\...      -30   

   y_coord coord_label  
0      -30   (-30,-30)  
1      -28   (-30,-28)  
2      -26   (-30,-26)  
3      -24   (-30,-24)  
4      -22   (-30,-22)  
...
     image_id                                         image_path  x_coord  \
956       957  C:\Users\86155\Desktop\20250709\dataset_final\...       30   
957       958  C:\Users\86155\Desktop\20250709\dataset_final\...       30   
958       959  C:\Users\86155\Desktop\20250709\dataset_final\...       30   
959       960  C:\Users\86155\Desktop\20250709\dataset_final\... 

用于在图像分割结果中识别特定类别的连通区域，并计算这些区域的质心坐标。
only_largest=True参数用于指定只返回面积最大的连通区域的质心。

In [None]:
import pandas as pd
from scipy import ndimage
import json
import cv2
def find_class_centroids(segmentation_map, target_class=2, min_area=10, only_largest=True):
    """
    在分割图中寻找特定类别（默认为2）的连通区域并计算质心
    
    Args:
        segmentation_map: 分割结果图 (H, W)
        target_class: 目标类别值
        min_area: 最小区域面积（像素数）
        only_largest: 如果为True，只返回最大区域的质心；否则返回所有区域的质心
    
    Returns:
        centroids: 质心坐标列表 [(x1, y1), ...] （根据only_largest参数返回一个或多个）
    """
    # 创建目标类别的二值掩码
    binary_mask = (segmentation_map == target_class).astype(np.uint8)
    
    if np.sum(binary_mask) == 0:
        return []
    
    # 使用连通组件分析找出所有独立区域
    labeled_array, num_features = ndimage.label(binary_mask)
    
    regions_info = []  # 存储区域信息：面积、质心、掩码
    
    for label in range(1, num_features + 1):
        # 提取当前标签的区域
        region_mask = (labeled_array == label).astype(np.uint8)
        area = np.sum(region_mask)
        
        # 过滤面积太小的区域
        if area < min_area:
            continue
        
        # 计算区域的矩（用于质心计算）
        M = cv2.moments(region_mask)
        
        if M['m00'] != 0:
            cx = M['m10'] / M['m00']
            cy = M['m01'] / M['m00']
            
            regions_info.append({
                'label': label,
                'area': area,
                'centroid': (cx, cy),
                'region_mask': region_mask.copy()  # 保存掩码，便于后续可视化
            })
    
    if not regions_info:
        return []
    
    # 根据 only_largest 参数决定返回哪些质心
    if only_largest:
        # 找出面积最大的区域
        largest_region = max(regions_info, key=lambda x: x['area'])
        # 打印调试信息
        # print(f"找到{len(regions_info)}个{target_class}类区域，选取最大区域(面积={largest_region['area']})的质心")
        return [largest_region['centroid']], regions_info  # 返回质心列表和区域信息
    else:
        # 返回所有区域的质心
        centroids = [info['centroid'] for info in regions_info]
        return centroids, regions_info

批量处理测试图像，使用训练好的分割模型进行预测，保存分割结果、可视化图像，并提取特定类别（默认类别2）的区域质心信息。

In [None]:
def save_all_segmentations(model, test_image_paths, regions, 
                           output_dir='./segmentation_results', device='cpu',only_largest_class2=True):
    """
    批量保存所有分割结果
    
    Args:
        model: 训练好的模型
        test_image_paths: 测试图像路径列表
        regions: 区域坐标列表
        output_dir: 输出目录
        device: 计算设备
    """
    model.eval()
    model.to(device)
    
    # 创建输出目录
    os.makedirs(output_dir, exist_ok=True)
    npy_dir = os.path.join(output_dir, 'npy_files')
    vis_dir = os.path.join(output_dir, 'visualizations')
    os.makedirs(npy_dir, exist_ok=True)
    os.makedirs(vis_dir, exist_ok=True)
    
    centroids_records = []  # 记录所有质心结果
    
    for idx, img_path in enumerate(test_image_paths):
        try:
            # 1. 提取特征并预测
            input_array = extract_four_regions(img_path, regions=regions)
            if input_array is None:
                continue
                
            input_tensor = torch.from_numpy(input_array).float().unsqueeze(0).to(device)
            
            with torch.no_grad():
                output = model(input_tensor)
                predictions = get_segmentation_results(output)
                predictions_np = predictions.squeeze().cpu().numpy()
            
            # 2. 保存为.npy文件
            base_name = os.path.splitext(os.path.basename(img_path))[0]
            npy_path = os.path.join(npy_dir, f'{base_name}_seg.npy')
            np.save(npy_path, predictions_np)
            
            # 3. 计算数值2区域的质心
            centroids, regions_info = find_class_centroids(
                predictions_np, 
                target_class=2, 
                min_area=10, 
                only_largest=only_largest_class2  # 使用参数控制
            )
            
            # 记录质心信息
            for centroid in centroids:
                centroids_records.append({
                    'image_name': os.path.basename(img_path),
                    'class': 2,
                    'centroid_x': float(centroid[0]),
                    'centroid_y': float(centroid[1]),
                    'is_largest': True,  # 新增字段，标记是否为最大区域
                    'region_count': len(regions_info) if regions_info else 0
                })
            
            # 修改这里：传入regions_info给可视化函数
            
            if (idx + 1) % 10 == 0:
                print(f"已处理 {idx + 1}/{len(test_image_paths)} 张图像")
                
        except Exception as e:
            print(f"处理 {img_path} 时出错: {e}")
            continue
    
    # 5. 保存质心结果到CSV
    if centroids_records:
        df_centroids = pd.DataFrame(centroids_records)
        csv_path = os.path.join(output_dir, 'class2_centroids.csv')
        df_centroids.to_csv(csv_path, index=False, encoding='utf-8-sig')
        print(f"\n质心结果已保存至: {csv_path}")
        print(f"共找到 {len(df_centroids)} 个质心点")
    
    # 6. 保存汇总统计信息
    stats = {
        'total_images': len(test_image_paths),
        'processed_images': idx + 1,
        'total_centroids_found': len(centroids_records),
        'output_directory': output_dir
    }
    
    stats_path = os.path.join(output_dir, 'processing_stats.json')
    with open(stats_path, 'w', encoding='utf-8') as f:
        json.dump(stats, f, indent=2, ensure_ascii=False)
    
    print(f"\n所有分割结果已保存至: {output_dir}")
    print(f"NPY文件: {npy_dir}")
    
    return df_centroids

使用优化得到的最佳区域和训练好的分割模型，对测试数据集进行批量预测、保存分割结果，并提取特定类别区域的质心信息。

In [None]:
def main_with_best_results():
    """使用搜索得到的最佳区域和模型进行预测、保存和质心搜索"""
    # [原有代码：加载路径、区域和模型...]
    data_folder = r"C:\Users\86155\Desktop\20250709\dataset_final\train\data_ori"
    best_regions_path = "best_regions_segmentation_U2.npy"
    if os.path.exists(best_regions_path):
        best_regions = np.load(best_regions_path)
        print(f"加载最佳区域: {best_regions}")
    else:
        print("找不到最佳区域文件，使用默认区域")
        best_regions = [
            (5, 3350, 1605, 4950),
    (80, 5, 1680, 1605),
    (3220, 3520, 4820, 5120),
    (3400, 140, 5000, 1740),
        ]
    best_model_path = "./best_modelfind_U2.pth"
    
    # 加载测试数据
    test_image_paths, test_label_paths = load_data_from_folder(data_folder)
    
    if len(test_image_paths) == 0:
        print("没有找到测试数据")
        return
    
    print(f"找到 {len(test_image_paths)} 个测试样本")
    
    # 新增：批量保存分割结果并搜索质心
    print("\n开始批量保存分割结果并进行质心搜索...")
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = torch.load(best_model_path, map_location=device, weights_only=False)
    
    df_centroids = save_all_segmentations(
        model=model,
        test_image_paths=test_image_paths,
        regions=best_regions,
        output_dir='./segmentation_results_data',
        device=device
    )
    
    # 可选：显示质心统计信息
    if df_centroids is not None and not df_centroids.empty:
        print("\n质心统计信息:")
        print(f"总质心数量: {len(df_centroids)}")
        print(f"涉及图像数量: {df_centroids['image_name'].nunique()}")
        
        # 每张图像的平均质心数
        avg_centroids = df_centroids.groupby('image_name').size().mean()
        print(f"每张图像平均质心数: {avg_centroids:.2f}")
        
        # 质心坐标范围
        print(f"X坐标范围: [{df_centroids['centroid_x'].min():.1f}, {df_centroids['centroid_x'].max():.1f}]")
        print(f"Y坐标范围: [{df_centroids['centroid_y'].min():.1f}, {df_centroids['centroid_y'].max():.1f}]")
    

执行质心坐标提取

In [None]:
if __name__ == "__main__":
    # 使用最佳结果进行预测和可视化
    main_with_best_results()

用于加载、匹配、检查和可视化角度数据（来自CSV文件，包含图像文件名和对应的角度）和坐标数据

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import PolynomialFeatures, StandardScaler
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.model_selection import train_test_split, cross_val_score, KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error
from scipy.stats import pearsonr
import seaborn as sns
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# 设置中文显示（如果需要）
plt.rcParams['font.sans-serif'] = ['SimHei']  # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False  # 用来正常显示负号

class GazeDataProcessor:
    """视线数据处理和匹配类"""
    
    def __init__(self, angle_csv_path=None, coord_csv_path=None):
        """
        初始化处理器
        
        参数:
            angle_csv_path: 角度数据CSV文件路径
            coord_csv_path: 坐标数据CSV文件路径
        """
        self.angle_csv_path = angle_csv_path
        self.coord_csv_path = coord_csv_path
        self.df_angles = None
        self.df_coords = None
        self.df_merged = None
        self.eye_points = None
        self.screen_points = None
        self.filenames = None
        
    def load_angle_data(self, csv_path=None, sep=',', path_col='image_path', 
                       angle_x_col='angle_x', angle_y_col='angle_y'):
        """
        加载角度数据CSV
        
        参数:
            csv_path: CSV文件路径
            sep: 分隔符，默认为逗号
            path_col: 图像路径列名
            angle_x_col: X角度列名
            angle_y_col: Y角度列名
        """
        if csv_path is None:
            csv_path = self.angle_csv_path
            
        print(f"正在加载角度数据: {csv_path}")
        
        try:
            self.df_angles = pd.read_csv(csv_path, sep=sep)
            print(f"角度数据列: {list(self.df_angles.columns)}")
            
            # 重命名列以便统一处理
            if path_col in self.df_angles.columns:
                self.df_angles = self.df_angles.rename(columns={
                    path_col: 'image_path',
                    angle_x_col: 'angle_x',
                    angle_y_col: 'angle_y'
                })
            
            # 提取文件名
            self.df_angles['filename'] = self.df_angles['image_path'].apply(os.path.basename)
            
            print(f"成功加载 {len(self.df_angles)} 条角度数据")
            print(f"数据预览:")
            print(self.df_angles.head())
            
            return self.df_angles
            
        except Exception as e:
            print(f"加载角度数据失败: {e}")
            return None
    
    def load_coordinate_data(self, csv_path=None, sep=',', filename_col='image_name',
                           coord_x_col='coord_x', coord_y_col='coord_y'):
        """
        加载坐标数据CSV
        
        参数:
            csv_path: CSV文件路径
            sep: 分隔符，默认为逗号
            filename_col: 文件名列名
            coord_x_col: X坐标列名
            coord_y_col: Y坐标列名
        """
        if csv_path is None:
            csv_path = self.coord_csv_path
            
        print(f"正在加载坐标数据: {csv_path}")
        
        try:
            self.df_coords = pd.read_csv(csv_path, sep=sep)
            print(f"坐标数据列: {list(self.df_coords.columns)}")
            
            # 重命名列以便统一处理
            if filename_col in self.df_coords.columns:
                self.df_coords = self.df_coords.rename(columns={
                    filename_col: 'filename',
                    coord_x_col: 'coord_x',
                    coord_y_col: 'coord_y'
                })
            
            print(f"成功加载 {len(self.df_coords)} 条坐标数据")
            print(f"数据预览:")
            print(self.df_coords.head())
            
            return self.df_coords
            
        except Exception as e:
            print(f"加载坐标数据失败: {e}")
            return None
    
    def match_data(self, how='inner'):
        """
        匹配角度和坐标数据
        
        参数:
            how: 合并方式，'inner'（内连接，默认）、'outer'（外连接）、'left'（左连接）、'right'（右连接）
        """
        if self.df_angles is None or self.df_coords is None:
            print("请先加载角度和坐标数据")
            return None
        
        print("\n开始匹配数据...")
        
        # 合并数据
        self.df_merged = pd.merge(
            self.df_angles[['filename', 'angle_x', 'angle_y']],
            self.df_coords[['filename', 'coord_x', 'coord_y']],
            on='filename',
            how=how
        )
        
        # 统计匹配结果
        total_angles = len(self.df_angles)
        total_coords = len(self.df_coords)
        matched = len(self.df_merged)
        
        print(f"匹配完成:")
        print(f"  - 角度数据总数: {total_angles}")
        print(f"  - 坐标数据总数: {total_coords}")
        print(f"  - 成功匹配: {matched}")
        print(f"  - 匹配率: {matched/min(total_angles, total_coords)*100:.2f}%")
        
        if how == 'inner':
            # 对于内连接，检查未匹配的数据
            unmatched_angles = self.df_angles[~self.df_angles['filename'].isin(self.df_merged['filename'])]
            unmatched_coords = self.df_coords[~self.df_coords['filename'].isin(self.df_merged['filename'])]
            
            if len(unmatched_angles) > 0:
                print(f"\n未匹配的角度数据 ({len(unmatched_angles)} 条):")
                print(unmatched_angles[['filename', 'angle_x', 'angle_y']].head())
                
            if len(unmatched_coords) > 0:
                print(f"\n未匹配的坐标数据 ({len(unmatched_coords)} 条):")
                print(unmatched_coords[['filename', 'coord_x', 'coord_y']].head())
        
        # 提取数组
        self.eye_points = self.df_merged[['coord_x', 'coord_y']].values.astype(np.float32)
        self.screen_angles = self.df_merged[['angle_x', 'angle_y']].values.astype(np.float32)
        angles_rad = np.radians(self.screen_angles)  # 转换为弧度
        screen_x = np.tan(angles_rad[:,1])
        screen_y = np.tan(angles_rad[:,0]) 
        self.screen_points = np.column_stack((screen_x, screen_y))


        self.filenames = self.df_merged['filename'].values
        
        print(f"\n提取的数据形状:")
        print(f"  eye_points: {self.eye_points.shape}")
        print(f"  screen_points: {self.screen_points.shape}")
        
        return self.df_merged
    
    def check_data_quality(self):
        """检查数据质量"""
        if self.df_merged is None:
            print("请先匹配数据")
            return
        
        print("\n数据质量检查:")
        print("=" * 60)
        
        # 1. 检查NaN值
        nan_angles = self.df_merged[['angle_x', 'angle_y']].isna().sum().sum()
        nan_coords = self.df_merged[['coord_x', 'coord_y']].isna().sum().sum()
        
        print(f"1. NaN值统计:")
        print(f"   角度数据NaN值: {nan_angles}")
        print(f"   坐标数据NaN值: {nan_coords}")
        
        # 2. 检查数据范围
        print(f"\n2. 数据范围:")
        print(f"   角度X范围: [{self.df_merged['angle_x'].min():.4f}, {self.df_merged['angle_x'].max():.4f}]")
        print(f"   角度Y范围: [{self.df_merged['angle_y'].min():.4f}, {self.df_merged['angle_y'].max():.4f}]")
        print(f"   坐标X范围: [{self.df_merged['coord_x'].min():.2f}, {self.df_merged['coord_x'].max():.2f}]")
        print(f"   坐标Y范围: [{self.df_merged['coord_y'].min():.2f}, {self.df_merged['coord_y'].max():.2f}]")
        
        # 3. 检查数据统计信息
        print(f"\n3. 数据统计信息:")
        print("   角度数据:")
        print(self.df_merged[['angle_x', 'angle_y']].describe())
        print("\n   坐标数据:")
        print(self.df_merged[['coord_x', 'coord_y']].describe())
        
        # 4. 检查异常值（使用IQR方法）
        def count_outliers(series):
            Q1 = series.quantile(0.25)
            Q3 = series.quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            outliers = (series < lower_bound) | (series > upper_bound)
            return outliers.sum()
        
        outliers_angle_x = count_outliers(self.df_merged['angle_x'])
        outliers_angle_y = count_outliers(self.df_merged['angle_y'])
        outliers_coord_x = count_outliers(self.df_merged['coord_x'])
        outliers_coord_y = count_outliers(self.df_merged['coord_y'])
        
        print(f"\n4. 异常值统计 (IQR方法):")
        print(f"   角度X异常值: {outliers_angle_x} ({outliers_angle_x/len(self.df_merged)*100:.2f}%)")
        print(f"   角度Y异常值: {outliers_angle_y} ({outliers_angle_y/len(self.df_merged)*100:.2f}%)")
        print(f"   坐标X异常值: {outliers_coord_x} ({outliers_coord_x/len(self.df_merged)*100:.2f}%)")
        print(f"   坐标Y异常值: {outliers_coord_y} ({outliers_coord_y/len(self.df_merged)*100:.2f}%)")
        
        return {
            'has_nan': nan_angles + nan_coords > 0,
            'outliers': {
                'angle_x': outliers_angle_x,
                'angle_y': outliers_angle_y,
                'coord_x': outliers_coord_x,
                'coord_y': outliers_coord_y
            }
        }
    


    def split_by_filenames(self, train_filenames):
    
        if self.df_merged is None:
            print("请先匹配数据")
            return None
    
    # 确保train_filenames是列表类型
        if isinstance(train_filenames, str):
            train_filenames = [train_filenames]
    
        print(f"根据指定的 {len(train_filenames)} 个文件名分割数据...")
    
    # 创建训练集掩码
        train_mask = self.df_merged['filename'].isin(train_filenames)
    
    # 提取训练集
        train_df = self.df_merged[train_mask]
        eye_train = train_df[['coord_x', 'coord_y']].values.astype(np.float32)
    
    # 提取对应的screen_points（注意保持转换一致性）
        screen_angles_train = train_df[['angle_x', 'angle_y']].values.astype(np.float32)
        angles_rad_train = np.radians(screen_angles_train)
        screen_x_train = np.tan(angles_rad_train[:, 1])
        screen_y_train = np.tan(angles_rad_train[:, 0])
        screen_train = np.column_stack((screen_x_train, screen_y_train))
    
    # 提取测试集（剩余数据）
        test_df = self.df_merged[~train_mask]
        eye_test = test_df[['coord_x', 'coord_y']].values.astype(np.float32)
    
        screen_angles_test = test_df[['angle_x', 'angle_y']].values.astype(np.float32)
        angles_rad_test = np.radians(screen_angles_test)
        screen_x_test = np.tan(angles_rad_test[:, 1])
        screen_y_test = np.tan(angles_rad_test[:, 0])
        screen_test = np.column_stack((screen_x_test, screen_y_test))
    
        print(f"分割完成:")
        print(f"  训练集大小: {len(train_df)} (基于 {len(train_filenames)} 个指定文件名)")
        print(f"  测试集大小: {len(test_df)}")
        print(f"  总数据量: {len(self.df_merged)}")
    
    # 打印训练集使用的具体文件名
        print(f"  训练集文件名示例: {train_filenames[:9] if len(train_filenames) > 9 else train_filenames}")
    
        return eye_train, eye_test, screen_train, screen_test
    def visualize_data(self, save_fig=False, output_dir='output'):
        """可视化数据分布"""
        if self.df_merged is None:
            print("请先匹配数据")
            return
        
        # 创建输出目录
        if save_fig and not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        # 创建2x2的子图
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        fig.suptitle('视线数据分布可视化', fontsize=16, fontweight='bold')
        
        # 1. 坐标分布散点图
        axes[0, 0].scatter(self.eye_points[:, 0], self.eye_points[:, 1], 
                          alpha=0.6, s=20, c='blue', edgecolors='white', linewidth=0.5)
        axes[0, 0].set_xlabel('X Coordinate', fontsize=12)
        axes[0, 0].set_ylabel('Y Coordinate', fontsize=12)
        axes[0, 0].set_title('Eye Points Distribution (Image Coordinates)', fontsize=13)
        axes[0, 0].grid(True, alpha=0.3)
        axes[0, 0].set_aspect('equal', adjustable='box')
        
        # 2. 角度分布散点图
        axes[0, 1].scatter(self.screen_points[:, 0], self.screen_points[:, 1], 
                          alpha=0.6, s=20, c='red', edgecolors='white', linewidth=0.5)
        axes[0, 1].set_xlabel('X Angle', fontsize=12)
        axes[0, 1].set_ylabel('Y Angle', fontsize=12)
        axes[0, 1].set_title('Gaze Points Distribution (Screen Angles)', fontsize=13)
        axes[0, 1].grid(True, alpha=0.3)
        axes[0, 1].set_aspect('equal', adjustable='box')
        
        # 3. 坐标分布直方图
        axes[1, 0].hist2d(self.eye_points[:, 0], self.eye_points[:, 1], 
                         bins=30, cmap='Blues')
        axes[1, 0].set_xlabel('X Coordinate', fontsize=12)
        axes[1, 0].set_ylabel('Y Coordinate', fontsize=12)
        axes[1, 0].set_title('Eye Points Density Distribution', fontsize=13)
        
        # 4. 角度分布直方图
        axes[1, 1].hist2d(self.screen_points[:, 0], self.screen_points[:, 1], 
                         bins=30, cmap='Reds')
        axes[1, 1].set_xlabel('X Angle', fontsize=12)
        axes[1, 1].set_ylabel('Y Angle', fontsize=12)
        axes[1, 1].set_title('Gaze Points Density Distribution', fontsize=13)
        
        plt.tight_layout()
        
        if save_fig:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            fig_path = os.path.join(output_dir, f'data_distribution_{timestamp}.png')
            plt.savefig(fig_path, dpi=300, bbox_inches='tight')
            print(f"可视化图像已保存到: {fig_path}")
        
        plt.show()
        
        # 相关系数矩阵
        print("\n相关系数矩阵:")
        corr_matrix = self.df_merged[['coord_x', 'coord_y', 'angle_x', 'angle_y']].corr()
        print(corr_matrix)
        
        # 绘制相关系数热图
        plt.figure(figsize=(8, 6))
        sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, 
                   square=True, fmt='.3f', linewidths=1)
        plt.title('相关系数矩阵热图', fontsize=14, fontweight='bold')
        plt.tight_layout()
        
        if save_fig:
            heatmap_path = os.path.join(output_dir, f'correlation_heatmap_{timestamp}.png')
            plt.savefig(heatmap_path, dpi=300, bbox_inches='tight')
            print(f"相关系数热图已保存到: {heatmap_path}")
        
        plt.show()
    
    def save_matched_data(self, output_path='matched_gaze_data.csv'):
        """保存匹配后的数据"""
        if self.df_merged is None:
            print("请先匹配数据")
            return
        
        # 确保目录存在
        output_dir = os.path.dirname(output_path)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        self.df_merged.to_csv(output_path, index=False)
        print(f"匹配数据已保存到: {output_path}")
        
        return output_path
    
    def split_data(self, test_size=0.2, random_state=42):
        """分割数据集为训练集和测试集"""
        if self.eye_points is None or self.screen_points is None:
            print("请先匹配数据")
            return None
        
        eye_train, eye_test, screen_train, screen_test = train_test_split(
            self.eye_points, self.screen_points, 
            test_size=test_size, 
            random_state=random_state
        )
        
        print(f"数据分割完成:")
        print(f"  训练集大小: {eye_train.shape[0]}")
        print(f"  测试集大小: {eye_test.shape[0]}")
        
        return eye_train, eye_test, screen_train, screen_test
# 假设你想使用以下5个特定文件作为训练集
specified_train_files = [
    'frame_1.png', 
    'frame_16.png', 
    'frame_31.png', 
    'frame_466.png', 
    'frame_481.png',
    'frame_496.png', 
    'frame_930.png', 
    'frame_946.png', 
    'frame_961.png'
]

# 调用新方法

angle_csv=r'C:\Users\86155\Desktop\20250709\twolayer3\coordinate_mapping.csv'
cord_csv=r'C:\Users\86155\Desktop\20250709\twolayer3\segmentation_results_data\class2_centroids.csv'
angle_data = pd.read_csv(angle_csv)
coord_data = pd.read_csv(cord_csv)

# 创建处理器并加载数据
processor = GazeDataProcessor(angle_data, coord_data)
processor.match_data()

建立基于多项式回归的视线追踪模型，将瞳孔中心坐标映射到屏幕注视点坐标

In [None]:
class GazePolynomialModel:
    """视线多项式回归模型类"""
    
    def __init__(self, degree=2, alpha=0.0):
        """
        初始化多项式回归模型
        
        参数:
            degree: 多项式阶数，默认为2
            alpha: 正则化参数，默认为0（线性回归），>0为岭回归
        """
        self.degree = degree
        self.alpha = alpha
        self.poly = PolynomialFeatures(degree=degree, include_bias=True)
        self.scaler = StandardScaler()
        
        # if alpha > 0:
        #     self.model_x = Ridge(alpha=alpha, random_state=42)
        #     self.model_y = Ridge(alpha=alpha, random_state=42)
        # else:
        self.model_x = LinearRegression()
        self.model_y = LinearRegression()
        
        self.coefficients_x = None
        self.coefficients_y = None
        self.intercept_x = None
        self.intercept_y = None
        self.poly_feature_names = None
        self.scaler_mean = None
        self.scaler_scale = None

        self.is_fitted = False
        
    def fit(self, eye_points, screen_points):
        """训练模型"""
        print(f"训练多项式回归模型 (degree={self.degree}, alpha={self.alpha})...")
        
        # 生成多项式特征
        X_poly = self.poly.fit_transform(eye_points)
        self.poly_feature_names = self.poly.get_feature_names_out(['eye_x', 'eye_y'])
        # 标准化特征
        X_scaled = self.scaler.fit_transform(X_poly)
        self.scaler_mean = self.scaler.mean_
        self.scaler_scale = self.scaler.scale_

        # 训练X方向模型
        self.model_x.fit(X_scaled, screen_points[:, 0])
        self.coefficients_x = self.model_x.coef_
        self.intercept_x = self.model_x.intercept_

        # 训练Y方向模型
        self.model_y.fit(X_scaled, screen_points[:, 1])
        self.coefficients_y = self.model_y.coef_
        self.intercept_y = self.model_y.intercept_
        self.is_fitted = True
        print("模型训练完成")
        self._print_coefficients()
        return self
    
    def _print_coefficients(self):
        """打印模型系数"""
        print("\n" + "="*60)
        print("模型系数信息:")
        print("="*60)
        
        print(f"多项式特征 (degree={self.degree}):")
        for i, name in enumerate(self.poly_feature_names):
            print(f"  {i}: {name}")
        
        print(f"\nX方向模型系数 (共{len(self.coefficients_x)}个):")
        for i, (name, coeff) in enumerate(zip(self.poly_feature_names, self.coefficients_x)):
            print(f"  {name}: {coeff:.6f}")
        print(f"  截距: {self.intercept_x:.6f}")
        
        print(f"\nY方向模型系数 (共{len(self.coefficients_y)}个):")
        for i, (name, coeff) in enumerate(zip(self.poly_feature_names, self.coefficients_y)):
            print(f"  {name}: {coeff:.6f}")
        print(f"  截距: {self.intercept_y:.6f}")
        
        print(f"\n标准化器参数:")
        print(f"  均值 (mean): {self.scaler_mean}")
        print(f"  标准差 (scale): {self.scaler_scale}")
        print("="*60)
    
    def get_coefficients(self):
        """获取模型的所有系数"""
        if not self.is_fitted:
            raise ValueError("模型尚未训练，请先调用fit方法")
        
        return {
            'degree': self.degree,
            'alpha': self.alpha,
            'poly_feature_names': self.poly_feature_names,
            'coefficients_x': self.coefficients_x,
            'coefficients_y': self.coefficients_y,
            'intercept_x': self.intercept_x,
            'intercept_y': self.intercept_y,
            'scaler_mean': self.scaler_mean,
            'scaler_scale': self.scaler_scale,
            'model_x_coef': self.model_x.coef_,
            'model_x_intercept': self.model_x.intercept_,
            'model_y_coef': self.model_y.coef_,
            'model_y_intercept': self.model_y.intercept_
        }
    
    def save_coefficients(self, filepath='model_coefficients.pkl'):
        """保存模型系数到文件"""
        import pickle
        
        if not self.is_fitted:
            raise ValueError("模型尚未训练，无法保存系数")
        
        # 收集所有需要保存的数据
        model_data = {
            'degree': self.degree,
            'alpha': self.alpha,
            'poly_feature_names': self.poly_feature_names,
            'coefficients_x': self.coefficients_x,
            'coefficients_y': self.coefficients_y,
            'intercept_x': self.intercept_x,
            'intercept_y': self.intercept_y,
            'scaler_mean': self.scaler_mean,
            'scaler_scale': self.scaler_scale,
            'is_fitted': self.is_fitted
        }
        
        # 保存到文件
        with open(filepath, 'wb') as f:
            pickle.dump(model_data, f)
        
        print(f"模型系数已保存到: {filepath}")
        
        # 同时保存为文本文件便于查看
        txt_filepath = filepath.replace('.pkl', '.txt')
        self.save_coefficients_txt(txt_filepath)
        
        return filepath
    
    def save_coefficients_txt(self, filepath='model_coefficients.txt'):
        """保存模型系数到文本文件"""
        if not self.is_fitted:
            raise ValueError("模型尚未训练，无法保存系数")
        
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write("="*60 + "\n")
            f.write("注视点映射模型系数\n")
            f.write("="*60 + "\n\n")
            
            f.write(f"模型配置:\n")
            f.write(f"  多项式阶数 (degree): {self.degree}\n")
            f.write(f"  正则化参数 (alpha): {self.alpha}\n")
            f.write(f"  是否已训练: {self.is_fitted}\n\n")
            
            f.write("多项式特征:\n")
            for i, name in enumerate(self.poly_feature_names):
                f.write(f"  {i}: {name}\n")
            f.write("\n")
            
            f.write("标准化器参数:\n")
            f.write(f"  均值 (mean): {self.scaler_mean}\n")
            f.write(f"  标准差 (scale): {self.scaler_scale}\n\n")
            
            f.write("X方向模型系数:\n")
            f.write(f"  截距: {self.intercept_x:.10f}\n")
            for i, (name, coeff) in enumerate(zip(self.poly_feature_names, self.coefficients_x)):
                f.write(f"  {name}: {coeff:.10f}\n")
            f.write("\n")
            
            f.write("Y方向模型系数:\n")
            f.write(f"  截距: {self.intercept_y:.10f}\n")
            for i, (name, coeff) in enumerate(zip(self.poly_feature_names, self.coefficients_y)):
                f.write(f"  {name}: {coeff:.10f}\n")
        
        print(f"模型系数(文本)已保存到: {filepath}")
        return filepath
    
    def save_full_model(self, filepath='gaze_model_full.pkl'):
        """保存完整模型对象（包括所有子模型）"""
        import pickle
        
        model_data = {
            'model_object': self,  # 保存整个对象
            'metadata': {
                'degree': self.degree,
                'alpha': self.alpha,
                'is_fitted': self.is_fitted
            }
        }
        
        with open(filepath, 'wb') as f:
            pickle.dump(model_data, f)
        
        print(f"完整模型已保存到: {filepath}")
        return filepath
    
    def predict(self, eye_points):
        """预测注视点"""
        if not self.is_fitted:
            raise ValueError("模型尚未训练，请先调用fit方法")
        
        # 生成多项式特征
        X_poly = self.poly.transform(eye_points)
        
        # 标准化特征
        X_scaled = self.scaler.transform(X_poly)
        
        # 预测
        pred_x = self.model_x.predict(X_scaled)
        pred_y = self.model_y.predict(X_scaled)
        
        return np.column_stack([pred_x, pred_y])
    
    def evaluate(self, eye_points, screen_points, verbose=True):
        """评估模型性能"""
        if not self.is_fitted:
            raise ValueError("模型尚未训练，请先调用fit方法")
        
        # 预测
        predictions = self.predict(eye_points)
        
        # 计算各种指标
        mse_x = mean_squared_error(screen_points[:, 0], predictions[:, 0])
        mse_y = mean_squared_error(screen_points[:, 1], predictions[:, 1])
        mse_total = mean_squared_error(screen_points.flatten(), predictions.flatten())
        
        mae_x = mean_absolute_error(screen_points[:, 0], predictions[:, 0])
        mae_y = mean_absolute_error(screen_points[:, 1], predictions[:, 1])
        mae_total = mean_absolute_error(screen_points.flatten(), predictions.flatten())
        
        rmse_x = np.sqrt(mse_x)
        rmse_y = np.sqrt(mse_y)
        rmse_total = np.sqrt(mse_total)
        
        # 计算相关系数
        corr_x, p_x = pearsonr(screen_points[:, 0], predictions[:, 0])
        corr_y, p_y = pearsonr(screen_points[:, 1], predictions[:, 1])
        
        if verbose:
            print("\n模型性能评估:")
            print("=" * 60)
            print(f"{'指标':<15} {'X方向':<15} {'Y方向':<15} {'总体':<15}")
            print("-" * 60)
            print(f"{'MSE':<15} {mse_x:<15.6f} {mse_y:<15.6f} {mse_total:<15.6f}")
            print(f"{'RMSE':<15} {rmse_x:<15.6f} {rmse_y:<15.6f} {rmse_total:<15.6f}")
            print(f"{'MAE':<15} {mae_x:<15.6f} {mae_y:<15.6f} {mae_total:<15.6f}")
            print(f"{'相关系数':<15} {corr_x:<15.6f} {corr_y:<15.6f} {'-':<15}")
            print(f"{'P值':<15} {p_x:<15.6f} {p_y:<15.6f} {'-':<15}")
        
        return {
            'mse_x': mse_x, 'mse_y': mse_y, 'mse_total': mse_total,
            'rmse_x': rmse_x, 'rmse_y': rmse_y, 'rmse_total': rmse_total,
            'mae_x': mae_x, 'mae_y': mae_y, 'mae_total': mae_total,
            'corr_x': corr_x, 'corr_y': corr_y,
            'p_x': p_x, 'p_y': p_y
        }
    
    def cross_validate(self, eye_points, screen_points, cv=5, verbose=True):
        """交叉验证"""
        # 生成多项式特征
        X_poly = self.poly.fit_transform(eye_points)
        X_scaled = self.scaler.fit_transform(X_poly)
        
        # 交叉验证X方向
        cv_scores_x = cross_val_score(self.model_x, X_scaled, screen_points[:, 0], 
                                     cv=cv, scoring='neg_mean_squared_error')
        mse_scores_x = -cv_scores_x
        rmse_scores_x = np.sqrt(mse_scores_x)
        
        # 交叉验证Y方向
        cv_scores_y = cross_val_score(self.model_y, X_scaled, screen_points[:, 1], 
                                     cv=cv, scoring='neg_mean_squared_error')
        mse_scores_y = -cv_scores_y
        rmse_scores_y = np.sqrt(mse_scores_y)
        
        if verbose:
            print(f"\n{self.degree}阶多项式模型交叉验证结果 (cv={cv}):")
            print("=" * 60)
            print(f"{'方向':<10} {'MSE均值':<15} {'MSE标准差':<15} {'RMSE均值':<15} {'RMSE标准差':<15}")
            print("-" * 60)
            print(f"{'X方向':<10} {np.mean(mse_scores_x):<15.6f} {np.std(mse_scores_x):<15.6f} "
                  f"{np.mean(rmse_scores_x):<15.6f} {np.std(rmse_scores_x):<15.6f}")
            print(f"{'Y方向':<10} {np.mean(mse_scores_y):<15.6f} {np.std(mse_scores_y):<15.6f} "
                  f"{np.mean(rmse_scores_y):<15.6f} {np.std(rmse_scores_y):<15.6f}")
        
        return {
            'mse_x_scores': mse_scores_x, 'mse_y_scores': mse_scores_y,
            'rmse_x_scores': rmse_scores_x, 'rmse_y_scores': rmse_scores_y
        }
    
    def find_best_degree(self, eye_points, screen_points, max_degree=5, cv=5, test_size=0.2):
        """寻找最佳多项式阶数"""
        print(f"寻找最佳多项式阶数 (1-{max_degree})...")
        
        degrees = range(1, max_degree + 1)
        results = []
        
        # 分割数据
        eye_train, eye_test, screen_train, screen_test = train_test_split(
            eye_points, screen_points, test_size=test_size, random_state=42
        )
        
        for degree in degrees:
            print(f"  测试阶数 {degree}...")
            
            # 创建模型
            model = GazePolynomialModel(degree=degree, alpha=self.alpha)
            
            # 交叉验证
            cv_results = model.cross_validate(eye_train, screen_train, cv=cv, verbose=False)
            
            # 训练模型
            model.fit(eye_train, screen_train)
            
            # 在测试集上评估
            test_results = model.evaluate(eye_test, screen_test, verbose=False)
            
            results.append({
                'degree': degree,
                'cv_rmse_x_mean': np.mean(cv_results['rmse_x_scores']),
                'cv_rmse_y_mean': np.mean(cv_results['rmse_y_scores']),
                'test_rmse_total': test_results['rmse_total'],
                'test_corr_x': test_results['corr_x'],
                'test_corr_y': test_results['corr_y']
            })
        
        # 转换为DataFrame
        results_df = pd.DataFrame(results)
        
        # 找到最佳阶数（基于测试集总RMSE）
        best_idx = results_df['test_rmse_total'].idxmin()
        best_degree = results_df.loc[best_idx, 'degree']
        best_rmse = results_df.loc[best_idx, 'test_rmse_total']
        
        print(f"\n最佳多项式阶数: {best_degree} (测试集RMSE={best_rmse:.6f})")
        print("\n各阶数性能对比:")
        print(results_df.to_string(index=False))
        
        # 可视化结果
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))
        
        # RMSE随阶数变化
        axes[0].plot(results_df['degree'], results_df['cv_rmse_x_mean'], 'o-', label='X方向交叉验证RMSE')
        axes[0].plot(results_df['degree'], results_df['cv_rmse_y_mean'], 's-', label='Y方向交叉验证RMSE')
        axes[0].plot(results_df['degree'], results_df['test_rmse_total'], '^-', label='测试集总RMSE')
        axes[0].axvline(x=best_degree, color='r', linestyle='--', alpha=0.7, label=f'最佳阶数={best_degree}')
        axes[0].set_xlabel('多项式阶数')
        axes[0].set_ylabel('RMSE')
        axes[0].set_title('RMSE随多项式阶数变化')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        
        # 相关系数随阶数变化
        axes[1].plot(results_df['degree'], results_df['test_corr_x'], 'o-', label='X方向相关系数')
        axes[1].plot(results_df['degree'], results_df['test_corr_y'], 's-', label='Y方向相关系数')
        axes[1].axvline(x=best_degree, color='r', linestyle='--', alpha=0.7, label=f'最佳阶数={best_degree}')
        axes[1].set_xlabel('多项式阶数')
        axes[1].set_ylabel('相关系数')
        axes[1].set_title('相关系数随多项式阶数变化')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        return best_degree, results_df
    
    def visualize_predictions(self, eye_points, screen_points, sample_size=961):
        """可视化预测结果"""
        if not self.is_fitted:
            raise ValueError("模型尚未训练，请先调用fit方法")
        
        # 预测
        predictions = self.predict(eye_points)
        
        # 随机选择一部分样本
        if len(eye_points) > sample_size:
            indices = np.random.choice(len(eye_points), sample_size, replace=False)
            eye_sample = eye_points[indices]
            screen_sample = screen_points[indices]
            pred_sample = predictions[indices]
        else:
            eye_sample = eye_points
            screen_sample = screen_points
            pred_sample = predictions
        
        # 创建可视化
        fig, axes = plt.subplots(1, 2, figsize=(14, 6))
        
        # 预测 vs 真实值散点图
        axes[0].scatter(screen_sample[:, 0], pred_sample[:, 0], alpha=0.6, label='X方向')
        axes[0].scatter(screen_sample[:, 1], pred_sample[:, 1], alpha=0.6, label='Y方向')
        
        # 添加对角线（完美预测线）
        min_val = min(screen_sample.min(), pred_sample.min())
        max_val = max(screen_sample.max(), pred_sample.max())
        axes[0].plot([min_val, max_val], [min_val, max_val], 'r--', alpha=0.7, label='完美预测')
        
        axes[0].set_xlabel('真实值')
        axes[0].set_ylabel('预测值')
        axes[0].set_title(f'预测值 vs 真实值 (多项式阶数={self.degree})')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        axes[0].set_aspect('equal', adjustable='box')
        
        # 预测误差分布
        errors_x = pred_sample[:, 0] - screen_sample[:, 0]
        errors_y = pred_sample[:, 1] - screen_sample[:, 1]
        
        axes[1].hist(errors_x, bins=20, alpha=0.7, label=f'X方向 (MAE={np.mean(np.abs(errors_x)):.4f})')
        axes[1].hist(errors_y, bins=20, alpha=0.7, label=f'Y方向 (MAE={np.mean(np.abs(errors_y)):.4f})')
        axes[1].axvline(x=0, color='r', linestyle='--', alpha=0.7)
        axes[1].set_xlabel('预测误差')
        axes[1].set_ylabel('频数')
        axes[1].set_title('预测误差分布')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        sample_errors = pred_sample - screen_sample
        sample_error_magnitudes = np.sqrt(sample_errors[:, 0]**2 + sample_errors[:, 1]**2)
        
        sample_df = pd.DataFrame({
            'true_screen_x': screen_sample[:, 0],
            'true_screen_y': screen_sample[:, 1],
            'pred_screen_x': pred_sample[:, 0],
            'pred_screen_y': pred_sample[:, 1],
            'error_x': sample_errors[:, 0],
            'error_y': sample_errors[:, 1],
            'error_magnitude': sample_error_magnitudes,
            'is_visualization_sample': True  # 标记这是可视化样本
        })
        
        sample_data_path = 'output/visualization_sample_data3.csv'
        sample_df.to_csv(sample_data_path, index=False, encoding='utf-8')
        print(f"可视化样本数据已保存到: {sample_data_path}")

整合之前定义的 GazeDataProcessor 和 GazePolynomialModel 类，执行从数据加载到模型训练的完整流程。

In [None]:
def main_demo():
    """主演示函数"""
    print("=" * 70)
    print("视线数据匹配与多项式回归建模系统")
    print("=" * 70)
    
    # 初始化数据处理器
    processor = GazeDataProcessor()
    
    # 1. 加载数据（请替换为您的实际文件路径）
    # 假设您的CSV文件格式：
    # 角度数据CSV: image_path,angle_x,angle_y
    # 坐标数据CSV: image_name,coord_x,coord_y
    
    # 示例文件路径
    angle_csv = r"C:\Users\86155\Desktop\20250709\twolayer3\coordinate_mapping.csv"  # 请替换为您的实际文件路径
    coord_csv = r"C:\Users\86155\Desktop\20250709\twolayer3\segmentation_results_data\class2_centroids.csv"  # 请替换为您的实际文件路径
    
    # 加载数据
    angles_df = processor.load_angle_data(angle_csv, sep=',')
    coords_df = processor.load_coordinate_data(coord_csv, sep=',')
    
    if angles_df is None or coords_df is None:
        print("数据加载失败，请检查文件路径和格式")
        return
    
    # 2. 匹配数据
    df_merged = processor.match_data(how='inner')
    
    # 3. 检查数据质量
    quality_info = processor.check_data_quality()
    
    # 4. 可视化数据
    processor.visualize_data(save_fig=True, output_dir='output')
    
    # 5. 保存匹配数据
    matched_file = processor.save_matched_data('output/matched_gaze_data.csv')
    
    # 6. 分割数据processor.split_by_filenames(specified_train_files)split_data(test_size=0.99, random_state=42)
    eye_train, eye_test, screen_train, screen_test = processor.split_data(test_size=0.2, random_state=42)
    
    # 7. 寻找最佳多项式阶数
    print("\n" + "=" * 70)
    print("模型训练与优化")
    print("=" * 70)
    
    base_model = GazePolynomialModel(degree=2, alpha=0.0)
    best_degree, degree_results = base_model.find_best_degree(
        eye_train, screen_train, 
        max_degree=5, cv=5, test_size=0.2
    )
    
    # 8. 使用最佳阶数训练模型
    print(f"\n使用最佳阶数 {best_degree} 训练最终模型...")
    final_model = GazePolynomialModel(degree=best_degree, alpha=0.1)  # 添加一点正则化
    
    # 交叉验证
    cv_results = final_model.cross_validate(eye_train, screen_train, cv=5)
    
    # 训练模型
    final_model.fit(eye_train, screen_train)
    
    # 在测试集上评估
    print("\n测试集性能评估:")
    test_results = final_model.evaluate(eye_test, screen_test)
    final_model.save_coefficients_txt('output/gaze_model_coefficients.txt')
    # 9. 可视化预测结果
    # final_model.visualize_predictions(eye_train, screen_train, sample_size=961)
    final_model.visualize_predictions(processor.eye_points, processor.screen_points, sample_size=961)
    # 10. 保存模型信息
    model_info = {
        'degree': best_degree,
        'alpha': 0.1,
        'train_samples': len(eye_train),
        'test_samples': len(eye_test),
        'test_rmse': test_results['rmse_total'],
        'test_mae': test_results['mae_total'],
        'test_corr_x': test_results['corr_x'],
        'test_corr_y': test_results['corr_y']
    }
    
    model_info_df = pd.DataFrame([model_info])
    model_info_path = 'output/model_info.csv'
    model_info_df.to_csv(model_info_path, index=False)
    print(f"\n模型信息已保存到: {model_info_path}")
    
    print("\n" + "=" * 70)
    print("处理完成!")
    print("=" * 70)
    
    return processor, final_model, model_info


def create_sample_data():
    """创建示例数据（如果实际数据不可用）"""
    print("创建示例数据...")
    
    # 创建输出目录
    output_dir = 'sample_data'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    # 生成100个样本
    np.random.seed(42)
    n_samples = 961
    
    # 生成文件名
    filenames = [f'frame_{i}.png' for i in range(1, n_samples + 1)]
    
    # 生成坐标数据（模拟瞳孔中心位置）
    coord_x = np.random.uniform(961, 500, n_samples)
    coord_y = np.random.uniform(961, 400, n_samples)
    
    # 生成角度数据（模拟注视点方向）
    # 假设简单的非线性关系
    angle_x = 0.01 * coord_x + 0.002 * coord_y - 0.00001 * coord_x * coord_y + np.random.normal(0, 0.5, n_samples)
    angle_y = 0.008 * coord_y + 0.001 * coord_x - 0.000005 * coord_x * coord_y + np.random.normal(0, 0.5, n_samples)
    
    # 添加路径前缀
    paths = [f'C:/Users/username/dataset/train/data_ori/{filename}' for filename in filenames]
    
    # 创建角度数据DataFrame
    angles_df = pd.DataFrame({
        'image_path': paths,
        'angle_x': angle_x,
        'angle_y': angle_y
    })
    
    # 创建坐标数据DataFrame
    coords_df = pd.DataFrame({
        'image_name': filenames,
        'coord_x': coord_x,
        'coord_y': coord_y
    })
    
    # 保存CSV文件
    angles_csv = os.path.join(output_dir, 'gaze_angles.csv')
    coords_csv = os.path.join(output_dir, 'eye_coordinates.csv')
    
    angles_df.to_csv(angles_csv, index=False)
    coords_df.to_csv(coords_csv, index=False)
    
    print(f"示例角度数据已保存到: {angles_csv}")
    print(f"示例坐标数据已保存到: {coords_csv}")
    print(f"数据形状: 角度={angles_df.shape}, 坐标={coords_df.shape}")
    
    return angles_csv, coords_csv

执行多项式拟合

In [None]:
if __name__ == "__main__":
    # 运行主演示函数
    processor, model, model_info = main_demo()