In [2]:
import os
import math
import shutil
import numpy as np
import pandas as pd
import xarray as xr
from pathlib import Path
from tqdm.notebook import tqdm, trange
from concurrent.futures import ThreadPoolExecutor, as_completed

from multiprocessing import Pool
import multiprocessing as mp

In [3]:
raw_data_root_path = Path("/data6t/AIWP_TP_dataset/raw_data")
unzip_data_root_path = Path("/data6t/AIWP_TP_dataset/unzip_data")
station_data_root_path = Path("/data6t/AIWP_TP_dataset/station_data")
uniform_data_root_path = Path("/data6t/AIWP_TP_dataset/uniform_data")

GHCNd_unzip_path = unzip_data_root_path / "GHCNd"
GSOD_unzip_path = unzip_data_root_path / "GSOD"
GHCNh_unzip_path = unzip_data_root_path / "GHCNh"
ISD_unzip_path = unzip_data_root_path / "ISD"
IGLD_unzip_path = unzip_data_root_path / "IGLD"

GHCNd_station_data_path = station_data_root_path / "GHCNd"
GSOD_station_data_path = station_data_root_path / "GSOD"
GHCNh_station_data_path = station_data_root_path / "GHCNh"
ISD_station_data_path = station_data_root_path / "ISD"
IGLD_station_data_path = station_data_root_path / "IGLD"

GHCNd_uniform_data_path = uniform_data_root_path / "GHCNd"
GSOD_uniform_data_path = uniform_data_root_path / "GSOD"
GHCNh_uniform_data_path = uniform_data_root_path / "GHCNh"
ISD_uniform_data_path = uniform_data_root_path / "ISD"
IGLD_uniform_data_path = uniform_data_root_path / "IGLD"
uniform_hourly_fill_path = uniform_data_root_path / "hourly_fill"
uniform_hourly2daily_data_path = uniform_data_root_path / "hourly2daily"

Merge_data_path = Path("/data6t/AIWP_TP_dataset/merge_data")
Merge_daily_data_path = Merge_data_path / "daily_100"
# Merge_hourly_data_path = Merge_data_path / "hourly"
# Merge_hourly_to_daily_data_path = Merge_data_path / "hourly2daily"
# Merge_hourly_to_daily_timematch_data_path = Merge_data_path / "hourly2daily_timematch"
# Merge_daily_all_data_path = Merge_data_path / "daily_100"

QC_data_path = Path("/data6t/AIWP_TP_dataset/QC_data")

# Precipitation observation extraction

Extract and merge the precipitation observation of the same station from different years

## GHCNd

In [None]:
ELEMENT = "PRCP"

colspecs = [(0, 11), (12, 20), (21, 30), (31, 35), (36, 40), (41, 45)]
column_names = ["ID", "LATITUDE", "LONGITUDE", "ELEMENT", "FIRSTYEAR", "LASTYEAR"]


ghcn_stations = pd.read_fwf(raw_data_root_path / "ghcnd-inventory.txt", colspecs=colspecs, names=column_names)

filtered_stations = ghcn_stations[ghcn_stations["LASTYEAR"] >= 2021]
# filtered_stations = stations[stations["FIRSTYEAR"] <= 2024]
# filtered_stations = stations[stations["FIRSTYEAR"] <= 2020]
# filtered_stations = filtered_stations[filtered_stations["LASTYEAR"] >= 2020]
filtered_stations = filtered_stations[filtered_stations["ELEMENT"]==ELEMENT]
# filtered_stations.to_csv("GHCNd_PRCP_stations_after2020.csv", index=False)
filtered_stations

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEMENT,FIRSTYEAR,LASTYEAR
21,AE000041196,25.333,55.517,PRCP,1944,2025
25,AEM00041194,25.255,55.364,PRCP,1983,2025
29,AEM00041217,24.433,54.651,PRCP,1984,2025
33,AEM00041218,24.262,55.609,PRCP,1994,2025
42,AFM00040938,34.210,62.228,PRCP,2014,2021
...,...,...,...,...,...,...
768701,ZA000067743,-17.817,25.817,PRCP,1950,2022
768706,ZAM00067663,-14.450,28.467,PRCP,1973,2025
768718,ZI000067775,-17.917,31.133,PRCP,1956,2025
768759,ZI000067975,-20.067,30.867,PRCP,1951,2025


In [None]:
def extract_ghcnd_data(args):

    station, station_data, output_path = args
    filepath = output_path / f"{station}.csv"
    file_exists = filepath.exists()
    station_data.to_csv(filepath, mode='a', header=not file_exists, index=False)
    return 1

GHCNd_PRCP_stations_after2020 = filtered_stations["ID"].unique()

for year in trange(1990, 2027):
    data_path = GHCNd_unzip_path / f"{year}.csv"
    for chunk in pd.read_csv(
        data_path, header=None, names=["ID", "DATE", "ELEMENT", "VALUE", "M_FLAG", "Q_FLAG", "S_FLAG", "OBS_TIME"], 
        chunksize=1000000):

        chunk_prcp = chunk[chunk["ELEMENT"]=="PRCP"].drop(columns=["ELEMENT"]).rename(columns={"VALUE":"PRCP"})
        # valid_stations = np.intersect1d(GHCNd_PRCP_stations_after2020, chunk_prcp["ID"].unique())
        chunk_groups = chunk_prcp.groupby("ID")
        # 多进程写入多个站点数据
        args_list = [
            (station, station_data, GHCNd_station_data_path)
            for station, station_data in chunk_groups
        ]
        
        with Pool(processes=min(24, mp.cpu_count())) as pool:
            results = list(tqdm(
                pool.imap(extract_ghcnd_data, args_list),
                total=len(args_list),
                desc="Processing GHCNd files",
                leave=False
            ))

## GSOD

In [None]:
def extract_gsod_data(args):
    file_path, output_path = args
    station_data = pd.read_csv(file_path)[["STATION","DATE", "LATITUDE", "LONGITUDE", "ELEVATION", "PRCP", "PRCP_ATTRIBUTES", "FRSHTT"]]
    filepath = output_path / file_path.name
    file_exists = filepath.exists()
    station_data.to_csv(filepath, mode='a', header=not file_exists, index=False)
    return 1

for year in trange(1990, 2026):
    files_path_list = [f for f in (GSOD_unzip_path / f"{year}").iterdir() if f.is_file()]
    
    args_list = [(file_path, GSOD_station_data_path) for file_path in files_path_list]
    
    with Pool(processes=min(24, mp.cpu_count())) as pool:
        results = list(tqdm(
            pool.imap(extract_gsod_data, args_list),
            total=len(args_list),
            desc="Processing GSOD files"
        ))

## GHCNh

In [None]:
from QualityControl.preprocess.ISDGHCNH import ghcnh_pipeline

In [None]:
from functools import partial

for year in trange(2019, 2027):
    files_path_list = [f for f in (GHCNh_unzip_path / f"{year}").iterdir() if f.is_file()]
    
    func = partial(ghcnh_pipeline, output_dir=GHCNh_station_data_path, overwrite=False)
    
    with Pool(processes=min(24, mp.cpu_count())) as pool:
        results = list(tqdm(
            pool.imap(func, files_path_list),
            total=len(files_path_list),
            desc=f"Processing GHCNh files {year}"
        ))

        # pd.concat(results, axis=1).T

## ISD

In [None]:
from QualityControl.preprocess.ISDGHCNH import isd_pipeline

In [None]:
from functools import partial

for year in trange(2025, 2026):
    files_path_list = [f for f in (ISD_unzip_path / f"{year}").iterdir() if f.is_file()]
    
    func = partial(isd_pipeline, output_dir=ISD_station_data_path, overwrite=False)
    
    with Pool(processes=min(24, mp.cpu_count())) as pool:
        results = list(tqdm(
            pool.imap(func, files_path_list),
            total=len(files_path_list),
            desc=f"Processing ISD files {year}"
        ))

        # pd.concat(results, axis=1).T

