In [None]:
import numpy as np
print(np.__version__)



In [None]:
import os


def check_invalid_values(image_path):
    """
    检查单张影像是否存在无效值，并统计无效值比例。
    :param image_path: 影像文件路径
    :return: 无效值比例字典，键为波段索引，值为无效值比例
    """
    invalid_ratios = {}
    try:
        with rasterio.open(image_path) as src:
            data = src.read()  # 读取影像数据 (C, H, W)
            num_bands, height, width = data.shape

            for band_idx in range(num_bands):
                band_data = data[band_idx]
                total_pixels = height * width
                invalid_pixels = np.isnan(band_data).sum() + np.isinf(band_data).sum()
                invalid_ratio = invalid_pixels / total_pixels
                invalid_ratios[band_idx + 1] = invalid_ratio  # 波段从1开始编号

    except Exception as e:
        print(f"Error processing {image_path}: {e}")
        return None

    return invalid_ratios


def analyze_dataset(root_dir, output_log):
    """
    分析整个数据集，统计每张影像的无效值比例，并将结果写入日志文件。
    :param root_dir: 数据集根目录
    :param output_log: 输出日志文件路径
    """
    invalid_images = []  # 存储包含无效值的影像路径
    total_images = 0  # 总影像数

    # 打开日志文件以写入模式
    with open(output_log, "w") as log_file:
        log_file.write("Invalid Value Analysis Report\n")
        log_file.write("=" * 50 + "\n")

        # 遍历目录中的所有 .tif 文件
        for dirpath, _, filenames in os.walk(root_dir):
            for filename in filenames:
                if filename.lower().endswith('.tif'):
                    total_images += 1
                    image_path = os.path.join(dirpath, filename)
                    invalid_ratios = check_invalid_values(image_path)

                    if invalid_ratios is not None and any(ratio > 0 for ratio in invalid_ratios.values()):
                        invalid_images.append((image_path, invalid_ratios))
                        log_file.write(f"Invalid values found in {image_path}:\n")
                        for band, ratio in invalid_ratios.items():
                            log_file.write(f"  Band {band}: {ratio:.2%} invalid pixels\n")

        # 写入总结信息
        log_file.write("\nAnalysis Summary:\n")
        log_file.write(f"Total images analyzed: {total_images}\n")
        log_file.write(f"Images with invalid values: {len(invalid_images)}\n")
        if invalid_images:
            log_file.write("List of images with invalid values:\n")
            for path, _ in invalid_images:
                log_file.write(f"  {path}\n")


# 使用示例
# s1_image_dir = r"E:\postgraduate\小论文\小论文\S1_processed_cropped_index"
s2_image_dir = r"E:\postgraduate\小论文\小论文\S2\ValidImages"

# 定义日志文件路径
# s1_log_file = os.path.join(s1_image_dir, "S1_invalid_values.txt")
s2_log_file = os.path.join(s2_image_dir, "S2_invalid_values.txt")

# print("Analyzing S1 images...")
# analyze_dataset(s1_image_dir, s1_log_file)

print("\nAnalyzing S2 images...")
analyze_dataset(s2_image_dir, s2_log_file)

print("Analysis completed. Check the log files for details.")


# CNN提取并拼接S1和S2数据特征

将拼接特征保存到本地，避免重复计算和内存不足。

In [None]:

import rasterio
from collections import defaultdict
from torch.nn import Module, Conv2d, ReLU, Flatten, Linear


# 定义一个简单的CNN模型
class SimpleCNN(Module):
    def __init__(self, in_channels, input_size):
        super(SimpleCNN, self).__init__()
        # 使用2x2卷积核
        self.conv1 = Conv2d(in_channels, 64, kernel_size=2, stride=1, padding=0)
        self.relu = ReLU()
        self.flatten = Flatten()

        # 计算卷积层输出的尺寸
        conv_output_size = (input_size[0] - 1) * (input_size[1] - 1) * 64
        self.fc1 = Linear(conv_output_size, 128)
        self.fc2 = Linear(128, 64)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

