In [10]:
import os
import numpy as np
import torch

def load_3levels_data(base_dir, year, levels=("225hPa", "500hPa", "700hPa")):
    """加载某一年在3个高度层上的 u、v、z 数据，返回 shape = (N, 3, H, W) 的数组"""
    u_list, v_list, z_list = [], [], []

    for level in levels:
        level_path = os.path.join(base_dir, level)
        print(os.path.join(level_path, f"UWind{year}.npy"))
        u = np.load(os.path.join(level_path, f"UWind{year}.npy"))
        v = np.load(os.path.join(level_path, f"VWind{year}.npy"))
        z = np.load(os.path.join(level_path, f"Geopotential{year}.npy"))

        u_list.append(u)
        v_list.append(v)
        z_list.append(z)

    # 堆叠为 shape: (N, 3, H, W)
    u = np.stack(u_list, axis=1)
    v= np.stack(v_list, axis=1)
    z = np.stack(z_list, axis=1)
    assert z.shape == u.shape == v.shape, "三个变量的shape必须一致"
    data = np.concatenate([u, v, z], axis=1)  # shape: (N, 9, 40, 40)
    print(data.shape)
    return torch.tensor(data, dtype=torch.float32)  # 转换为 Tensor，便于送入CNN

In [11]:
def process_multiple_years_separated_levels(base_dir, years, crop_size=40):
    """
    处理多个年份的数据（气压层分文件夹），拼接为统一张量。
    返回 shape: (total_samples, 9, crop_size, crop_size)
    """
    all_data = []

    for year in years:
        try:
            tensor = load_3levels_data(base_dir, year)
            all_data.append(tensor)
        except Exception as e:
            print(f"[!] 加载 {year} 失败: {e}")
            continue

    if not all_data:
        raise ValueError("未加载到任何有效数据")

    return torch.cat(all_data, dim=0)


In [12]:
years = [2001, 2002, 2003,2004,2005]
base_dir = "E:/Dataset/ERA5/Extracted/"  # 存放 225hpa、500hpa、700hpa 子目录的根目录

data_tensor = process_multiple_years_separated_levels(base_dir, years)
print(data_tensor.shape)  # 应该是 (N_total, 9, 40, 40)


E:/Dataset/ERA5/Extracted/225hPa\UWind2001.npy
E:/Dataset/ERA5/Extracted/500hPa\UWind2001.npy
E:/Dataset/ERA5/Extracted/700hPa\UWind2001.npy
(863, 9, 40, 40)
E:/Dataset/ERA5/Extracted/225hPa\UWind2002.npy
E:/Dataset/ERA5/Extracted/500hPa\UWind2002.npy
E:/Dataset/ERA5/Extracted/700hPa\UWind2002.npy
(804, 9, 40, 40)
E:/Dataset/ERA5/Extracted/225hPa\UWind2003.npy
E:/Dataset/ERA5/Extracted/500hPa\UWind2003.npy
E:/Dataset/ERA5/Extracted/700hPa\UWind2003.npy
(767, 9, 40, 40)
E:/Dataset/ERA5/Extracted/225hPa\UWind2004.npy
E:/Dataset/ERA5/Extracted/500hPa\UWind2004.npy
E:/Dataset/ERA5/Extracted/700hPa\UWind2004.npy
(1092, 9, 40, 40)
E:/Dataset/ERA5/Extracted/225hPa\UWind2005.npy
E:/Dataset/ERA5/Extracted/500hPa\UWind2005.npy
E:/Dataset/ERA5/Extracted/700hPa\UWind2005.npy
(769, 9, 40, 40)
torch.Size([4295, 9, 40, 40])


In [13]:
# 假设数据是 PyTorch Tensor（形状：[样本数, 通道数, 高度, 宽度]）
data = data_tensor  # 示例数据（替换为真实数据）

# 划分比例（不打乱顺序，按原始顺序连续划分）
train_ratio = 0.8
val_ratio = 0.1
test_ratio = 0.1

# 计算各部分样本数（确保整数，按原始顺序连续划分）
total_samples = data.shape[0]
train_size = int(total_samples * train_ratio)  # 4295*0.8=3436
val_size = int(total_samples * val_ratio)      # 4295*0.1=429
test_size = total_samples - train_size - val_size  # 4295-3436-429=430

In [14]:
# 训练集：前 train_size 个样本（索引 0 → train_size-1）
train_indices = slice(0, train_size)

# 验证集：中间 val_size 个样本（索引 train_size → train_size+val_size-1）
val_indices = slice(train_size, train_size + val_size)

# 测试集：剩余样本（索引 train_size+val_size → 末尾）
test_indices = slice(train_size + val_size, None)

In [15]:
# 按连续索引切片提取数据（保持原始顺序）
train_data = data[train_indices].numpy()
val_data = data[val_indices].numpy()
test_data = data[test_indices].numpy()

# 打印各部分形状（验证是否正确）
print("训练集形状:", train_data.shape)  # 输出 (3436, 9, 40, 40)
print("验证集形状:", val_data.shape)    # 输出 (429, 9, 40, 40)
print("测试集形状:", test_data.shape)   # 输出 (430, 9, 40, 40)

训练集形状: (3436, 9, 40, 40)
验证集形状: (429, 9, 40, 40)
测试集形状: (430, 9, 40, 40)


In [17]:
# 保存路径（可自定义）
save_dir = "./dataset/"
np.save(f"{save_dir}train.npy", train_data)
np.save(f"{save_dir}val.npy", val_data)
np.save(f"{save_dir}test.npy", test_data)

print("数据集按顺序划分及保存完成！")

数据集按顺序划分及保存完成！


In [18]:
import numpy as np
from collections import defaultdict