## IGLD

In [None]:
def extract_igld_data(args):
    station, station_data, target_dir = args
    filepath = target_dir / f"IGLD_{station}.csv"
    file_exists = filepath.exists()
    station_data.to_csv(filepath, mode='a', header=not file_exists, index=False)
    return 1


In [None]:
continents_list = ['Europe', 'Antarctica', 'Africa', 'Oceania', 'NorthAmerica', 'SouthAmerica', 'Asia', 'other']
year_list = np.arange(2020, 2025)
decode_columns_dict = {
    'D_DATETIME': "DATE", 'V01301': "STATION", 
    "V04001": "year", "V04002": "month", "V04003": "day", "V04004": "hour",
    'V05001': "LATITUDE", 'V06001': "LONGITUDE", 'V07001': "ELEVATION", 
    'V13011': "TP1", 'V13020': "TP3", 'V13021': "TP6", 'V13022': "TP12", 
    'Q13011': "Q1", 'Q13020': "Q3", 'Q13021': "Q6", 'Q13022': "Q12"}

fpath_list = [file.name for file in IGLD_unzip_path.iterdir() if file.is_file() and file.suffix == ".txt"]

for continent in tqdm(continents_list[:4]):

    for year in tqdm(year_list, leave=False):
        
        fname_base = f"SURF_GLB_HOR_INTEG_PROD_{continent}_{year}"
        fname_list = [fname for fname in fpath_list if fname_base in fname]
        fname_list = sorted(fname_list)

        sep = "," if (continent == "Europe") and (year == 2024) else "\t"
        names = pd.read_table(IGLD_unzip_path / fname_list[0], nrows=2, sep=sep, dtype=str).columns

        station_year_data = []
        for i in trange(len(fname_list), leave=False):
            fname = fname_list[i]
            header=0 if i == 0 else None
            fpath = IGLD_unzip_path / fname
            
            chunk = pd.read_table(fpath, sep=sep, header=header, names=names, dtype=str)
            chunk = chunk.rename(columns=decode_columns_dict)

            # chunk["DATE"] = pd.to_datetime(chunk["DATE"], format="%d/%m/%Y %H:%M:%S")
            chunk["DATE"] = pd.to_datetime(chunk[["year", "month", "day", "hour"]])
            chunk = chunk[list(decode_columns_dict.values())].drop(columns=["year", "month", "day", "hour"])
            station_year_data.append(chunk)
        station_year_data = pd.concat(station_year_data, ignore_index=True)
        station_year_data["CONTINENT"] = continent
        station_year_data_groups = station_year_data.groupby("STATION")
        # valid_stations = station_year_data["STATION"].unique()
        
        # 多进程写入多个站点数据
        args_list = [
            (station, group_data, IGLD_uniform_data_path)
            for station, group_data in station_year_data_groups
        ]
        with Pool(processes=min(24, mp.cpu_count())) as pool:
            results = list(pool.imap(extract_igld_data, args_list))

## 提取站点经纬度

In [None]:
def extract_station_lat_lon_elev(fpath):
    station_data = pd.read_csv(fpath, nrows=2, dtype=str)
    id = station_data["STATION"].unique()[0]
    lat = station_data["LATITUDE"].unique()[0] 
    lon = station_data["LONGITUDE"].unique()[0]
    elevation = station_data["ELEVATION"].unique()[0]
    return {
        "ID": id,
        "LATITUDE": lat,
        "LONGITUDE": lon,
        "ELEVATION": elevation,
    }

In [None]:
fpath_list = [f for f in GSOD_station_data_path.iterdir() if f.is_file()]

with Pool(processes=min(24, mp.cpu_count())) as pool:
        results = list(tqdm(
            pool.imap(extract_station_lat_lon_elev, fpath_list),
            total=len(fpath_list),
            desc=f"extract stations info"
        ))
gsod_station_info = pd.DataFrame(results)
gsod_station_info["ID"] = gsod_station_info["ID"].apply(lambda x: "GSOD_" + x.zfill(11))
gsod_station_info["LONGITUDE"] = gsod_station_info["LONGITUDE"].astype(float)
gsod_station_info["LATITUDE"] = gsod_station_info["LATITUDE"].astype(float)
gsod_station_info["ELEVATION"] = gsod_station_info["ELEVATION"].astype(float)
gsod_station_info["LONGITUDE"] = gsod_station_info["LONGITUDE"].apply(lambda x: 360+x if x<0 else x)
gsod_station_info.to_csv(raw_data_root_path/"gsod_station_info.csv", index=False)
gsod_station_info

In [None]:
fpath_list = [f for f in IGLD_station_data_path.iterdir() if f.is_file()]

with Pool(processes=min(24, mp.cpu_count())) as pool:
        results = list(tqdm(
            pool.imap(extract_station_lat_lon_elev, fpath_list),
            total=len(fpath_list),
            desc=f"extract stations info"
        ))
igld_station_info = pd.DataFrame(results)

igld_station_info["ID"] = igld_station_info["ID"].apply(lambda x: "IGLD_" + x)
igld_station_info["LONGITUDE"] = igld_station_info["LONGITUDE"].astype(float)
igld_station_info["LATITUDE"] = igld_station_info["LATITUDE"].astype(float)
igld_station_info["ELEVATION"] = igld_station_info["ELEVATION"].astype(float)
igld_station_info["LONGITUDE"] = igld_station_info["LONGITUDE"].apply(lambda x: 360+x if x<0 else x)
igld_station_info.to_csv(raw_data_root_path/"igld_station_info.csv", index=False)
igld_station_info

extract stations info:   0%|          | 0/9656 [00:00<?, ?it/s]

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION
0,IGLD_616300,15.6500,346.7500,17.0
1,IGLD_606600,25.2670,3.7330,556.0
2,IGLD_800950,7.9303,287.4908,306.0
3,IGLD_840450,0.2500,281.2670,3185.0
4,IGLD_074810,45.7260,5.0910,250.0
...,...,...,...,...
9651,IGLD_723830,34.7440,241.2760,1379.0
9652,IGLD_711310,49.4300,251.0200,1080.0
9653,IGLD_620190,31.2000,16.5830,14.0
9654,IGLD_634740,7.1330,40.0000,2480.0


# Data format unification

## GHCNd

In [None]:
def unify_ghcnd_file(args):

    file_path, output_path = args
    filepath = output_path / ("GHCNd_" + file_path.name)
    if filepath.exists():
        return 1

    try:
        station_data = pd.read_csv(file_path)[["ID", "DATE", "PRCP"]]
        station_data["ID"] = station_data["ID"].astype(str)
        station_data[station_data==-9999] = np.nan
        station_data[station_data==9999] = np.nan
        station_data[station_data==-99.99] = np.nan
        station_data[station_data==99.99] = np.nan
        station_data["PRCP"] = station_data["PRCP"] / 10 # 0.1mm to mm
        station_data["DATE"] = pd.to_datetime(station_data["DATE"], format="%Y%m%d")
    except:
        print(f"Error reading file: {file_path}")
    if station_data["DATE"].dt.year.max() < 2021: # filter stations with data until 2021
        return 1
    station_data["DATE"] = station_data["DATE"].dt.strftime("%Y-%m-%d")

    station_data.to_csv(filepath, mode='a', index=False)
    return 1

files_path_list = [f for f in GHCNd_station_data_path.iterdir() if f.is_file()]

args_list = [(file_path, GHCNd_uniform_data_path) for file_path in files_path_list]

with Pool(processes=min(24, mp.cpu_count())) as pool:
    results = list(tqdm(
        pool.imap(unify_ghcnd_file, args_list),
        total=len(args_list),
        desc="Processing GHCNd files"
    ))

Processing GHCNd files:   0%|          | 0/94294 [00:00<?, ?it/s]