class SampleSiteDataset:
    def __init__(self, s1_image_dir, s2_image_dir, output_dir):
        """
        初始化数据集
        :param s1_image_dir: S1影像根目录
        :param s2_image_dir: S2影像根目录
        :param output_dir: 特征保存目录
        """
        self.s1_image_dir = s1_image_dir
        self.s2_image_dir = s2_image_dir
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

        # 递归获取所有 .tif 文件
        self.s1_images = self.get_all_tif_files(s1_image_dir)
        self.s2_images = self.get_all_tif_files(s2_image_dir)

        # 获取样地列表
        s1_sample_sites = set(self.s1_images.keys())
        s2_sample_sites = set(self.s2_images.keys())

        # 取交集，确保只处理 S1 和 S2 都存在的样地
        self.sample_sites = sorted(s1_sample_sites & s2_sample_sites)

        # 打印样地列表
        print(f"S1 Sample Sites: {s1_sample_sites}")
        print(f"S2 Sample Sites: {s2_sample_sites}")
        print(f"Common Sample Sites: {self.sample_sites}")

        # 加载简单的CNN模型
        with rasterio.open(next(iter(self.s1_images.values()))[0][1]) as src:
            s1_input_size = (src.height, src.width)
        with rasterio.open(next(iter(self.s2_images.values()))[0][1]) as src:
            s2_input_size = (src.height, src.width)

        self.cnn_s1 = SimpleCNN(in_channels=4, input_size=s1_input_size)  # 假设S1有4个通道
        self.cnn_s2 = SimpleCNN(in_channels=19, input_size=s2_input_size)  # 假设S2有19个通道
        self.cnn_s1.eval()
        self.cnn_s2.eval()

    def get_all_tif_files(self, root_dir):
        tif_files = defaultdict(list)
        for dirpath, _, filenames in os.walk(root_dir):
            for filename in filenames:
                if filename.lower().endswith('.tif'):
                    sample_site = filename.split('_')[0]
                    date = self.parse_date(filename)
                    tif_files[sample_site].append((date, os.path.join(dirpath, filename)))
        return tif_files

    def parse_date(self, filename):
        parts = filename.split('_')
        date_str = parts[-1].split('.')[0]
        return datetime.strptime(date_str, '%Y%m%d' if len(date_str) == 8 else '%Y-%m-%d')

    def merge_monthly_images(self, image_paths):
        with rasterio.open(image_paths[0][1]) as src:
            profile = src.profile
            data = np.zeros((src.count, src.height, src.width), dtype=np.float32)

        for _, path in image_paths:
            with rasterio.open(path) as src:
                data += src.read()

        data /= len(image_paths)
        return data, profile

    def extract_features(self, image_data):
        # 检查图像数据的形状
        print(f"Image data shape: {image_data.shape}")

        # 确保图像数据的形状正确
        if image_data.ndim != 3 or image_data.shape[0] not in [4, 19]:
            raise ValueError(f"Image data shape {image_data.shape} is not (C, H, W) with C in [4, 19].")

        # 直接将 image_data 转换为 torch.tensor，并添加一个批次维度
        tensor = torch.tensor(image_data).unsqueeze(0)
        print(f"Tensor shape: {tensor.shape}")

        features = self.cnn_s1(tensor) if tensor.shape[1] == 4 else self.cnn_s2(tensor)
        return features.squeeze(0).detach().numpy()

    def save_features(self, sample_site, date, combined_features):
        feature_file = os.path.join(self.output_dir, f"{sample_site}_{date.strftime('%Y%m%d')}.npy")
        np.save(feature_file, combined_features)
        print(f"Saved features to {feature_file}")

    def __len__(self):
        return len(self.sample_sites)

    def __getitem__(self, idx):
        sample_site = self.sample_sites[idx]

        # 获取该样地的所有年份的S1和S2影像
        s1_images = self.s1_images[sample_site]
        s2_images = self.s2_images[sample_site]

        # 按月度合并S1影像
        s1_features = {}
        for month, paths in self.group_by_month(s1_images).items():
            merged_image, _ = self.merge_monthly_images(paths)
            s1_features[month] = self.extract_features(merged_image)

        # 提取S2影像的特征
        s2_features = {}
        for date, path in s2_images:
            with rasterio.open(path) as src:
                image_data = src.read()
            s2_feature = self.extract_features(image_data)
            s2_features[date] = (path, s2_feature)

        # 拼接特征并保存
        for month in s1_features:
            for date, (path, s2_feature) in s2_features.items():
                if date.strftime('%Y-%m') == month:
                    combined_feature = np.concatenate((s1_features[month], s2_feature))
                    self.save_features(sample_site, date, combined_feature)
                    print(f"Sample {idx}, Date {date}: S1 feature shape: {s1_features[month].shape}, S2 feature shape: {s2_feature.shape}, Combined feature shape: {combined_feature.shape}, S2 Image Path: {path}")

    def group_by_month(self, images):
        grouped = defaultdict(list)
        for date, path in images:
            month = date.strftime('%Y-%m')
            grouped[month].append((date, path))
        return grouped

# 使用示例
s1_image_dir = r"E:\postgraduate\小论文\小论文\S1_processed_cropped_index"
s2_image_dir = r"E:\postgraduate\小论文\小论文\S2\cropped"
output_dir = r"E:\postgraduate\小论文\小论文\features"

dataset = SampleSiteDataset(s1_image_dir, s2_image_dir, output_dir)

for i in range(len(dataset)):
    dataset[i]


## 检查特征矩阵是否合理

In [None]:


def check_npy_file(file_path):
    try:
        data = np.load(file_path)
        if np.isnan(data).any() or np.isinf(data).any():
            return True, file_path
        else:
            return False, file_path
    except Exception as e:
        print(f"Error loading {file_path}: {e}")
        return None, file_path


def batch_check_npy_files(folder_path):
    problematic_files = []

    # 获取文件夹中所有.npy文件的路径
    npy_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.npy')]

    for file_path in npy_files:
        has_issue, path = check_npy_file(file_path)
        if has_issue is not None and has_issue:
            problematic_files.append(path)

    return problematic_files

# 设置文件夹路径
folder_path = r"E:\postgraduate\小论文\小论文\features"

# 检查并输出有问题的文件
problematic_files = batch_check_npy_files(folder_path)

if problematic_files:
    print("The following files contain NaN or Inf values:")
    for file in problematic_files:
        print(file)
else:
    print("All .npy files are clean.")


# 提取日期信息到csv

In [None]:


def extract_date_from_npy_filenames(feature_dir, output_csv_path):
    """
    从 .npy 文件名中提取 sample_plot_id 和 date 信息，并保存到新的 CSV 文件中。

    :param feature_dir: 包含 .npy 文件的目录路径
    :param output_csv_path: 输出 CSV 文件的路径
    """
    data = []

    for filename in os.listdir(feature_dir):
        if filename.endswith('.npy'):
            try:
                # 解析文件名
                sample_site, date_str = filename.split('_')
                date_str = date_str.replace('.npy', '')
                date = datetime.strptime(date_str, '%Y%m%d')

                # 添加到数据列表
                data.append({
                    'sample_plot_id': sample_site,
                    'date': date.strftime('%Y-%m-%d')
                })
            except Exception as e:
                print(f"Error processing file {filename}: {e}")

    # 创建 DataFrame 并去重
    df_dates = pd.DataFrame(data)
    df_dates.drop_duplicates(inplace=True)

    # 保存到 CSV 文件
    df_dates.to_csv(output_csv_path, index=False, encoding='gbk')
    print(f"Date information saved to {output_csv_path}")

# 使用示例
if __name__ == "__main__":
    feature_dir = r"E:\postgraduate\小论文\小论文\features"
    output_csv_path = r"E:\postgraduate\小论文\小论文\日期信息.csv"
    extract_date_from_npy_filenames(feature_dir, output_csv_path)


# 关联特征和树种标签

将样地id、tip、date、影像特征矩阵文件位置关联起来，生成一个包含所有信息的表格。

In [None]:
import os
import pandas as pd
from datetime import datetime

# 文件路径
csv_file_1 = r"E:\postgraduate\小论文\小论文\表格文件\npy日期信息.csv"
csv_file_2 = r"E:\postgraduate\小论文\小论文\表格文件\树种识别训练表格（保留关键特征列，手动聚类至20个）.csv"
output_csv = r"E:\postgraduate\小论文\小论文\表格文件\id_tip_date_features.csv"
npy_directory = r"E:\postgraduate\小论文\小论文\features"