def process_cma_data(input_path):
    """ 处理CMA台风最佳路径数据 """
    yearly_data = defaultdict(list)
    
    with open(input_path, 'r') as f:
        for line in f:
            # 跳过标题行（示例中的66666开头行）
            if line.startswith('66666'):
                continue
            
            # 清洗数据行
            clean_line = ' '.join(line.strip().split())
            parts = clean_line.split()
            
            # 验证数据有效性
            if len(parts) < 6:
                continue

            try:
                # 解析时间和年份
                timestamp = parts[0]
                year = int(timestamp[:4])
                
                # 筛选目标年份
                if 2001 <= year <= 2005:
                    # 提取并转换特征数据
                    lat = float(parts[2])/10    # 纬度（转换为实际度数）
                    lon = float(parts[3])/10     # 经度（转换为实际度数）
                    pressure = float(parts[4])   # 中心气压（hPa）
                    wind = float(parts[5])      # 最大风速（m/s）
                    
                    # 组合特征数组
                    features = np.array([lat, lon, pressure, wind])
                    yearly_data[year].append(features)
                    
            except (ValueError, IndexError) as e:
                print(f"解析错误：{line.strip()} | 错误信息：{str(e)}")
                continue

    # 保存每年数据
    for year, data in yearly_data.items():
        if data:
            output_file = f"CH{year}BST.npy"
            np.save(output_file, np.array(data))
            print(f"已保存 {year} 年数据：{output_file}（{len(data)} 条记录）")

if __name__ == "__main__":
    for i in range(2001, 2006):
        input_file = "E:\Dataset\CMA\CMABSTdata\CH"+str(i)+"BST.txt"  # 请替换为实际文件路径
        process_cma_data(input_file)

已保存 2001 年数据：CH2001BST.npy（863 条记录）
已保存 2002 年数据：CH2002BST.npy（804 条记录）
已保存 2003 年数据：CH2003BST.npy（767 条记录）
已保存 2004 年数据：CH2004BST.npy（1092 条记录）
已保存 2005 年数据：CH2005BST.npy（769 条记录）


In [19]:
import numpy as np
data_2003 = np.load("CH2003BST.npy")
print(f"2003年数据维度：{data_2003.shape}")
print("首条记录样本：", data_2003[0])

2003年数据维度：(767, 4)
首条记录样本： [   6.9  163.1 1004.    12. ]


In [20]:
import numpy as np
import os

def merge_and_split(data_years=(2001, 2005), ratios=(0.8, 0.1, 0.1)):
    """
    合并指定年份数据并按比例顺序划分数据集
    参数：
        data_years : (start_year, end_year) 年份范围
        ratios     : (train, val, test) 划分比例
    """
    # 合并所有数据
    merged = []
    for year in range(data_years[0], data_years[1]+1):
        file_path = f"CH{year}BST.npy"
        if os.path.exists(file_path):
            data = np.load(file_path)
            merged.append(data)
            print(f"已加载 {year} 年数据：{data.shape[0]} 条记录")
    
    if not merged:
        raise ValueError("未找到任何数据文件")
    
    full_data = np.concatenate(merged, axis=0)
    print(f"\n总数据量：{full_data.shape[0]} 条记录")

    # 按顺序划分数据集
    total = full_data.shape[0]
    train_end = int(total * ratios[0])
    val_end = train_end + int(total * ratios[1])

    train = full_data[:train_end]
    val = full_data[train_end:val_end]
    test = full_data[val_end:]

    # 验证划分比例
    print("\n数据集划分结果：")
    print(f"训练集：{train.shape[0]} 条 ({train.shape[0]/total:.1%})")
    print(f"验证集：{val.shape[0]} 条 ({val.shape[0]/total:.1%})")
    print(f"测试集：{test.shape[0]} 条 ({test.shape[0]/total:.1%})")

    # 保存数据集
    np.save("dataset_train.npy", train)
    np.save("dataset_val.npy", val)
    np.save("dataset_test.npy", test)
    print("\n数据集已保存为：")
    print("  - dataset_train.npy")
    print("  - dataset_val.npy")
    print("  - dataset_test.npy")

In [21]:
if __name__ == "__main__":
    # 使用示例（处理2001-2005年数据）
    merge_and_split(data_years=(2001, 2005))

已加载 2001 年数据：863 条记录
已加载 2002 年数据：804 条记录
已加载 2003 年数据：767 条记录
已加载 2004 年数据：1092 条记录
已加载 2005 年数据：769 条记录

总数据量：4295 条记录

数据集划分结果：
训练集：3436 条 (80.0%)
验证集：429 条 (10.0%)
测试集：430 条 (10.0%)

数据集已保存为：
  - dataset_train.npy
  - dataset_val.npy
  - dataset_test.npy


In [1]:
import pandas as pd

# 定义输入输出路径
input_txt_path = "/home/hy4080/wplyh/CMA/CMABSTdata/CH2005BST.txt"  # 输入的TXT文件路径
output_csv_path = "bst_data_2005.csv"  # 输出的CSV文件路径

# 读取TXT文件（假设列之间由空格分隔）
df = pd.read_csv(
    input_txt_path, 
    sep=r'\s+',          # 正则表达式匹配任意多空格
    header=None,         # 无表头
    names=[              # 自定义列名（根据实际数据调整）
        'Year', 
        'Month', 
        'Day', 
        'Hour', 
        'Lon', 
        'Lat', 
        'Pressure', 
        'WindSpeed'
    ],
    engine='python'      # 确保正则分隔符兼容性
)

# 保存为CSV
df.to_csv(output_csv_path, index=False)  # 不保留行索引

print(f"转换完成！文件已保存至 {output_csv_path}")

转换完成！文件已保存至 bst_data_2005.csv