In [None]:
ghcn_stations_info = pd.read_csv(raw_data_root_path / "ghcnd-stations.csv", 
        names=["ID", "LATITUDE", "LONGITUDE", "ELEVATION", "STATE", "NAME", 
                "GSN_FLAG", "HCN_CRN_FLAG", "WMO_ID", "unknown"], low_memory=False)
ghcn_stations_info["NAME"] = ghcn_stations_info["NAME"].str.strip()
ghcn_stations_info["ID"] = ghcn_stations_info["ID"].apply(lambda x: "GHCNd_"+x)
ghcn_stations_info.set_index("ID", inplace=True)

# ghcn_stations_info = ghcn_stations_info.loc[ghcn_stations["ID"],:]
ghcnd_id_list = [f.name[:-4] for f in GHCNd_uniform_data_path.iterdir() if f.is_file()]
ghcn_stations_info = ghcn_stations_info.loc[np.intersect1d(ghcnd_id_list, ghcn_stations_info.index.values), 
    ["LATITUDE", "LONGITUDE", "ELEVATION"]]
ghcn_stations_info

Unnamed: 0_level_0,LATITUDE,LONGITUDE,ELEVATION
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
GHCNd_AE000041196,25.333,55.517,34.0
GHCNd_AEM00041194,25.255,55.364,10.4
GHCNd_AEM00041217,24.433,54.651,26.8
GHCNd_AEM00041218,24.262,55.609,264.9
GHCNd_AFM00040938,34.210,62.228,977.2
...,...,...,...
GHCNd_ZA000067743,-17.817,25.817,986.0
GHCNd_ZAM00067663,-14.450,28.467,1207.0
GHCNd_ZI000067775,-17.917,31.133,1480.0
GHCNd_ZI000067975,-20.067,30.867,1095.0


In [None]:
ELEMENT = "PRCP"

colspecs = [(0, 11), (12, 20), (21, 30), (31, 35), (36, 40), (41, 45)]
column_names = ["ID", "LATITUDE", "LONGITUDE", "ELEMENT", "FIRSTYEAR", "LASTYEAR"]

ghcnd_stations = pd.read_fwf(raw_data_root_path / "ghcnd-inventory.txt", colspecs=colspecs, names=column_names)
ghcnd_stations = ghcnd_stations[ghcnd_stations["LASTYEAR"] >= 2021]
ghcnd_stations = ghcnd_stations[ghcnd_stations["ELEMENT"]==ELEMENT]
ghcnd_stations["ID"] = ghcnd_stations["ID"].apply(lambda x: "GHCNd_"+x)
ghcnd_stations.set_index("ID", inplace=True)


ghcnd_id_list = [f.name[:-4] for f in GHCNd_uniform_data_path.iterdir() if f.is_file()]
ghcnd_stations = ghcnd_stations.loc[np.intersect1d(ghcnd_id_list, ghcnd_stations.index.values), 
    ["LATITUDE", "LONGITUDE"]]
ghcnd_stations

Unnamed: 0_level_0,LATITUDE,LONGITUDE
ID,Unnamed: 1_level_1,Unnamed: 2_level_1
GHCNd_AE000041196,25.333,55.517
GHCNd_AEM00041194,25.255,55.364
GHCNd_AEM00041217,24.433,54.651
GHCNd_AEM00041218,24.262,55.609
GHCNd_AFM00040938,34.210,62.228
...,...,...
GHCNd_ZA000067743,-17.817,25.817
GHCNd_ZAM00067663,-14.450,28.467
GHCNd_ZI000067775,-17.917,31.133
GHCNd_ZI000067975,-20.067,30.867


In [None]:
info_dict = []
for f in GHCNd_uniform_data_path.iterdir():
    station_id = f.name[:-4]
    if station_id in ghcn_stations_info.index:
        lat = ghcn_stations_info.loc[station_id, "LATITUDE"]
        lon = ghcn_stations_info.loc[station_id, "LONGITUDE"]
        elev = ghcn_stations_info.loc[station_id, "ELEVATION"]
    elif station_id in ghcnd_stations.index:
        lat = ghcnd_stations.loc[station_id, "LATITUDE"]
        lon = ghcnd_stations.loc[station_id, "LONGITUDE"]
        elev = np.nan
    else:
        continue
    info_dict.append({
        "ID": station_id,
        "LATITUDE": lat,
        "LONGITUDE": lon,
        "ELEVATION": elev,
    })
ghcnd_stations_info = pd.DataFrame(info_dict)
ghcnd_stations_info["LONGITUDE"] = ghcnd_stations_info["LONGITUDE"].apply(lambda x: 360+x if x<0 else x)
ghcnd_stations_info.to_csv(uniform_data_root_path/"ghcnd_station_info.csv", index=False)
ghcnd_stations_info

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION
0,GHCNd_US1MSJC0035,30.4057,271.2334,20.1
1,GHCNd_LOE00105562,47.8667,18.1831,115.0
2,GHCNd_US1NMSC0063,34.0799,252.7839,2187.5
3,GHCNd_NLE00109274,52.7831,4.8000,0.0
4,GHCNd_US1COAU0059,37.2655,252.9154,2283.6
...,...,...,...,...
49713,GHCNd_US1MOCY0019,39.3820,265.4888,286.8
49714,GHCNd_FIE00145787,64.9305,28.7503,223.0
49715,GHCNd_US1TXCLL097,32.9909,263.3206,185.0
49716,GHCNd_CA1BC000010,49.4935,242.7124,595.9


## GSOD

In [None]:
from multiprocessing import Pool, Lock, Manager
import multiprocessing as mp

station_info_lock = None

def unify_gsod_file(args):
    file_path, output_path = args
    filepath = output_path / ("GSOD_" + file_path.name)
    if filepath.exists():
        return 1
    
    station_data = pd.read_csv(file_path)[["STATION", "DATE", "LATITUDE", "LONGITUDE", "ELEVATION", "PRCP", "PRCP_ATTRIBUTES", "FRSHTT"]]
    station_data["STATION"] = station_data["STATION"].astype(str)
    station_data[station_data==-99.99] = np.nan
    station_data[station_data==99.99] = np.nan
    station_data[station_data==999.9] = np.nan
    station_data[station_data==9999.9] = np.nan
    station_data["PRCP"] = station_data["PRCP"] * 25.4 # inch to mm
    station_data = station_data.rename(columns={"STATION":"ID"})

    if pd.to_datetime(station_data["DATE"]).dt.year.max() < 2021: # filter stations with data until 2021
        return 1
    
    # station_info = pd.DataFrame(
    #     {"ID": station_data["ID"].values[0], 
    #     "LATITUDE": station_data["LATITUDE"].values[0], 
    #     "LONGITUDE": station_data["LONGITUDE"].values[0], 
    #     "ELEVATION": station_data["ELEVATION"].values[0]},
    #     index=[station_data["ID"].values[0]])

    # with lock:
    #     station_info.to_csv(station_info_path, mode='a', index=False)
    
    station_data = station_data[["ID", "DATE", "PRCP"]]
    station_data.to_csv(filepath, mode='a', index=False)
    return 1


files_path_list = [f for f in GSOD_station_data_path.iterdir() if f.is_file()]
station_info_path = GSOD_uniform_data_path.parent / "gsod_station_info.csv"

lock = Lock()

args_list = [(file_path, GSOD_uniform_data_path) for file_path in files_path_list]

with Pool(processes=min(24, mp.cpu_count())) as pool:
    results = list(tqdm(
        pool.imap(unify_gsod_file, args_list),
        total=len(args_list),
        desc="Processing GSOD files"
    ))