# 读取第一个CSV文件中的sample_plot_id和date列
df_date = pd.read_csv(csv_file_1, usecols=['sample_plot_id', 'date'], encoding="gbk")

# 将日期格式转换为统一格式 (YYYYMMDD)
df_date['date'] = pd.to_datetime(df_date['date']).dt.strftime('%Y%m%d')

# 读取第二个CSV文件中的sample_plot_id和tip列
df_tip = pd.read_csv(csv_file_2, usecols=['sample_plot_id', 'tip'], encoding="gbk")

# 获取所有.npy文件的字典 {sample_plot_id: [npy文件列表]}
npy_files_dict = {}
for filename in os.listdir(npy_directory):
    if filename.endswith('.npy'):
        sample_plot_id, date_str = filename.split('_')
        npy_files_dict.setdefault(sample_plot_id, []).append(os.path.join(npy_directory, filename))

# 合并两个DataFrame，以sample_plot_id为键
df_merged = df_date.merge(df_tip, on='sample_plot_id', how='inner')

# 添加.npy文件位置列
def get_npy_location(row):
    sample_plot_id = row['sample_plot_id']
    date_str = row['date']
    if sample_plot_id in npy_files_dict:
        for npy_file in npy_files_dict[sample_plot_id]:
            if f"_{date_str}.npy" in npy_file:
                return npy_file
    return 'N/A'

df_merged['npy_location'] = df_merged.apply(get_npy_location, axis=1)

# 输出到新的CSV文件
df_merged.to_csv(output_csv, index=False)

print(f"Merged data has been saved to {output_csv}")


In [None]:
import numpy as np
import pandas as pd

# 读取 CSV 文件
csv_path = r"E:\postgraduate\小论文\小论文\表格文件\id_tip_date_features.csv"
data = pd.read_csv(csv_path)

# 打印前20个 .npy 文件的特征维度
for index, row in data.head(20).iterrows():
    npy_location = row['npy_location']
    features = np.load(npy_location)
    print(f"File: {npy_location}, Shape: {features.shape}")


In [None]:
import numpy as np

# 读取 .npy 文件
file_path = r"E:\postgraduate\小论文\小论文\features\43-410141-N-00659_20181104.npy"
data = np.load(file_path)

# 打印数据
print(data)


# 只使用默认参数进行分类训练

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix


# 定义数据集类
class TipDataset(Dataset):
    def __init__(self, csv_path, transform=None):
        self.data = pd.read_csv(csv_path, encoding='gbk')
        self.transform = transform
        self.label_encoder = LabelEncoder()
        self.labels = self.label_encoder.fit_transform(self.data['tip'])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        npy_location = self.data.iloc[idx]['npy_location']
        features = np.load(npy_location)
        label = self.labels[idx]
        if self.transform:
            features = self.transform(features)
        return torch.tensor(features, dtype=torch.float32), torch.tensor(label, dtype=torch.long)


# 定义Transformer模型
class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, num_classes):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Linear(input_dim, 512)
        encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=num_heads, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.embedding(x)
        x = x.unsqueeze(1)  # 添加序列维度 (batch_size, seq_len, embed_dim)        x = self.transformer_encoder(x)
        x = x.squeeze(1)
        x = self.fc(x)
        return x


# 超参数
input_dim = 64  # 每个.npy文件的特征维度
num_heads = 8
num_layers = 6
learning_rate = 0.0001
num_epochs = 100
batch_size = 64  # 设置batch_size

