# KNMI 气象数据处理

本笔记本用于处理荷兰皇家气象研究所(KNMI)的气象数据，为Gelderse Poort再野化与NDVI生产力研究项目提供支持。

In [1]:
# import the necessary packages
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime

# setting of directory
output_dir = 'weatherdata_postprocessing'
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

## 1. 读取原始数据

In [2]:
raw_data_path = 'weatherdata_raw/weather.csv'

raw_data = pd.read_csv(raw_data_path, comment='#',  sep=',', header=0)

print(raw_data.head())

   STN  YYYYMMDD  TG  TN   TX      Q  RH  UG  EV24
0  275  20140101  66  41   83  327.0   6  85   4.0
1  275  20140102  84  57  105  147.0  13  86   2.0
2  275  20140103  82  52  117  178.0  78  85   2.0
3  275  20140104  70  49   91  209.0   3  84   3.0
4  275  20140105  53  34   72  327.0  20  88   4.0


## 2. 数据清理与转换

In [5]:
# transform of the date       
raw_data['date'] = pd.to_datetime(raw_data['YYYYMMDD'], format='%Y%m%d')

# transform of the unit and rename the column
processed_data = pd.DataFrame()
processed_data['date'] = raw_data['date']
processed_data['station_id'] = raw_data['STN']

# add the station name (according to the station id)
station_names = {275: 'Deelen', 391: 'Arcen'}
processed_data['station_name'] = processed_data['station_id'].map(station_names)

# transform of the temperature unit (0.1°C to °C)
processed_data['temp_mean_c'] = raw_data['TG'] / 10.0
processed_data['temp_min_c'] = raw_data['TN'] / 10.0
processed_data['temp_max_c'] = raw_data['TX'] / 10.0

# transform of the precipitation unit (0.1mm to mm)
processed_data['precip_mm'  ] = raw_data['RH'] / 10.0
# handle the special value (-1 represents <0.05mm)
processed_data.loc[raw_data['RH'] == -1, 'precip_mm'] = 0.025

# other datasets
processed_data['radiation_j_cm2'] = raw_data['Q']
processed_data['rel_humidity_pct'] = raw_data['UG']
processed_data['et0_mm'] = raw_data['EV24'] / 10.0

# calculate the water balance
processed_data['water_balance_mm'] = processed_data['precip_mm'] - processed_data['et0_mm']

# 检查数据中的异常值
print("检查数据中的异常值:")
# 检查负值
negative_values = {col: processed_data[col][processed_data[col] < 0].count() 
                  for col in processed_data.select_dtypes(include=[np.number]).columns}
print("负值统计:", {k: v for k, v in negative_values.items() if v > 0})

# 检查缺失值
missing_values = processed_data.isnull().sum()
print("缺失值统计:", missing_values[missing_values > 0] if any(missing_values > 0) else "无缺失值")

# 检查异常大的值（超过3个标准差）
numeric_cols = processed_data.select_dtypes(include=[np.number]).columns
outliers = {}
for col in numeric_cols:
    mean = processed_data[col].mean()
    std = processed_data[col].std()
    outlier_count = processed_data[processed_data[col] > mean + 3*std].shape[0]
    if outlier_count > 0:
        outliers[col] = outlier_count
print("异常大值统计 (>3σ):", outliers if outliers else "无明显异常值")

# view the processed data
print("\n处理后的数据预览:")
print(processed_data.head())

检查数据中的异常值:
负值统计: {'temp_mean_c': np.int64(205), 'temp_min_c': np.int64(1029), 'temp_max_c': np.int64(54), 'water_balance_mm': np.int64(4711)}
缺失值统计: radiation_j_cm2     7
et0_mm              7
water_balance_mm    7
dtype: int64
异常大值统计 (>3σ): {'temp_min_c': 1, 'temp_max_c': 4, 'precip_mm': 154, 'water_balance_mm': 141}

处理后的数据预览:
        date  station_id station_name  temp_mean_c  temp_min_c  temp_max_c  \