In [None]:
gsod_station_info = pd.read_csv(raw_data_root_path/"gsod_station_info.csv")
gsod_station_info

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION
0,GSOD_83887099999,-27.400000,308.800000,947.0
1,GSOD_13021099999,46.483000,15.117000,452.0
2,GSOD_10509099999,50.983000,6.900000,49.0
3,GSOD_99999900277,,,
4,GSOD_68342099999,-26.816667,26.016667,1500.0
...,...,...,...,...
21610,GSOD_28421099999,56.917000,55.600000,133.0
21611,GSOD_72282303749,39.645000,282.532000,563.0
21612,GSOD_72050100155,37.850000,283.117000,41.1
21613,GSOD_70277026438,60.129270,210.581510,4.1


In [None]:
mask_id = np.intersect1d(gsod_station_info["ID"].values, 
    [f.name[:-4] for f in GSOD_uniform_data_path.iterdir() if f.is_file()])
gsod_station_info = gsod_station_info.set_index("ID") .loc[mask_id, :].reset_index()
gsod_station_info.to_csv(uniform_data_root_path / "gsod_station_info.csv", index=False)
gsod_station_info

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION
0,GSOD_01001099999,70.933333,351.333333,9.00
1,GSOD_01001499999,59.791925,5.340850,48.76
2,GSOD_01002099999,80.050000,16.250000,8.00
3,GSOD_01003099999,77.000000,15.500000,12.00
4,GSOD_01006099999,78.250000,22.816667,14.00
...,...,...,...,...
12871,GSOD_A0735500241,43.579000,269.087000,394.10
12872,GSOD_A0735700182,45.986000,264.008000,367.30
12873,GSOD_A0735900240,42.938000,274.939000,249.00
12874,GSOD_A5125500445,32.463830,272.045950,34.10


## GHCNh

In [3]:
from QualityControl.preprocess.ISDGHCNH import hourly_uniform_multiprocess

In [4]:
ghcnh_station_info = pd.read_csv(raw_data_root_path/"ghcnh-station-list.csv")
ghcnh_station_info["ID"] = ghcnh_station_info["GHCN_ID"].apply(lambda row: "GHCNh_" + row)
ghcnh_station_info["LONGITUDE"] = ghcnh_station_info["LONGITUDE"].apply(lambda x: 360+x if x<0 else x)
ghcnh_station_info