# 数据加载
csv_path = r"E:\postgraduate\小论文\小论文\表格文件\id_tip_date_features_2.csv"
dataset = TipDataset(csv_path)
train_dataset, test_dataset = train_test_split(dataset, test_size=0.2, random_state=42)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 初始化模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerModel(input_dim, num_heads, num_layers, num_classes=len(dataset.label_encoder.classes_)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


# 训练模型
def train(model, device, train_loader, optimizer, criterion, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')


# 测试模型
def test(model, device, test_loader, criterion, label_encoder):
    model.eval()
    test_loss = 0
    correct = 0
    all_preds = []
    all_targets = []

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

            # 收集预测和真实标签
            all_preds.extend(pred.cpu().numpy().flatten())
            all_targets.extend(target.cpu().numpy())

    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {accuracy:.2f}%\n')

    # 打印每个类别的分类报告
    class_names = label_encoder.classes_
    report = classification_report(all_targets, all_preds, target_names=class_names, zero_division=0)
    print("Classification Report:\n", report)

    # 打印混淆矩阵
    cm = confusion_matrix(all_targets, all_preds)
    print("Confusion Matrix:\n", cm)

    return accuracy


# 主函数
if __name__ == "__main__":
    best_accuracy = 0.0
    for epoch in range(1, num_epochs + 1):
        train(model, device, train_loader, optimizer, criterion, epoch)
        accuracy = test(model, device, test_loader, criterion, dataset.label_encoder)
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            torch.save(model.state_dict(), 'best_model.pth')
            print(f'Best model saved with accuracy: {best_accuracy:.2f}%')

In [None]:
import numpy as np
print(np.__version__)



In [None]:
import os


def check_invalid_values(image_path):
    """
    检查单张影像是否存在无效值，并统计无效值比例。
    :param image_path: 影像文件路径
    :return: 无效值比例字典，键为波段索引，值为无效值比例
    """
    invalid_ratios = {}
    try:
        with rasterio.open(image_path) as src:
            data = src.read()  # 读取影像数据 (C, H, W)
            num_bands, height, width = data.shape

            for band_idx in range(num_bands):
                band_data = data[band_idx]
                total_pixels = height * width
                invalid_pixels = np.isnan(band_data).sum() + np.isinf(band_data).sum()
                invalid_ratio = invalid_pixels / total_pixels
                invalid_ratios[band_idx + 1] = invalid_ratio  # 波段从1开始编号

    except Exception as e:
        print(f"Error processing {image_path}: {e}")
        return None

    return invalid_ratios


def analyze_dataset(root_dir, output_log):
    """
    分析整个数据集，统计每张影像的无效值比例，并将结果写入日志文件。
    :param root_dir: 数据集根目录
    :param output_log: 输出日志文件路径
    """
    invalid_images = []  # 存储包含无效值的影像路径
    total_images = 0  # 总影像数

    # 打开日志文件以写入模式
    with open(output_log, "w") as log_file:
        log_file.write("Invalid Value Analysis Report\n")
        log_file.write("=" * 50 + "\n")

        # 遍历目录中的所有 .tif 文件
        for dirpath, _, filenames in os.walk(root_dir):
            for filename in filenames:
                if filename.lower().endswith('.tif'):
                    total_images += 1
                    image_path = os.path.join(dirpath, filename)
                    invalid_ratios = check_invalid_values(image_path)

                    if invalid_ratios is not None and any(ratio > 0 for ratio in invalid_ratios.values()):
                        invalid_images.append((image_path, invalid_ratios))
                        log_file.write(f"Invalid values found in {image_path}:\n")
                        for band, ratio in invalid_ratios.items():
                            log_file.write(f"  Band {band}: {ratio:.2%} invalid pixels\n")

        # 写入总结信息
        log_file.write("\nAnalysis Summary:\n")
        log_file.write(f"Total images analyzed: {total_images}\n")
        log_file.write(f"Images with invalid values: {len(invalid_images)}\n")
        if invalid_images:
            log_file.write("List of images with invalid values:\n")
            for path, _ in invalid_images:
                log_file.write(f"  {path}\n")


# 使用示例
# s1_image_dir = r"E:\postgraduate\小论文\小论文\S1_processed_cropped_index"
s2_image_dir = r"E:\postgraduate\小论文\小论文\S2\ValidImages"

# 定义日志文件路径
# s1_log_file = os.path.join(s1_image_dir, "S1_invalid_values.txt")
s2_log_file = os.path.join(s2_image_dir, "S2_invalid_values.txt")

# print("Analyzing S1 images...")
# analyze_dataset(s1_image_dir, s1_log_file)

print("\nAnalyzing S2 images...")
analyze_dataset(s2_image_dir, s2_log_file)

print("Analysis completed. Check the log files for details.")


# CNN提取并拼接S1和S2数据特征

将拼接特征保存到本地，避免重复计算和内存不足。

## 提取3*3区域尺寸像素特征

In [None]:

import rasterio
from collections import defaultdict
from torch.nn import Module, Conv2d, ReLU, Flatten, Linear


# 定义一个简单的CNN模型
class SimpleCNN(Module):
    def __init__(self, in_channels, input_size):
        super(SimpleCNN, self).__init__()
        # 使用2x2卷积核
        self.conv1 = Conv2d(in_channels, 64, kernel_size=2, stride=1, padding=0)
        self.relu = ReLU()
        self.flatten = Flatten()

        # 计算卷积层输出的尺寸
        conv_output_size = (input_size[0] - 1) * (input_size[1] - 1) * 64
        self.fc1 = Linear(conv_output_size, 128)
        self.fc2 = Linear(128, 64)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

class SampleSiteDataset:
    def __init__(self, s1_image_dir, s2_image_dir, output_dir):
        """
        初始化数据集
        :param s1_image_dir: S1影像根目录
        :param s2_image_dir: S2影像根目录
        :param output_dir: 特征保存目录
        """
        self.s1_image_dir = s1_image_dir
        self.s2_image_dir = s2_image_dir
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

        # 递归获取所有 .tif 文件
        self.s1_images = self.get_all_tif_files(s1_image_dir)
        self.s2_images = self.get_all_tif_files(s2_image_dir)

        # 获取样地列表
        s1_sample_sites = set(self.s1_images.keys())
        s2_sample_sites = set(self.s2_images.keys())

        # 取交集，确保只处理 S1 和 S2 都存在的样地
        self.sample_sites = sorted(s1_sample_sites & s2_sample_sites)

        # 打印样地列表
        print(f"S1 Sample Sites: {s1_sample_sites}")
        print(f"S2 Sample Sites: {s2_sample_sites}")
        print(f"Common Sample Sites: {self.sample_sites}")

        # 加载简单的CNN模型
        with rasterio.open(next(iter(self.s1_images.values()))[0][1]) as src:
            s1_input_size = (src.height, src.width)
        with rasterio.open(next(iter(self.s2_images.values()))[0][1]) as src:
            s2_input_size = (src.height, src.width)

        self.cnn_s1 = SimpleCNN(in_channels=4, input_size=s1_input_size)  # 假设S1有4个通道
        self.cnn_s2 = SimpleCNN(in_channels=19, input_size=s2_input_size)  # 假设S2有19个通道
        self.cnn_s1.eval()
        self.cnn_s2.eval()

    def get_all_tif_files(self, root_dir):
        tif_files = defaultdict(list)
        for dirpath, _, filenames in os.walk(root_dir):
            for filename in filenames:
                if filename.lower().endswith('.tif'):
                    sample_site = filename.split('_')[0]
                    date = self.parse_date(filename)
                    tif_files[sample_site].append((date, os.path.join(dirpath, filename)))
        return tif_files

    def parse_date(self, filename):
        parts = filename.split('_')
        date_str = parts[-1].split('.')[0]
        return datetime.strptime(date_str, '%Y%m%d' if len(date_str) == 8 else '%Y-%m-%d')

    def merge_monthly_images(self, image_paths):
        with rasterio.open(image_paths[0][1]) as src:
            profile = src.profile
            data = np.zeros((src.count, src.height, src.width), dtype=np.float32)

        for _, path in image_paths:
            with rasterio.open(path) as src:
                data += src.read()

        data /= len(image_paths)
        return data, profile

    def extract_features(self, image_data):
        # 检查图像数据的形状
        print(f"Image data shape: {image_data.shape}")

        # 确保图像数据的形状正确
        if image_data.ndim != 3 or image_data.shape[0] not in [4, 19]:
            raise ValueError(f"Image data shape {image_data.shape} is not (C, H, W) with C in [4, 19].")

        # 直接将 image_data 转换为 torch.tensor，并添加一个批次维度
        tensor = torch.tensor(image_data).unsqueeze(0)
        print(f"Tensor shape: {tensor.shape}")

        features = self.cnn_s1(tensor) if tensor.shape[1] == 4 else self.cnn_s2(tensor)
        return features.squeeze(0).detach().numpy()

    def save_features(self, sample_site, date, combined_features):
        feature_file = os.path.join(self.output_dir, f"{sample_site}_{date.strftime('%Y%m%d')}.npy")
        np.save(feature_file, combined_features)
        print(f"Saved features to {feature_file}")

    def __len__(self):
        return len(self.sample_sites)

    def __getitem__(self, idx):
        sample_site = self.sample_sites[idx]

        # 获取该样地的所有年份的S1和S2影像
        s1_images = self.s1_images[sample_site]
        s2_images = self.s2_images[sample_site]

        # 按月度合并S1影像
        s1_features = {}
        for month, paths in self.group_by_month(s1_images).items():
            merged_image, _ = self.merge_monthly_images(paths)
            s1_features[month] = self.extract_features(merged_image)

        # 提取S2影像的特征
        s2_features = {}
        for date, path in s2_images:
            with rasterio.open(path) as src:
                image_data = src.read()
            s2_feature = self.extract_features(image_data)
            s2_features[date] = (path, s2_feature)

        # 拼接特征并保存
        for month in s1_features:
            for date, (path, s2_feature) in s2_features.items():
                if date.strftime('%Y-%m') == month:
                    combined_feature = np.concatenate((s1_features[month], s2_feature))
                    self.save_features(sample_site, date, combined_feature)
                    print(f"Sample {idx}, Date {date}: S1 feature shape: {s1_features[month].shape}, S2 feature shape: {s2_feature.shape}, Combined feature shape: {combined_feature.shape}, S2 Image Path: {path}")

    def group_by_month(self, images):
        grouped = defaultdict(list)
        for date, path in images:
            month = date.strftime('%Y-%m')
            grouped[month].append((date, path))
        return grouped

# 使用示例
s1_image_dir = r"E:\postgraduate\小论文\小论文\S1_processed_cropped_index"
s2_image_dir = r"E:\postgraduate\小论文\小论文\S2\cropped"
output_dir = r"E:\postgraduate\小论文\小论文\features"

dataset = SampleSiteDataset(s1_image_dir, s2_image_dir, output_dir)

for i in range(len(dataset)):
    dataset[i]


# 只简单提取提取中心像元特征并拼接S1\S2数据

In [None]:
import os
import numpy as np
import datetime
import rasterio

class SampleSiteDataset:
    def __init__(self, s1_image_dir, s2_image_dir, output_dir):
        """
        初始化数据集
        :param s1_image_dir: S1影像根目录
        :param s2_image_dir: S2影像根目录
        :param output_dir: 特征保存目录
        """
        self.s1_image_dir = s1_image_dir
        self.s2_image_dir = s2_image_dir
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

        # 递归获取所有 .tif 文件
        self.s1_images = self.get_all_tif_files(s1_image_dir)
        self.s2_images = self.get_all_tif_files(s2_image_dir)

        # 获取样地列表
        s1_sample_sites = set(self.s1_images.keys())
        s2_sample_sites = set(self.s2_images.keys())

        # 取交集，确保只处理 S1 和 S2 都存在的样地
        self.sample_sites = sorted(s1_sample_sites & s2_sample_sites)

        # 打印样地列表
        print(f"S1 Sample Sites: {s1_sample_sites}")
        print(f"S2 Sample Sites: {s2_sample_sites}")
        print(f"Common Sample Sites: {self.sample_sites}")

    def get_all_tif_files(self, root_dir):
        tif_files = defaultdict(list)
        for dirpath, _, filenames in os.walk(root_dir):
            for filename in filenames:
                if filename.lower().endswith('.tif'):
                    sample_site = filename.split('_')[0]
                    date = self.parse_date(filename)
                    tif_files[sample_site].append((date, os.path.join(dirpath, filename)))
        return tif_files

    def parse_date(self, filename):
        parts = filename.split('_')
        date_str = parts[-1].split('.')[0]
        return datetime.datetime.strptime(date_str, '%Y%m%d' if len(date_str) == 8 else '%Y-%m-%d')

    def extract_center_pixel(self, image_data):
        # 检查图像是否为3x3
        if image_data.shape[1] != 3 or image_data.shape[2] != 3:
            raise ValueError("Image size must be 3x3")

        # 提取中心像元的波段值
        center_pixel_values = image_data[:, 1, 1]
        return center_pixel_values

    def save_features(self, sample_site, date, combined_features):
        feature_file = os.path.join(self.output_dir, f"{sample_site}_{date.strftime('%Y%m%d')}.npy")
        np.save(feature_file, combined_features)
        print(f"Saved features to {feature_file}")

    def __len__(self):
        return len(self.sample_sites)

    def __getitem__(self, idx):
        sample_site = self.sample_sites[idx]

        # 获取该样地的所有年份的S1和S2影像
        s1_images = self.s1_images[sample_site]
        s2_images = self.s2_images[sample_site]

        # 按月度合并S1影像
        s1_features = {}
        for month, paths in self.group_by_month(s1_images).items():
            merged_image, _ = self.merge_monthly_images(paths)
            s1_features[month] = self.extract_center_pixel(merged_image)

        # 提取S2影像的特征
        s2_features = {}
        for date, path in s2_images:
            with rasterio.open(path) as src:
                image_data = src.read()
            s2_feature = self.extract_center_pixel(image_data)
            s2_features[date] = (path, s2_feature)

        # 拼接特征并保存
        for month in s1_features:
            for date, (path, s2_feature) in s2_features.items():
                if date.strftime('%Y-%m') == month:
                    combined_feature = np.concatenate((s1_features[month], s2_feature))
                    self.save_features(sample_site, date, combined_feature)
                    print(f"Sample {idx}, Date {date}: S1 feature shape: {s1_features[month].shape}, S2 feature shape: {s2_feature.shape}, Combined feature shape: {combined_feature.shape}, S2 Image Path: {path}")

    def group_by_month(self, images):
        grouped = defaultdict(list)
        for date, path in images:
            month = date.strftime('%Y-%m')
            grouped[month].append((date, path))
        return grouped

    def merge_monthly_images(self, image_paths):
        with rasterio.open(image_paths[0][1]) as src:
            profile = src.profile
            data = np.zeros((src.count, src.height, src.width), dtype=np.float32)

        for _, path in image_paths:
            with rasterio.open(path) as src:
                data += src.read()

        data /= len(image_paths)
        return data, profile

# 使用示例
s1_image_dir = r"E:\postgraduate\小论文\小论文\S1_processed_cropped_index"
s2_image_dir = r"E:\postgraduate\小论文\小论文\S2\ValidImages"
output_dir = r"E:\postgraduate\小论文\小论文\features_(center)"

dataset = SampleSiteDataset(s1_image_dir, s2_image_dir, output_dir)

for i in range(len(dataset)):
    dataset[i]


## 检查特征矩阵是否合理

In [None]:


def check_npy_file(file_path):
    try:
        data = np.load(file_path)
        if np.isnan(data).any() or np.isinf(data).any():
            return True, file_path
        else:
            return False, file_path
    except Exception as e:
        print(f"Error loading {file_path}: {e}")
        return None, file_path


def batch_check_npy_files(folder_path):
    problematic_files = []

    # 获取文件夹中所有.npy文件的路径
    npy_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.npy')]

    for file_path in npy_files:
        has_issue, path = check_npy_file(file_path)
        if has_issue is not None and has_issue:
            problematic_files.append(path)

    return problematic_files

# 设置文件夹路径
folder_path = r"E:\postgraduate\小论文\小论文\features"

# 检查并输出有问题的文件
problematic_files = batch_check_npy_files(folder_path)

if problematic_files:
    print("The following files contain NaN or Inf values:")
    for file in problematic_files:
        print(file)
else:
    print("All .npy files are clean.")


# 提取日期信息到csv

In [None]:


def extract_date_from_npy_filenames(feature_dir, output_csv_path):
    """
    从 .npy 文件名中提取 sample_plot_id 和 date 信息，并保存到新的 CSV 文件中。

    :param feature_dir: 包含 .npy 文件的目录路径
    :param output_csv_path: 输出 CSV 文件的路径
    """
    data = []

    for filename in os.listdir(feature_dir):
        if filename.endswith('.npy'):
            try:
                # 解析文件名
                sample_site, date_str = filename.split('_')
                date_str = date_str.replace('.npy', '')
                date = datetime.strptime(date_str, '%Y%m%d')

                # 添加到数据列表
                data.append({
                    'sample_plot_id': sample_site,
                    'date': date.strftime('%Y-%m-%d')
                })
            except Exception as e:
                print(f"Error processing file {filename}: {e}")

    # 创建 DataFrame 并去重
    df_dates = pd.DataFrame(data)
    df_dates.drop_duplicates(inplace=True)

    # 保存到 CSV 文件
    df_dates.to_csv(output_csv_path, index=False, encoding='gbk')
    print(f"Date information saved to {output_csv_path}")

# 使用示例
if __name__ == "__main__":
    feature_dir = r"E:\postgraduate\小论文\小论文\features"
    output_csv_path = r"E:\postgraduate\小论文\小论文\日期信息.csv"
    extract_date_from_npy_filenames(feature_dir, output_csv_path)


# 关联特征和树种标签

将样地id、tip、date、影像特征矩阵文件位置关联起来，生成一个包含所有信息的表格。

In [None]:
import os
import pandas as pd
from datetime import datetime

# 文件路径
csv_file_1 = r"E:\postgraduate\小论文\小论文\表格文件\npy日期信息.csv"
csv_file_2 = r"E:\postgraduate\小论文\小论文\表格文件\树种识别训练表格（保留关键特征列，手动聚类至20个）.csv"
output_csv = r"E:\postgraduate\小论文\小论文\表格文件\id_tip_date_features.csv"
npy_directory = r"E:\postgraduate\小论文\小论文\features"

# 读取第一个CSV文件中的sample_plot_id和date列
df_date = pd.read_csv(csv_file_1, usecols=['sample_plot_id', 'date'], encoding="gbk")

# 将日期格式转换为统一格式 (YYYYMMDD)
df_date['date'] = pd.to_datetime(df_date['date']).dt.strftime('%Y%m%d')

# 读取第二个CSV文件中的sample_plot_id和tip列
df_tip = pd.read_csv(csv_file_2, usecols=['sample_plot_id', 'tip'], encoding="gbk")

# 获取所有.npy文件的字典 {sample_plot_id: [npy文件列表]}
npy_files_dict = {}
for filename in os.listdir(npy_directory):
    if filename.endswith('.npy'):
        sample_plot_id, date_str = filename.split('_')
        npy_files_dict.setdefault(sample_plot_id, []).append(os.path.join(npy_directory, filename))

# 合并两个DataFrame，以sample_plot_id为键
df_merged = df_date.merge(df_tip, on='sample_plot_id', how='inner')

# 添加.npy文件位置列
def get_npy_location(row):
    sample_plot_id = row['sample_plot_id']
    date_str = row['date']
    if sample_plot_id in npy_files_dict:
        for npy_file in npy_files_dict[sample_plot_id]:
            if f"_{date_str}.npy" in npy_file:
                return npy_file
    return 'N/A'

df_merged['npy_location'] = df_merged.apply(get_npy_location, axis=1)

# 输出到新的CSV文件
df_merged.to_csv(output_csv, index=False)

print(f"Merged data has been saved to {output_csv}")


In [None]:
import numpy as np
import pandas as pd

# 读取 CSV 文件
csv_path = r"E:\postgraduate\小论文\小论文\表格文件\id_tip_date_features.csv"
data = pd.read_csv(csv_path)

# 打印前20个 .npy 文件的特征维度
for index, row in data.head(20).iterrows():
    npy_location = row['npy_location']
    features = np.load(npy_location)
    print(f"File: {npy_location}, Shape: {features.shape}")


In [None]:
import numpy as np

# 读取 .npy 文件
file_path = r"E:\postgraduate\小论文\小论文\features\43-410141-N-00659_20181104.npy"
data = np.load(file_path)

# 打印数据
print(data)


# 只使用默认参数进行分类训练

## 训练提取的3*3区域特征

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix


# 定义数据集类
class TipDataset(Dataset):
    def __init__(self, csv_path, transform=None):
        self.data = pd.read_csv(csv_path, encoding='gbk')
        self.transform = transform
        self.label_encoder = LabelEncoder()
        self.labels = self.label_encoder.fit_transform(self.data['tip'])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        npy_location = self.data.iloc[idx]['npy_location']
        features = np.load(npy_location)
        label = self.labels[idx]
        if self.transform:
            features = self.transform(features)
        return torch.tensor(features, dtype=torch.float32), torch.tensor(label, dtype=torch.long)


# 定义Transformer模型
class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, num_classes):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Linear(input_dim, 512)
        encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=num_heads, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.embedding(x)
        x = x.unsqueeze(1)  # 添加序列维度 (batch_size, seq_len, embed_dim)        x = self.transformer_encoder(x)
        x = x.squeeze(1)
        x = self.fc(x)
        return x


# 超参数
input_dim = 64  # 每个.npy文件的特征维度
num_heads = 8
num_layers = 6
learning_rate = 0.0001
num_epochs = 100
batch_size = 64  # 设置batch_size

# 数据加载
csv_path = r"E:\postgraduate\小论文\小论文\表格文件\id_tip_date_features_2.csv"
dataset = TipDataset(csv_path)
train_dataset, test_dataset = train_test_split(dataset, test_size=0.2, random_state=42)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 初始化模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerModel(input_dim, num_heads, num_layers, num_classes=len(dataset.label_encoder.classes_)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


# 训练模型
def train(model, device, train_loader, optimizer, criterion, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')


# 测试模型
def test(model, device, test_loader, criterion, label_encoder):
    model.eval()
    test_loss = 0
    correct = 0
    all_preds = []
    all_targets = []

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

            # 收集预测和真实标签
            all_preds.extend(pred.cpu().numpy().flatten())
            all_targets.extend(target.cpu().numpy())

    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {accuracy:.2f}%\n')

    # 打印每个类别的分类报告
    class_names = label_encoder.classes_
    report = classification_report(all_targets, all_preds, target_names=class_names, zero_division=0)
    print("Classification Report:\n", report)

    # 打印混淆矩阵
    cm = confusion_matrix(all_targets, all_preds)
    print("Confusion Matrix:\n", cm)

    return accuracy


# 主函数
if __name__ == "__main__":
    best_accuracy = 0.0
    for epoch in range(1, num_epochs + 1):
        train(model, device, train_loader, optimizer, criterion, epoch)
        accuracy = test(model, device, test_loader, criterion, dataset.label_encoder)
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            torch.save(model.state_dict(), 'best_model.pth')
            print(f'Best model saved with accuracy: {best_accuracy:.2f}%')

## 训练提取的中心像元特征

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
import chardet

def detect_encoding(file_path):
    with open(file_path, 'rb') as f:
        raw_data = f.read()
    result = chardet.detect(raw_data)
    return result['encoding']


# 定义数据集类
class TipDataset(Dataset):
    def __init__(self, csv_path, transform=None):
        # 自动检测文件编码
        encoding = detect_encoding(csv_path)
        self.data = pd.read_csv(csv_path, encoding=encoding)
        self.transform = transform
        self.label_encoder = LabelEncoder()
        self.labels = self.label_encoder.fit_transform(self.data['tip'])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        npy_location = self.data.iloc[idx]['npy_location']
        features = np.load(npy_location)
        label = self.labels[idx]
        if self.transform:
            features = self.transform(features)
        return torch.tensor(features, dtype=torch.float32), torch.tensor(label, dtype=torch.long)


# 定义Transformer模型
class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, num_classes):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Linear(input_dim, 512)
        encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=num_heads, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.embedding(x)
        x = x.unsqueeze(1)  # 添加序列维度 (batch_size, seq_len, embed_dim)        x = self.transformer_encoder(x)
        x = x.squeeze(1)
        x = self.fc(x)
        return x


# 超参数
input_dim = 23  # 每个.npy文件的特征维度
num_heads = 8
num_layers = 6
learning_rate = 0.0001
num_epochs = 100
batch_size = 64  # 设置batch_size

# 数据加载
csv_path = r"E:\postgraduate\小论文\小论文\表格文件\id_tip_date_features(center).csv"
dataset = TipDataset(csv_path)
train_dataset, test_dataset = train_test_split(dataset, test_size=0.2, random_state=42)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 初始化模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerModel(input_dim, num_heads, num_layers, num_classes=len(dataset.label_encoder.classes_)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


# 训练模型
def train(model, device, train_loader, optimizer, criterion, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')


# 测试模型
def test(model, device, test_loader, criterion, label_encoder):
    model.eval()
    test_loss = 0
    correct = 0
    all_preds = []
    all_targets = []

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

            # 收集预测和真实标签
            all_preds.extend(pred.cpu().numpy().flatten())
            all_targets.extend(target.cpu().numpy())

    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {accuracy:.2f}%\n')

    # 打印每个类别的分类报告
    class_names = label_encoder.classes_
    report = classification_report(all_targets, all_preds, target_names=class_names, zero_division=0)
    print("Classification Report:\n", report)

    # 打印混淆矩阵
    cm = confusion_matrix(all_targets, all_preds)
    print("Confusion Matrix:\n", cm)

    return accuracy


# 主函数
if __name__ == "__main__":
    best_accuracy = 0.0
    for epoch in range(1, num_epochs + 1):
        train(model, device, train_loader, optimizer, criterion, epoch)
        accuracy = test(model, device, test_loader, criterion, dataset.label_encoder)
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            torch.save(model.state_dict(), 'best_model.pth')
            print(f'Best model saved with accuracy: {best_accuracy:.2f}%')

## 分两个tip列训练

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
import chardet

def detect_encoding(file_path):
    with open(file_path, 'rb') as f:
        raw_data = f.read()
    result = chardet.detect(raw_data)
    return result['encoding']

# 定义数据集类
class TipDataset(Dataset):
    def __init__(self, csv_path, transform=None):
        # 自动检测文件编码
        encoding = detect_encoding(csv_path)
        self.data = pd.read_csv(csv_path, encoding=encoding)
        self.transform = transform

        # 对 tip1 和 tip2 分别进行编码
        self.label_encoder_tip1 = LabelEncoder()
        self.labels_tip1 = self.label_encoder_tip1.fit_transform(self.data['tip1'])

        self.label_encoder_tip2 = LabelEncoder()
        self.labels_tip2 = self.label_encoder_tip2.fit_transform(self.data['tip2'])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        npy_location = self.data.iloc[idx]['npy_location']
        features = np.load(npy_location)
        label_tip1 = self.labels_tip1[idx]
        label_tip2 = self.labels_tip2[idx]
        if self.transform:
            features = self.transform(features)
        return torch.tensor(features, dtype=torch.float32), torch.tensor(label_tip1, dtype=torch.long), torch.tensor(label_tip2, dtype=torch.long)

# 定义Transformer模型
class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, num_classes_tip1, num_classes_tip2):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Linear(input_dim, 512)
        encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=num_heads, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc_tip1 = nn.Linear(512, num_classes_tip1)  # 输出 tip1 类别
        self.fc_tip2 = nn.Linear(512, num_classes_tip2)  # 输出 tip2 类别

    def forward(self, x):
        x = self.embedding(x)
        x = x.unsqueeze(1)  # 添加序列维度 (batch_size, seq_len, embed_dim)
        x = self.transformer_encoder(x)
        x = x.squeeze(1)
        out_tip1 = self.fc_tip1(x)
        out_tip2 = self.fc_tip2(x)
        return out_tip1, out_tip2

# 超参数
input_dim = 23  # 每个.npy文件的特征维度
num_heads = 8
num_layers = 6
learning_rate = 0.0001
num_epochs = 100
batch_size = 64  # 设置batch_size

# 数据加载
csv_path = r"E:\postgraduate\小论文\小论文\表格文件\id_tip_date_features(center).csv"
dataset = TipDataset(csv_path)
train_dataset, test_dataset = train_test_split(dataset, test_size=0.2, random_state=42)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 初始化模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerModel(input_dim, num_heads, num_layers,
                         num_classes_tip1=len(dataset.label_encoder_tip1.classes_),
                         num_classes_tip2=len(dataset.label_encoder_tip2.classes_)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 训练模型
def train(model, device, train_loader, optimizer, criterion, epoch):
    model.train()
    for batch_idx, (data, target_tip1, target_tip2) in enumerate(train_loader):
        data, target_tip1, target_tip2 = data.to(device), target_tip1.to(device), target_tip2.to(device)
        optimizer.zero_grad()
        output_tip1, output_tip2 = model(data)
        loss_tip1 = criterion(output_tip1, target_tip1)
        loss_tip2 = criterion(output_tip2, target_tip2)
        total_loss = loss_tip1 + loss_tip2  # 可以根据实际情况调整权重
        total_loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {total_loss.item():.6f}')

# 测试模型
def test(model, device, test_loader, criterion, label_encoder_tip1, label_encoder_tip2):
    model.eval()
    test_loss = 0
    correct_tip1 = 0
    correct_tip2 = 0
    all_preds_tip1 = []
    all_targets_tip1 = []
    all_preds_tip2 = []
    all_targets_tip2 = []

    with torch.no_grad():
        for data, target_tip1, target_tip2 in test_loader:
            data, target_tip1, target_tip2 = data.to(device), target_tip1.to(device), target_tip2.to(device)
            output_tip1, output_tip2 = model(data)
            loss_tip1 = criterion(output_tip1, target_tip1)
            loss_tip2 = criterion(output_tip2, target_tip2)
            test_loss += (loss_tip1 + loss_tip2).item()

            pred_tip1 = output_tip1.argmax(dim=1, keepdim=True)
            pred_tip2 = output_tip2.argmax(dim=1, keepdim=True)

            correct_tip1 += pred_tip1.eq(target_tip1.view_as(pred_tip1)).sum().item()
            correct_tip2 += pred_tip2.eq(target_tip2.view_as(pred_tip2)).sum().item()

            # 收集预测和真实标签
            all_preds_tip1.extend(pred_tip1.cpu().numpy().flatten())
            all_targets_tip1.extend(target_tip1.cpu().numpy())
            all_preds_tip2.extend(pred_tip2.cpu().numpy().flatten())
            all_targets_tip2.extend(target_tip2.cpu().numpy())

    test_loss /= len(test_loader.dataset)
    accuracy_tip1 = 100. * correct_tip1 / len(test_loader.dataset)
    accuracy_tip2 = 100. * correct_tip2 / len(test_loader.dataset)
    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy tip1: {accuracy_tip1:.2f}%, Accuracy tip2: {accuracy_tip2:.2f}%\n')

    # 打印每个类别的分类报告
    class_names_tip1 = label_encoder_tip1.classes_
    report_tip1 = classification_report(all_targets_tip1, all_preds_tip1, target_names=class_names_tip1, zero_division=0)
    print("Classification Report for tip1:\n", report_tip1)

    class_names_tip2 = label_encoder_tip2.classes_
    report_tip2 = classification_report(all_targets_tip2, all_preds_tip2, target_names=class_names_tip2, zero_division=0)
    print("Classification Report for tip2:\n", report_tip2)

    # 打印混淆矩阵
    cm_tip1 = confusion_matrix(all_targets_tip1, all_preds_tip1)
    print("Confusion Matrix for tip1:\n", cm_tip1)

    cm_tip2 = confusion_matrix(all_targets_tip2, all_preds_tip2)
    print("Confusion Matrix for tip2:\n", cm_tip2)

    return accuracy_tip1, accuracy_tip2

# 主函数
if __name__ == "__main__":
    best_accuracy_tip1 = 0.0
    best_accuracy_tip2 = 0.0
    for epoch in range(1, num_epochs + 1):
        train(model, device, train_loader, optimizer, criterion, epoch)
        accuracy_tip1, accuracy_tip2 = test(model, device, test_loader, criterion, dataset.label_encoder_tip1, dataset.label_encoder_tip2)
        if accuracy_tip1 > best_accuracy_tip1 or accuracy_tip2 > best_accuracy_tip2:
            best_accuracy_tip1 = accuracy_tip1
            best_accuracy_tip2 = accuracy_tip2
            torch.save(model.state_dict(), 'best_model.pth')
            print(f'Best model saved with accuracy tip1: {best_accuracy_tip1:.2f}%, accuracy tip2: {best_accuracy_tip2:.2f}%')