0 2014-01-01         275       Deelen          6.6         4.1         8.3   
1 2014-01-02         275       Deelen          8.4         5.7        10.5   
2 2014-01-03         275       Deelen          8.2         5.2        11.7   
3 2014-01-04         275       Deelen          7.0         4.9         9.1   
4 2014-01-05         275       Deelen          5.3         3.4         7.2   

   precip_mm  radiation_j_cm2  rel_humidity_pct  et0_mm  water_balance_mm  
0        0.6            327.0                85     0.4               0.2  
1        1.3            147.0                86 

In [7]:
# 1. 缺失值处理
# 检查缺失值的分布情况
missing_rows = processed_data[processed_data.isnull().any(axis=1)]
print(f"包含缺失值的行数: {len(missing_rows)}")
if len(missing_rows) > 0:
    print("缺失值行的信息:")
    print(missing_rows[['date', 'station_id', 'station_name']])

    # 使用线性插值填充缺失值
    print("使用线性插值填充缺失值...")
    # 按站点分组进行插值
    for station_id in processed_data['station_id'].unique():
        station_mask = processed_data['station_id'] == station_id
        processed_data.loc[station_mask, 'radiation_j_cm2'] = processed_data.loc[station_mask, 'radiation_j_cm2'].interpolate(method='linear')
        processed_data.loc[station_mask, 'et0_mm'] = processed_data.loc[station_mask, 'et0_mm'].interpolate(method='linear')
        
    # 重新计算水分平衡
    processed_data['water_balance_mm'] = processed_data['precip_mm'] - processed_data['et0_mm']
    
    # 检查是否还有剩余的缺失值
    remaining_missing = processed_data.isnull().sum()
    print("填充后剩余的缺失值:", remaining_missing[remaining_missing > 0] if any(remaining_missing > 0) else "已全部填充")

# 2. 异常值处理
# 温度的负值是正常的，不需要特殊处理
print("\n温度负值是正常的气象现象，已保留。")
print(f"最低温度范围: {processed_data['temp_min_c'].min():.1f}°C 到 {processed_data['temp_min_c'].max():.1f}°C")

# 对于极端降水值，我们可以检查并可视化
extreme_precip = processed_data[processed_data['precip_mm'] > processed_data['precip_mm'].mean() + 3*processed_data['precip_mm'].std()]
print(f"\n极端降水记录数: {len(extreme_precip)}")
if len(extreme_precip) > 0:
    print("极端降水值范围:", f"{extreme_precip['precip_mm'].min():.1f}mm 到 {extreme_precip['precip_mm'].max():.1f}mm")
    print("时间分布:")
    print(extreme_precip['date'].dt.year.value_counts().sort_index())
    
    # 检查最极端的几个值
    print("\n最高降水记录:")
    print(extreme_precip.sort_values('precip_mm', ascending=False).head(5)[['date', 'station_name', 'precip_mm']])
    
    # 对于极端异常值(如>200mm/日)，可以考虑标记但不修改
    extremely_high_precip = processed_data[processed_data['precip_mm'] > 200]
    if len(extremely_high_precip) > 0:
        print(f"\n特别高的日降水量(>200mm): {len(extremely_high_precip)}个记录")
        print(extremely_high_precip[['date', 'station_name', 'precip_mm']])
        
        # 添加标记列但不改变原始值
        processed_data['precip_extreme_flag'] = processed_data['precip_mm'] > 200
    else:
        processed_data['precip_extreme_flag'] = False

# 重新计算异常值统计
print("\n处理后的异常值统计:")
outliers = {}
for col in numeric_cols:
    if col != 'precip_extreme_flag':  # 跳过标记列
        mean = processed_data[col].mean()
        std = processed_data[col].std()
        outlier_count = processed_data[processed_data[col] > mean + 3*std].shape[0]
        if outlier_count > 0:
            outliers[col] = outlier_count
print("异常大值统计 (>3σ):", outliers if outliers else "无明显异常值")

# 查看处理后的数据
print("\n处理后的数据预览:")
print(processed_data.head())

# ... existing code ...