Unnamed: 0,GHCN_ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN,(US)HCN_(US)CRN,WMO_ID,ICAO,ID
0,AAI0000TNCA,12.5014,289.9848,18.3,,REINA BEATRIX INTL,,,,,GHCNh_AAI0000TNCA
1,ACL000BARA9,17.5910,298.1790,5.0,TX,BARBUDA,,,,,GHCNh_ACL000BARA9
2,ACM00078861,17.1167,298.2167,10.0,,COOLIDGE FIELD ANTIGUA (AUX.,,,,,GHCNh_ACM00078861
3,ACU55-00189,18.6000,296.5300,10.0,,SOMBRERO,,,,,GHCNh_ACU55-00189
4,ACU55-00190,18.6000,296.5300,12.0,,SOMBRERO,,,,,GHCNh_ACU55-00190
...,...,...,...,...,...,...,...,...,...,...,...
34176,ZIM00067967,-21.3833,28.9833,770.0,,TULI,,,,,GHCNh_ZIM00067967
34177,ZIM00067974,-19.8333,30.7833,-999.9,,MAKOHOLI,,,,,GHCNh_ZIM00067974
34178,ZIM00067976,-20.5500,31.0833,700.0,,RUPIKE,,,,,GHCNh_ZIM00067976
34179,ZIM00067979,-20.3333,31.4667,770.0,,ZAKA,,,,,GHCNh_ZIM00067979


In [5]:
ghcnh_station_info_freq = hourly_uniform_multiprocess(GHCNh_station_data_path, uniform_hourly_fill_path, ghcnh_station_info)
ghcnh_station_info_freq.to_csv(uniform_data_root_path/"ghcnh_station_info.csv", index=None)
ghcnh_station_info_freq

Processing files: 100%|██████████| 15321/15321 [23:15<00:00, 10.98it/s]


Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,FREQUENCY
0,GHCNh_AAI0000TNCA,12.5014,289.9848,18.3,3
1,GHCNh_ACW00011647,17.1333,298.2167,19.2,6
2,GHCNh_AEI0000OMAA,24.4330,54.6511,26.8,12
3,GHCNh_AEI0000OMAD,24.4283,54.4581,4.9,12
4,GHCNh_AEI0000OMAL,24.2617,55.6092,264.9,12
...,...,...,...,...,...
7850,GHCNh_ZIM00067899,-18.6167,31.5667,1380.0,24
7851,GHCNh_ZIM00067961,-20.9167,28.4500,1020.0,24
7852,GHCNh_ZIM00067963,-20.3833,28.5000,1350.0,24
7853,GHCNh_ZIM00067976,-20.5500,31.0833,700.0,24


## ISD

In [6]:
isd_station_info = pd.read_csv(raw_data_root_path/"isd-history.csv")
isd_station_info["ID"] = isd_station_info["USAF"].astype(str).str.zfill(6) + isd_station_info["WBAN"].astype(str).str.zfill(5)
isd_station_info["ID"] = isd_station_info["ID"].apply(lambda row: "ISD_" + row)
isd_station_info = isd_station_info.rename(columns={"LAT": "LATITUDE", "LON": "LONGITUDE", "ELEV(M)": "ELEVATION"})
isd_station_info["LONGITUDE"] = isd_station_info["LONGITUDE"].apply(lambda x: 360+x if x<0 else x)
isd_station_info

Unnamed: 0,USAF,WBAN,STATION NAME,CTRY,STATE,ICAO,LATITUDE,LONGITUDE,ELEVATION,BEGIN,END,ID
0,007018,99999,WXPOD 7018,,,,0.000,0.000,7018.0,20110309,20130730,ISD_00701899999
1,007026,99999,WXPOD 7026,AF,,,0.000,0.000,7026.0,20120713,20170822,ISD_00702699999
2,007070,99999,WXPOD 7070,AF,,,0.000,0.000,7070.0,20140923,20150926,ISD_00707099999
3,008260,99999,WXPOD8270,,,,0.000,0.000,0.0,20050101,20120731,ISD_00826099999
4,008268,99999,WXPOD8278,AF,,,32.950,65.567,1156.7,20100519,20120323,ISD_00826899999
...,...,...,...,...,...,...,...,...,...,...,...,...
29656,A07355,241,VIROQUA MUNICIPAL AIRPORT,US,WI,KY51,43.579,269.087,394.1,20140731,20250825,ISD_A0735500241
29657,A07357,182,ELBOW LAKE MUNICIPAL PRIDE OF THE PRAIRIE AIRPORT,US,MN,KY63,45.986,264.008,367.3,20140731,20250825,ISD_A0735700182
29658,A07359,240,IONIA COUNTY AIRPORT,US,MI,KY70,42.938,274.939,249.0,20140731,20250825,ISD_A0735900240
29659,A51255,445,DEMOPOLIS MUNICIPAL AIRPORT,US,AL,KDYA,32.464,272.046,34.1,20140731,20250826,ISD_A5125500445


In [None]:
isd_station_info_freq = hourly_uniform_multiprocess(ISD_station_data_path, uniform_hourly_fill_path, isd_station_info)
isd_station_info_freq.to_csv(uniform_data_root_path/"isd_station_info.csv", index=None)
isd_station_info_freq

Processing files:  14%|█▍        | 1619/11674 [05:33<22:16,  7.53it/s] 

## IGLD

In [None]:
igld_station_info = pd.read_csv(raw_data_root_path/"igld_station_info.csv")
igld_station_info["LONGITUDE"] = igld_station_info["LONGITUDE"].apply(lambda x: 360+x if x<0 else x)
igld_station_info

In [None]:
igld_station_info_freq = hourly_uniform_multiprocess(IGLD_station_data_path, uniform_hourly_fill_path, igld_station_info)
igld_station_info_freq.to_csv(uniform_data_root_path/"igld_station_info.csv", index=None)
igld_station_info_freq

# Overlapping station merging

## Infer the time shift of subdaily data with UTC

In [None]:
import warnings

warnings.filterwarnings("ignore", category=RuntimeWarning)

In [None]:
BENCHMARK_DIR = Path("/data6t/AIWP_TP_dataset/benchmark")
ERA5_daily_tp_path = BENCHMARK_DIR / "ERA5_tp_24h_rolling.zarr"

In [None]:
ERA5_daily_tp = xr.open_zarr(ERA5_daily_tp_path)
ERA5_daily_tp

Unnamed: 0,Array,Chunk
Bytes,169.60 GiB,1.87 MiB
Shape,"(43849, 721, 1440)","(744, 11, 60)"
Dask graph,93456 chunks in 2 graph layers,93456 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 169.60 GiB 1.87 MiB Shape (43849, 721, 1440) (744, 11, 60) Dask graph 93456 chunks in 2 graph layers Data type float32 numpy.ndarray",1440  721  43849,

Unnamed: 0,Array,Chunk
Bytes,169.60 GiB,1.87 MiB
Shape,"(43849, 721, 1440)","(744, 11, 60)"
Dask graph,93456 chunks in 2 graph layers,93456 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [None]:
isd_info = pd.read_csv(uniform_data_root_path/"isd_station_info_fill.csv")
igld_info =pd.read_csv(uniform_data_root_path/"igld_station_info_fill.csv")
ghcnh_info = pd.read_csv(uniform_data_root_path/"ghcnh_station_info_fill.csv")
hourly_station_info = pd.concat([isd_info, igld_info, ghcnh_info], axis=0)
hourly_station_info

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,FREQUENCY
0,ISD_01001099999,70.9330,351.3330,9.0,24
1,ISD_01003099999,77.0000,15.5000,12.0,6
2,ISD_01007099999,78.9170,11.9330,7.7,12
3,ISD_01008099999,78.2460,15.4660,26.8,1
4,ISD_01010099999,69.2930,16.1440,13.1,1
...,...,...,...,...,...
6043,GHCNh_ZAM00067751,-16.0000,27.6000,1018.0,24
6044,GHCNh_ZII0000FVFA,-18.0959,25.8390,1063.8,24
6045,GHCNh_ZII0000FVJN,-20.0213,28.6265,1330.8,24
6046,GHCNh_ZII0000FVRG,-17.9302,31.0943,1489.6,24


In [None]:
station_id = "GHCNh_AEI0000OMDB"
station_data = pd.read_csv(
        uniform_hourly_fill_path / f"{station_id}.csv",
        parse_dates=["DATE"],index_col="DATE",
        dtype={"ID": str}
    ).rename(columns={"DATE": "time"})["TP24"].values

In [None]:
def calc_hourly_station_time_shift_corr(args):
    station_info, ERA5_daily_tp_path = args
    # print(station_info)
    station_id = station_info["ID"]
    lat, lon = station_info["LATITUDE"], station_info["LONGITUDE"]
    station_qc_tp = pd.read_csv(
        uniform_hourly_fill_path / f"{station_id}.csv",
        parse_dates=["DATE"],index_col="DATE",
        dtype={"ID": str}
    ).rename(columns={"DATE": "time"})["TP24"]

    # station_qc_tp["time"] = station_qc_tp["time"] + np.timedelta64(24, 'h') 

    station_era5_tp = xr.open_zarr(ERA5_daily_tp_path)["tp"].sel(
        lat=lat, lon=lon, method="nearest").load()

    original_station_date = station_qc_tp.index
    original_era5_date = station_era5_tp.time.data
    
    station_corrs = {"station": station_id}
    for shift_hour in range(-48, 49): 
        shift_date = original_station_date + np.timedelta64(shift_hour, 'h')
        station_qc_tp.index = shift_date

        merge_date_range = np.intersect1d(shift_date, original_era5_date)

        station_qc_tp_sel = station_qc_tp.loc[merge_date_range]
        station_data = station_qc_tp_sel.values

        n_valid = (~np.isnan(station_data)).sum()
        if n_valid < 30:
            continue        

        era5_data = station_era5_tp.sel(time=merge_date_range).data

        valid_mask = ~np.isnan(station_data) & ~np.isnan(era5_data)

        correlation = np.corrcoef(station_data[valid_mask], era5_data[valid_mask])[0, 1]
        station_corrs[shift_hour] = correlation.item()
    return station_corrs
    #     print(f"Station {station_id.item()} Shift Hour {shift_hour} Correlation: {correlation:.4f} with {n_valid} valid samples.")
    #     # break
    #     # if correlation >= 0.7:
    #     #     print(f"Best shift hour for station {station.item()} is {shift_hour} with correlation {correlation:.4f}")
    # break

In [None]:
import multiprocessing as mp
from multiprocessing import Pool, Manager


args_list = [
    (station_info, ERA5_daily_tp_path) 
    for _, station_info in hourly_station_info.iterrows()
]

# 使用进程池处理
with Pool(min(mp.cpu_count(), 4)) as pool:
    results = list(tqdm(
        pool.imap_unordered(calc_hourly_station_time_shift_corr, args_list),
        total=len(hourly_station_info),
        desc="处理进度"
    ))

处理进度:   0%|          | 0/16411 [00:00<?, ?it/s]

In [None]:
pd.DataFrame(results).to_csv(f"Station_era5_shift_corr_hourly.csv", index=None)

In [None]:
pd.DataFrame(results)

Unnamed: 0,station,-48,-47,-46,-45,-44,-43,-42,-41,-40,...,39,40,41,42,43,44,45,46,47,48
0,ISD_01001099999,-0.043170,-0.051571,-0.056937,-0.064212,-0.072266,-0.079608,-0.085510,-0.089168,-0.087876,...,-0.020840,-0.009624,0.006008,0.022901,0.039545,0.055870,0.063899,0.069925,0.074489,0.073920
1,ISD_01010099999,0.128654,0.133690,0.136738,0.138484,0.138140,0.137035,0.134157,0.133898,0.134859,...,0.178945,0.173882,0.169322,0.167332,0.164956,0.163786,0.162510,0.161104,0.160686,0.161863
2,ISD_01007099999,-0.108290,-0.107687,-0.107054,-0.106460,-0.107167,-0.108479,-0.110992,-0.113812,-0.115585,...,0.037718,0.059399,0.078974,0.096654,0.111038,0.121025,0.127059,0.130578,0.132624,0.135144
3,ISD_01003099999,-0.092390,-0.095168,-0.097970,-0.102363,-0.106791,-0.110818,-0.113686,-0.117584,-0.123113,...,-0.103432,-0.101148,-0.098666,-0.095806,-0.093211,-0.090791,-0.090449,-0.091007,-0.090535,-0.088650
4,ISD_01008099999,-0.012396,-0.013407,-0.014619,-0.017861,-0.022598,-0.026898,-0.029618,-0.031319,-0.031772,...,-0.166965,-0.161363,-0.154512,-0.147581,-0.138127,-0.129328,-0.121507,-0.113160,-0.106735,-0.099481
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
16406,GHCNh_ZII0000FVFA,,,,,,,,,,...,,,,,,,,,,
16407,GHCNh_ZAM00067751,,,,,,,,,,...,,,,,,,,,,
16408,GHCNh_ZII0000FVJN,,,,,,,,,,...,,,,,,,,,,
16409,GHCNh_ZII0000FVRG,,,,,,,,,,...,,,,,,,,,,


In [None]:
reporting_times = pd.DataFrame(results).set_index('station').idxmax(axis=1).reset_index()
reporting_times.columns = ["ID", "best_shift_hour"]
reporting_times["reporting_time"] = -reporting_times["best_shift_hour"]
reporting_times.to_csv(f"hourly_station_era5_reporting_times.csv", index=None)
reporting_times

  reporting_times = pd.DataFrame(results).set_index('station').idxmax(axis=1).reset_index()


Unnamed: 0,ID,best_shift_hour,reporting_time
0,ISD_01001099999,-2,2
1,ISD_01010099999,-1,1
2,ISD_01007099999,48,-48
3,ISD_01003099999,48,-48
4,ISD_01008099999,-1,1
...,...,...,...
16406,GHCNh_ZII0000FVFA,,
16407,GHCNh_ZAM00067751,,
16408,GHCNh_ZII0000FVJN,,
16409,GHCNh_ZII0000FVRG,,


In [None]:
hourly_station_info = pd.merge(hourly_station_info, reporting_times, left_on="ID", right_on="ID")
hourly_station_info.to_csv(uniform_data_root_path/f"hourly_station_fill_info.csv", index=None)
hourly_station_info

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,FREQUENCY,best_shift_hour,reporting_time
0,ISD_01001099999,70.9330,351.3330,9.0,24,-2,2
1,ISD_01003099999,77.0000,15.5000,12.0,6,48,-48
2,ISD_01007099999,78.9170,11.9330,7.7,12,48,-48
3,ISD_01008099999,78.2460,15.4660,26.8,1,-1,1
4,ISD_01010099999,69.2930,16.1440,13.1,1,-1,1
...,...,...,...,...,...,...,...
16406,GHCNh_ZAM00067751,-16.0000,27.6000,1018.0,24,,
16407,GHCNh_ZII0000FVFA,-18.0959,25.8390,1063.8,24,,
16408,GHCNh_ZII0000FVJN,-20.0213,28.6265,1330.8,24,,
16409,GHCNh_ZII0000FVRG,-17.9302,31.0943,1489.6,24,,


## Convert subdaily data to daily data

In [13]:
hourly_station_info = pd.read_csv(uniform_data_root_path/"hourly_station_fill_info.csv")
hourly_station_info

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,FREQUENCY,best_shift_hour,reporting_time
0,ISD_01001099999,70.9330,351.3330,9.0,24,-2.0,2.0
1,ISD_01003099999,77.0000,15.5000,12.0,6,48.0,-48.0
2,ISD_01007099999,78.9170,11.9330,7.7,12,48.0,-48.0
3,ISD_01008099999,78.2460,15.4660,26.8,1,-1.0,1.0
4,ISD_01010099999,69.2930,16.1440,13.1,1,-1.0,1.0
...,...,...,...,...,...,...,...
16406,GHCNh_ZAM00067751,-16.0000,27.6000,1018.0,24,,
16407,GHCNh_ZII0000FVFA,-18.0959,25.8390,1063.8,24,,
16408,GHCNh_ZII0000FVJN,-20.0213,28.6265,1330.8,24,,
16409,GHCNh_ZII0000FVRG,-17.9302,31.0943,1489.6,24,,


In [None]:
def process_hourly_to_daily(args):
    id, freq, timeshift, target_dir = args
    source_dir = uniform_hourly_fill_path
    
    source_path = source_dir / (f"{id}.csv")
    target_path = target_dir / (f"{id}.csv")
    # if target_path.exists():
    #     print(f"{target_path} exists, skip.")
    #     return

    df = pd.read_csv(source_path, parse_dates=["DATE"])[
        ["DATE", "TP24"]].rename(columns={"TP24": "PRCP"})
    # df.set_index("DATE", inplace=True)
    df = df.dropna(subset=["PRCP"]).sort_values("DATE")
    df = df[~df.index.duplicated(keep="last")]
    if len(df) < 30:
        return None
    if timeshift == timeshift:
        df["DATE"] = df["DATE"] + pd.Timedelta(hours=timeshift)

    hour_counts = df["DATE"].dt.hour.value_counts()

    report_hour= hour_counts.idxmax().item()
    if freq==1 or report_hour==0:
        report_hour = 0
        df = df[df["DATE"].dt.hour == report_hour]
        df["DATE"] = df["DATE"] - pd.Timedelta(hours=24)
    elif report_hour < 2:
        report_hour = report_hour
        df = df[df["DATE"].dt.hour == report_hour]
        df["DATE"] = df["DATE"].dt.floor("d") - pd.Timedelta(hours=24)
    elif report_hour > 21:
        report_hour = report_hour
        df = df[df["DATE"].dt.hour == report_hour]
        df["DATE"] = df["DATE"].dt.floor("d")
    elif freq == 3:
        report_hour = report_hour % freq
        df = df[df["DATE"].dt.hour == report_hour]
        df["DATE"] = df["DATE"].dt.floor("d") - pd.Timedelta(hours=24)
    elif freq in [6, 12]:
        if report_hour <= (freq-1):
            report_hour = 24 - (freq-report_hour)
            df = df[df["DATE"].dt.hour == report_hour]
            df["DATE"] = df["DATE"].dt.floor("d")
        else:
            report_hour = report_hour % freq
            df = df[df["DATE"].dt.hour == report_hour]
            df["DATE"] = df["DATE"].dt.floor("d") - pd.Timedelta(hours=24)
    elif freq == 24:
        report_hour = report_hour
        if report_hour <= 12:
            df = df[df["DATE"].dt.hour == report_hour]
            df["DATE"] = df["DATE"].dt.floor("d") - pd.Timedelta(hours=24)
        else:
            df = df[df["DATE"].dt.hour == report_hour]
            df["DATE"] = df["DATE"].dt.floor("d")
    else:
        raise ValueError("Condition that are not considered")

    if len(df) < 30:
        return None
    # print(freq, hour_counts)
    df.to_csv(target_path, index=None)
    # print(f"Station {id} time shift: {timeshift}, report hour: {report_hour} with frequency {freq}")
    return {id: report_hour}

freqs = hourly_station_info.set_index("ID")['FREQUENCY'].to_dict()
timeshifts = hourly_station_info.set_index("ID")['best_shift_hour'].to_dict()

id_list = hourly_station_info["ID"].tolist()
args_list = [(id, freqs.get(id, None), timeshifts.get(id, None), uniform_hourly2daily_data_path) for id in id_list]

with Pool(processes=min(12, mp.cpu_count())) as pool:
    results = list(tqdm(
        pool.imap(process_hourly_to_daily, args_list),
        total=len(args_list),
        desc="Processing hourly data to daily data"
    ))

In [None]:
station_reporting_hour = {}
for res in results:
    if res is not None:
        # print(list(res.keys())[0])
        station_reporting_hour[list(res.keys())[0]] = list(res.values())[0]
station_reporting_hour = pd.Series(station_reporting_hour).reset_index()
station_reporting_hour.columns = ["ID", "REPORTING_HOUR_UTC"]
hourly_station_info = pd.merge(hourly_station_info.set_index("ID"), station_reporting_hour.set_index("ID"), left_index=True, right_index=True)
hourly_station_info.to_csv(uniform_data_root_path / "Hourly2daily_station_info_fill.csv")
hourly_station_info

Unnamed: 0_level_0,LATITUDE,LONGITUDE,ELEVATION,FREQUENCY,best_shift_hour,reporting_time,REPORTING_HOUR_UTC
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
ISD_01001099999,70.9330,351.3330,9.0,24,-2.0,2.0,4
ISD_01007099999,78.9170,11.9330,7.7,12,48.0,-48.0,18
ISD_01025099999,69.6830,18.9190,9.4,12,48.0,-48.0,18
ISD_01028099999,74.5170,19.0170,18.0,12,-1.0,1.0,17
ISD_01044099999,70.4870,22.1400,6.4,24,,,6
...,...,...,...,...,...,...,...
GHCNh_ZAM00067751,-16.0000,27.6000,1018.0,24,,,6
GHCNh_ZII0000FVFA,-18.0959,25.8390,1063.8,24,,,6
GHCNh_ZII0000FVJN,-20.0213,28.6265,1330.8,24,,,6
GHCNh_ZII0000FVRG,-17.9302,31.0943,1489.6,24,,,6


## Merge overlapping stations

In [None]:
from QualityControl.utils import haversine_distance

In [None]:
hourly_station_info = pd.read_csv(uniform_data_root_path / "Hourly2daily_station_info_fill.csv")
hourly_station_info

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,FREQUENCY,best_shift_hour,reporting_time,REPORTING_HOUR_UTC
0,ISD_01001099999,70.9330,351.3330,9.0,24,-2.0,2.0,4
1,ISD_01007099999,78.9170,11.9330,7.7,12,48.0,-48.0,18
2,ISD_01025099999,69.6830,18.9190,9.4,12,48.0,-48.0,18
3,ISD_01028099999,74.5170,19.0170,18.0,12,-1.0,1.0,17
4,ISD_01044099999,70.4870,22.1400,6.4,24,,,6
...,...,...,...,...,...,...,...,...
7227,GHCNh_ZAM00067751,-16.0000,27.6000,1018.0,24,,,6
7228,GHCNh_ZII0000FVFA,-18.0959,25.8390,1063.8,24,,,6
7229,GHCNh_ZII0000FVJN,-20.0213,28.6265,1330.8,24,,,6
7230,GHCNh_ZII0000FVRG,-17.9302,31.0943,1489.6,24,,,6


In [None]:
gsod_station_info = pd.read_csv(uniform_data_root_path / "gsod_station_info.csv")
ghcnd_station_info = pd.read_csv(uniform_data_root_path / "ghcnd_station_info.csv")

daily_station_info = pd.concat([ghcnd_station_info, gsod_station_info], ignore_index=True)
daily_station_info["FREQUENCY"] = 24
daily_station_info

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,FREQUENCY
0,GHCNd_US1MSJC0035,30.40570,271.23340,20.1,24
1,GHCNd_LOE00105562,47.86670,18.18310,115.0,24
2,GHCNd_US1NMSC0063,34.07990,252.78390,2187.5,24
3,GHCNd_NLE00109274,52.78310,4.80000,0.0,24
4,GHCNd_US1COAU0059,37.26550,252.91540,2283.6,24
...,...,...,...,...,...
62588,GSOD_A0735500241,43.57900,269.08700,394.1,24
62589,GSOD_A0735700182,45.98600,264.00800,367.3,24
62590,GSOD_A0735900240,42.93800,274.93900,249.0,24
62591,GSOD_A5125500445,32.46383,272.04595,34.1,24


In [None]:
station_info = pd.concat([hourly_station_info, daily_station_info], ignore_index=True)
station_info

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,FREQUENCY,best_shift_hour,reporting_time,REPORTING_HOUR_UTC
0,ISD_01001099999,70.93300,351.33300,9.0,24,-2.0,2.0,4.0
1,ISD_01007099999,78.91700,11.93300,7.7,12,48.0,-48.0,18.0
2,ISD_01025099999,69.68300,18.91900,9.4,12,48.0,-48.0,18.0
3,ISD_01028099999,74.51700,19.01700,18.0,12,-1.0,1.0,17.0
4,ISD_01044099999,70.48700,22.14000,6.4,24,,,6.0
...,...,...,...,...,...,...,...,...
69820,GSOD_A0735500241,43.57900,269.08700,394.1,24,,,
69821,GSOD_A0735700182,45.98600,264.00800,367.3,24,,,
69822,GSOD_A0735900240,42.93800,274.93900,249.0,24,,,
69823,GSOD_A5125500445,32.46383,272.04595,34.1,24,,,


In [None]:
from multiprocessing import Pool, Manager
import multiprocessing as mp


def duplicate_station_merge(args):

    select_station_info, station_info, target_dir = args
    if not target_dir.exists():
        target_dir.mkdir(parents=True, exist_ok=True)
    source_dir_dict = {"ISD": uniform_hourly2daily_data_path, 
                       "GHCNh": uniform_hourly2daily_data_path, 
                       "IGLD": uniform_hourly2daily_data_path,
                       "GHCNd": GHCNd_uniform_data_path,
                       "GSOD": GSOD_uniform_data_path}
    source_sort_dict = {"ISD": 4, "GHCNh": 3, "IGLD": 5, "GHCNd": 1, "GSOD": 2}
    id = select_station_info["ID"]
    source = id.split("_")[0]
    source_dir = source_dir_dict[source]
    source_sort = source_sort_dict[source]
    lat, lon = select_station_info["LATITUDE"], select_station_info["LONGITUDE"]
    # lat, lon = round(lat, 4), round(lon, 4)
    freq = select_station_info['FREQUENCY']

    station_info = station_info[station_info["ID"] != id]

    distances = station_info.apply(
        lambda row: haversine_distance(row["LATITUDE"], row["LONGITUDE"], lat, lon),  
        axis=1)
    min_distance = distances.min()
    
    # distance_threshold = 10*1e-3
    distance_threshold = 100*1e-3

    # nearby_stations = []
    
    remove_flag = False
    try:

        if min_distance > 25:
            remove_flag = False
        else:
            data = pd.read_csv(source_dir / f"{id}.csv", parse_dates=["DATE"]).rename(columns={"TP24_best": "PRCP"})
            # data["DATE"] = pd.to_datetime(data["DATE"], format="%Y-%m-%d")
            data = data[data["DATE"] <= pd.to_datetime("2024-12-31")]
            data = data[data["DATE"] >= pd.to_datetime("2020-01-01")]

            data.set_index("DATE", inplace=True)
            data = data.sort_index()
            data = data[~data.index.duplicated(keep="last")]


            nearby_stations_df = station_info[distances <= distance_threshold]
            if (not remove_flag) and (len(nearby_stations_df) >= 1):
                for _, row in nearby_stations_df.iterrows():

                    # near_freq = row['FREQUENCY']
                    # if freq < near_freq:  
                    #     # remove_flag = False
                    #     # nearby_stations.append(row['ID'])
                    #     continue
                    near_source = row['ID'].split("_")[0]
                    near_source_dir = source_dir_dict[near_source]
                    near_source_sort = source_sort_dict[near_source]

                    near_data = pd.read_csv(
                        near_source_dir / f"{row['ID']}.csv", parse_dates=["DATE"]).rename(columns={"TP24_best": "PRCP"})
                    # near_data["DATE"] = pd.to_datetime(near_data["DATE"], format="%Y-%m-%d")
                    near_data = near_data[near_data["DATE"] <= pd.to_datetime("2024-12-31 23:00:00")]
                    near_data = near_data[near_data["DATE"] >= pd.to_datetime("2020-01-01 00:00:00")]

                    near_data.set_index("DATE", inplace=True)
                    near_data = near_data.sort_index()
                    near_data = near_data[~near_data.index.duplicated(keep="last")]

                    if len(data.dropna(subset=["PRCP"])) > len(near_data.dropna(subset=["PRCP"])):
                        pass
                    elif len(data.dropna(subset=["PRCP"])) < len(near_data.dropna(subset=["PRCP"])):
                        remove_flag = True
                        break
                    else:
                        if source_sort < near_source_sort:  
                            # nearby_stations.append(row['ID'])
                            remove_flag = True
                            break

            # 10*1e-3 < min_distance <= 25: 
            nearby_stations_df = station_info[(distances <= 25) & (distances > distance_threshold)]
            if (not remove_flag) and (len(nearby_stations_df) >= 1):
                for _, row in nearby_stations_df.iterrows():
                    near_source = row['ID'].split("_")[0]
                    near_source_dir = source_dir_dict[near_source]
                    near_source_sort = source_sort_dict[near_source]
                    near_data = pd.read_csv(
                        near_source_dir / f"{row['ID']}.csv", parse_dates=["DATE"]).rename(columns={"TP24_best": "PRCP"})
                    # near_data["DATE"] = pd.to_datetime(near_data["DATE"], format="%Y-%m-%d")
                    near_data = near_data[near_data["DATE"] <= pd.to_datetime("2024-12-31 23:00:00")]
                    near_data = near_data[near_data["DATE"] >= pd.to_datetime("2020-01-01 00:00:00")]
                    near_data.set_index("DATE", inplace=True)
                    near_data = near_data.sort_index()
                    near_data = near_data[~near_data.index.duplicated(keep="last")]   

                    merge_data = pd.concat([
                        data[["PRCP"]],
                        near_data[["PRCP"]].rename(columns={"PRCP": "NEAR_PRCP"})
                    ], axis=1).dropna()
                    merge_data_neg1 = pd.concat([
                        data[["PRCP"]].shift(-1),
                        near_data[["PRCP"]].rename(columns={"PRCP": "NEAR_PRCP"})
                    ], axis=1).dropna()
                    merge_data_pos1 = pd.concat([
                        data[["PRCP"]].shift(1),
                        near_data[["PRCP"]].rename(columns={"PRCP": "NEAR_PRCP"})
                    ], axis=1).dropna()

                    if len(merge_data) > 30:
                        mae = np.abs(merge_data["PRCP"] - merge_data["NEAR_PRCP"]).mean()
                        cc = merge_data.corr().iloc[0, 1] # shift days of -1, 0, 1
                        cc_shift_neg1 = merge_data_neg1.corr().iloc[0, 1]
                        cc_shift_pos1 = merge_data_pos1.corr().iloc[0, 1]
                        cc = max(cc, cc_shift_neg1, cc_shift_pos1)

                        if mae < 0.1 and cc > 0.9999:
                            near_freq = row['FREQUENCY']
                            if freq < near_freq: 
                                # nearby_stations.append(row['ID'])
                                # remove_flag = False
                                continue
                            if len(data.dropna(subset=["PRCP"])) > len(near_data.dropna(subset=["PRCP"])):
                                # remove_flag = False
                                pass
                            elif len(data.dropna(subset=["PRCP"])) < len(near_data.dropna(subset=["PRCP"])):
                                remove_flag = True
                                break
                            else:
                                if source_sort < near_source_sort: 
                                    # nearby_stations.append(row['ID'])
                                    remove_flag = True
                                    break


    except Exception as e:
        # print(data[["TP24_best"]].rename(columns={"TP24_best": "PRCP"}).shift(-1),
        #             near_data[["TP24_best"]].rename(columns={"TP24_best": "NEAR_PRCP"}))
        print(f"Error processing {id}: {e}")
    if remove_flag:
        local_drop_id = id #(id, nearby_stations)
    else:
        shutil.copyfile(
            source_dir / f"{id}.csv",
            target_dir / f"{id}.csv"
        )
        local_drop_id = None
    return local_drop_id


# args_list = [(row, station_info, Merge_daily_all_data_path) for _, row in station_info.iterrows()]
args_list = [(row, station_info, Merge_data_path / "daily_100") for _, row in station_info.iterrows()]

with Pool(processes=min(12, mp.cpu_count())) as pool:
    results = list(tqdm(
        pool.imap(duplicate_station_merge, args_list),
        total=len(args_list),
        desc="Processing hourly stations"
    ))
# drop_station_id_dict = {a[0]: a[1] for a in results if a is not None}
# drop_station_id = list(drop_station_id_dict.keys())
# len(drop_station_id)
drop_station_id = [a for a in results if a is not None]
print(len(drop_station_id))

station_reserved_info = station_info.set_index("ID").drop(index=drop_station_id)
station_reserved_info.to_csv(Merge_data_path / f"daily_100_station_info.csv")

Processing hourly stations:   0%|          | 0/69825 [00:00<?, ?it/s]

11032


# Get station information

In [None]:
ghcn_stations_info = pd.read_csv(raw_data_root_path / "ghcnd-stations.csv", 
        names=["ID", "LATITUDE", "LONGITUDE", "ELEVATION", "STATE", "NAME", 
                "GSN_FLAG", "HCN_CRN_FLAG", "WMO_ID", "unknown"], low_memory=False)
ghcn_stations_info["NAME"] = ghcn_stations_info["NAME"].str.strip()
ghcn_stations_info.set_index("ID", inplace=True)
# ghcn_stations_info = ghcn_stations_info.loc[ghcn_stations["ID"],:]
ghcn_stations_info

Unnamed: 0_level_0,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,unknown
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
ACW00011604,17.1167,-61.7833,10.1,,ST JOHNS COOLIDGE FLD,,,,
ACW00011647,17.1333,-61.7833,19.2,,ST JOHNS,,,,
AE000041196,25.3330,55.5170,34.0,,SHARJAH INTER. AIRP,GSN,,41196,
AEM00041194,25.2550,55.3640,10.4,,DUBAI INTL,,,41194,
AEM00041217,24.4330,54.6510,26.8,,ABU DHABI INTL,,,41217,
...,...,...,...,...,...,...,...,...,...
ZI000067969,-21.0500,29.3670,861.0,,WEST NICHOLSON,,,67969,
ZI000067975,-20.0670,30.8670,1095.0,,MASVINGO,,,67975,
ZI000067977,-21.0170,31.5830,430.0,,BUFFALO RANGE,,,67977,
ZI000067983,-20.2000,32.6160,1132.0,,CHIPINGE,GSN,,67983,


In [None]:
ghcnd_station_select = pd.read_csv(uniform_data_root_path / "ghcnd_station_info.csv", dtype={"ID": str}).dropna(subset=["LATITUDE", "LONGITUDE"])

gsod_station_select = pd.read_csv(uniform_data_root_path / "gsod_station_info.csv", dtype={"ID": str}).dropna(subset=["LATITUDE", "LONGITUDE"])
gsod_station_select["ID"] = gsod_station_select["ID"].apply(lambda x: x.zfill(11))
gsod_station_select

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION
0,GSOD_01001099999,70.933333,351.333333,9.00
1,GSOD_01001499999,59.791925,5.340850,48.76
2,GSOD_01002099999,80.050000,16.250000,8.00
3,GSOD_01003099999,77.000000,15.500000,12.00
4,GSOD_01006099999,78.250000,22.816667,14.00
...,...,...,...,...
12871,GSOD_A0735500241,43.579000,269.087000,394.10
12872,GSOD_A0735700182,45.986000,264.008000,367.30
12873,GSOD_A0735900240,42.938000,274.939000,249.00
12874,GSOD_A5125500445,32.463830,272.045950,34.10


In [None]:
id_list = [f.name[:-4] for f in Merge_daily_data_path.iterdir() if f.is_file()]
id_info_dict = []
for id in tqdm(id_list):
    if id in ghcn_stations_info.index:
        row_info = ghcn_stations_info.loc[id]
        lon, lat, evaluation = row_info["LONGITUDE"], row_info["LATITUDE"], row_info["ELEVATION"]
        source = "GHCNd"
    elif id in gsod_station_select["ID"].values:
        row_info = gsod_station_select[gsod_station_select["ID"]==id].iloc[0]
        lon, lat, evaluation = row_info["LONGITUDE"], row_info["LATITUDE"], row_info["ELEVATION"]
        source = "GSOD"
    elif id in ghcnd_station_select["ID"].values:
        row_info = ghcnd_station_select[ghcnd_station_select["ID"]==id].iloc[0]
        lon, lat = row_info["LONGITUDE"], row_info["LATITUDE"]
        evaluation = np.nan
        source = "GHCNd"
    else:
        raise ValueError(f"Station ID {id} not found in station info dataframes.")
    id_dict = {"ID": id, "LONGITUDE": lon, "LATITUDE": lat, "ELEVATION": evaluation, "SOURCE": source}
    id_info_dict.append(id_dict)
id_info_df = pd.DataFrame(id_info_dict)
id_info_df

In [None]:
id_info_df.to_csv(Merge_data_path / "Daily_station_info.csv", index=False)