包含缺失值的行数: 0

温度负值是正常的气象现象，已保留。
最低温度范围: -13.6°C 到 24.4°C

极端降水记录数: 154
极端降水值范围: 17.0mm 到 131.6mm
时间分布:
date
2014    16
2015    15
2016    14
2017    18
2018     5
2019    15
2020    13
2021    11
2022    15
2023    32
Name: count, dtype: int64

最高降水记录:
           date station_name  precip_mm
208  2014-07-28       Deelen      131.6
2710 2021-06-03       Deelen       68.5
880  2016-05-30       Deelen       66.6
4534 2016-06-01        Arcen       64.0
3504 2023-08-06       Deelen       52.3

处理后的异常值统计:
异常大值统计 (>3σ): {'temp_min_c': 1, 'temp_max_c': 4, 'precip_mm': 154, 'water_balance_mm': 141}

处理后的数据预览:
        date  station_id station_name  temp_mean_c  temp_min_c  temp_max_c  \
0 2014-01-01         275       Deelen          6.6         4.1         8.3   
1 2014-01-02         275       Deelen          8.4         5.7        10.5   
2 2014-01-03         275       Deelen          8.2         5.2        11.7   
3 2014-01-04         275       Deelen          7.0         4.9         9.1   
4 2

## 3. 计算累积指标

In [8]:
# 确保数据按日期排序
processed_data = processed_data.sort_values(['station_id', 'date']).reset_index(drop=True)

# 计算滚动累积指标
result_data = processed_data.copy()

# 为每个站点单独计算
for station_id in result_data['station_id'].unique():
    mask = result_data['station_id'] == station_id
    
    # 计算滚动累积降水量
    result_data.loc[mask, 'precip_7day_mm'] = result_data.loc[mask, 'precip_mm'].rolling(window=7, min_periods=5).sum()
    result_data.loc[mask, 'precip_30day_mm'] = result_data.loc[mask, 'precip_mm'].rolling(window=30, min_periods=25).sum()
    result_data.loc[mask, 'precip_90day_mm'] = result_data.loc[mask, 'precip_mm'].rolling(window=90, min_periods=75).sum()
    
    # 计算滚动水分平衡
    result_data.loc[mask, 'water_balance_7day_mm'] = result_data.loc[mask, 'water_balance_mm'].rolling(window=7, min_periods=5).sum()
    result_data.loc[mask, 'water_balance_30day_mm'] = result_data.loc[mask, 'water_balance_mm'].rolling(window=30, min_periods=25).sum()

# 查看计算后的数据
print("\n添加累积指标后的数据示例:")
print(result_data.head())


添加累积指标后的数据示例:
        date  station_id station_name  temp_mean_c  temp_min_c  temp_max_c  \
0 2014-01-01         275       Deelen          6.6         4.1         8.3   
1 2014-01-02         275       Deelen          8.4         5.7        10.5   
2 2014-01-03         275       Deelen          8.2         5.2        11.7   
3 2014-01-04         275       Deelen          7.0         4.9         9.1   
4 2014-01-05         275       Deelen          5.3         3.4         7.2   

   precip_mm  radiation_j_cm2  rel_humidity_pct  et0_mm  water_balance_mm  \
0        0.6            327.0                85     0.4               0.2   
1        1.3            147.0                86     0.2               1.1   
2        7.8            178.0                85     0.2               7.6   
3        0.3            209.0                84     0.3               0.0   
4        2.0            327.0                88     0.4               1.6   

   precip_extreme_flag  precip_7day_mm  precip_30day_

## 4. 计算干旱指标

In [9]:
# 为每个站点单独计算简化版的干旱指标
final_data = result_data.copy()

for station_id in final_data['station_id'].unique():
    station_mask = final_data['station_id'] == station_id
    station_data = final_data[station_mask].copy()
    
    # 为每个日历日计算历史均值和标准差
    station_data['month_day'] = station_data['date'].dt.strftime('%m-%d')
    
    # 计算每个日历日的统计值
    agg_data = station_data.groupby('month_day').agg({
        'precip_30day_mm': ['mean', 'std'],
        'water_balance_30day_mm': ['mean', 'std']
    })
    
    # 平展多重索引
    agg_data.columns = ['_'.join(col) for col in agg_data.columns]
    
    # 将统计值合并回原始数据
    station_data = station_data.merge(agg_data, left_on='month_day', right_index=True)
    
    # 计算简化版的SPI和SPEI (基于Z-分数)
    station_data['simplified_spi_1month'] = np.nan
    station_data['simplified_spei_1month'] = np.nan
    
    # 只在标准差大于0的地方计算标准化指数
    valid_mask = station_data['precip_30day_mm_std'] > 0
    station_data.loc[valid_mask, 'simplified_spi_1month'] = (
        station_data.loc[valid_mask, 'precip_30day_mm'] - 
        station_data.loc[valid_mask, 'precip_30day_mm_mean']
    ) / station_data.loc[valid_mask, 'precip_30day_mm_std']
    
    valid_mask = station_data['water_balance_30day_mm_std'] > 0
    station_data.loc[valid_mask, 'simplified_spei_1month'] = (
        station_data.loc[valid_mask, 'water_balance_30day_mm'] - 
        station_data.loc[valid_mask, 'water_balance_30day_mm_mean']
    ) / station_data.loc[valid_mask, 'water_balance_30day_mm_std']
    
    # 只保留需要的列
    keep_columns = final_data.columns.tolist() + ['simplified_spi_1month', 'simplified_spei_1month']
    station_data = station_data[keep_columns]
    
    # 更新最终数据
    final_data.loc[station_mask] = station_data

# 查看最终数据
print("\n最终处理后的数据示例:")
print(final_data.head())


最终处理后的数据示例:
        date  station_id station_name  temp_mean_c  temp_min_c  temp_max_c  \
0 2014-01-01         275       Deelen          6.6         4.1         8.3   
1 2014-01-02         275       Deelen          8.4         5.7        10.5   
2 2014-01-03         275       Deelen          8.2         5.2        11.7   
3 2014-01-04         275       Deelen          7.0         4.9         9.1   
4 2014-01-05         275       Deelen          5.3         3.4         7.2   

   precip_mm  radiation_j_cm2  rel_humidity_pct  et0_mm  water_balance_mm  \
0        0.6            327.0                85     0.4               0.2   
1        1.3            147.0                86     0.2               1.1   
2        7.8            178.0                85     0.2               7.6   
3        0.3            209.0                84     0.3               0.0   
4        2.0            327.0                88     0.4               1.6   

   precip_extreme_flag  precip_7day_mm  precip_30day_mm

## 5. 识别极端事件

In [10]:
# 定义识别极端事件的函数
def identify_extreme_events(df, spi_threshold=-1.5, spei_threshold=-1.5, precip_threshold=30):
    """识别极端干旱和洪水事件"""
    extreme_events = {'drought': [], 'flood': []}
    
    for station_id, station_data in df.groupby('station_id'):
        station_name = station_data['station_name'].iloc[0]
        
        # 识别干旱事件 - 连续14天以上低SPI或SPEI
        drought_mask = (station_data['simplified_spi_1month'] <= spi_threshold) | \
                      (station_data['simplified_spei_1month'] <= spei_threshold)
        
        if drought_mask.any():
            # 找到干旱天
            drought_days = station_data[drought_mask][['date', 'simplified_spi_1month', 'simplified_spei_1month']]
            
            # 确定干旱事件的开始和结束日期 (简化版)
            if len(drought_days) >= 14:
                start_date = drought_days['date'].min()
                end_date = drought_days['date'].max()
                duration = (end_date - start_date).days + 1
                
                # 计算事件严重程度
                min_spi = drought_days['simplified_spi_1month'].min()
                min_spei = drought_days['simplified_spei_1month'].min()
                severity = 'Severe' if (min_spi < -2.0 or min_spei < -2.0) else 'Moderate'
                
                extreme_events['drought'].append({
                    'station_id': station_id,
                    'station_name': station_name,
                    'start_date': start_date,
                    'end_date': end_date,
                    'duration_days': duration,
                    'min_spi': min_spi,
                    'min_spei': min_spei,
                    'severity': severity
                })
        
        # 识别洪水事件 - 单日降水量超过阈值
        flood_mask = station_data['precip_mm'] >= precip_threshold
        
        if flood_mask.any():
            flood_days = station_data[flood_mask][['date', 'precip_mm']]
            
            # 简化版: 将每一个超过阈值的降水日视为一次洪水事件
            for _, row in flood_days.iterrows():
                extreme_events['flood'].append({
                    'station_id': station_id,
                    'station_name': station_name,
                    'date': row['date'],
                    'precipitation_mm': row['precip_mm'],
                    'severity': 'Severe' if row['precip_mm'] >= 50 else 'Moderate'
                })
    
    return extreme_events

# 识别极端事件
extreme_events = identify_extreme_events(final_data)

# 查看识别到的极端事件
print("\n识别到的干旱事件:")
if extreme_events['drought']:
    for event in extreme_events['drought']:
        print(f"站点: {event['station_name']}, 开始: {event['start_date'].date()}, 结束: {event['end_date'].date()}, 持续: {event['duration_days']}天, 严重程度: {event['severity']}")
else:
    print("未识别到干旱事件")
    
print("\n识别到的洪水事件:")
if extreme_events['flood']:
    for event in extreme_events['flood'][:5]:  # 只显示前5个
        print(f"站点: {event['station_name']}, 日期: {event['date'].date()}, 降水量: {event['precipitation_mm']}mm, 严重程度: {event['severity']}")
    if len(extreme_events['flood']) > 5:
        print(f"... 以及{len(extreme_events['flood'])-5}个其他洪水事件")
else:
    print("未识别到洪水事件")

KeyError: 'simplified_spi_1month'

## 6. 保存处理后的数据

In [None]:
# 保存处理后的气象数据
output_file = os.path.join(output_dir, 'processed_weather_data.csv')
final_data.to_csv(output_file, index=False)
print(f"\n处理后的气象数据已保存至: {output_file}")

# 保存极端事件信息
if extreme_events['drought']:
    drought_df = pd.DataFrame(extreme_events['drought'])
    drought_file = os.path.join(output_dir, 'drought_events.csv')
    drought_df.to_csv(drought_file, index=False)
    print(f"干旱事件信息已保存至: {drought_file}")

if extreme_events['flood']:
    flood_df = pd.DataFrame(extreme_events['flood'])
    flood_file = os.path.join(output_dir, 'flood_events.csv')
    flood_df.to_csv(flood_file, index=False)
    print(f"洪水事件信息已保存至: {flood_file}")

## 7. 可视化结果

In [None]:
# 绘制降水和干旱指标时间序列
plt.figure(figsize=(12, 8))

# 为每个站点绘制单独的图形
for station_id, station_data in final_data.groupby('station_id'):
    plt.subplot(2, 1, 1 if station_id == 275 else 2)
    
    # 绘制月降水量
    plt.plot(station_data['date'], station_data['precip_30day_mm'], 'b-', label='30天累积降水(mm)')
    
    # 绘制干旱指标
    plt.plot(station_data['date'], station_data['simplified_spi_1month'], 'r-', label='简化SPI(1个月)')
    
    # 添加干旱和湿润阈值线
    plt.axhline(y=-1.5, color='r', linestyle='--', alpha=0.7)
    plt.axhline(y=1.5, color='b', linestyle='--', alpha=0.7)
    
    plt.title(f"{station_data['station_name'].iloc[0]} 站点降水和干旱指标")
    plt.ylabel('降水量(mm) / SPI值')
    plt.grid(True, alpha=0.3)
    plt.legend()

plt.tight_layout()
fig_file = os.path.join(output_dir, 'precipitation_drought_timeseries.png')
plt.savefig(fig_file, dpi=300)
plt.close()
print(f"\n降水和干旱指标时间序列图已保存至: {fig_file}")