In [1]:
import threddsclient
import xarray as xr
import pandas as pd
import netCDF4
from datetime import datetime, timedelta
import re
import numpy as np
import time as time_module  # 避免与变量名冲突
import cftime
from tqdm import tqdm

In [2]:
# EBAS station codes of interest (33 entries).
# In the cells visible here, the list is only referenced by the commented-out
# draft of the URL filter; the active filter does not restrict by station.
station_ids = """
    FR0030R GR0004U CH0005R CH0010U DE0054R CZ0003R
    FR0020R FI0096G BE0017U BE0008U BE0012U CH0053R
    CV0001G FR0013R FR0035U FI0039U FR0041U ES0019U
    DE0043G IT0004R ES0025U BE0007R CY0002R NO0002R
    DE0007R DE0008R FR0027U AT0002R FR0018R IE0031R
    ES0021U FR0008R DE0044R
""".split()


In [3]:
# User-selected analysis window; both bounds are parsed from compact
# YYYYMMDDhhmmss stamps into naive datetimes.
_stamp_format = '%Y%m%d%H%M%S'
user_start_date, user_end_date = (
    datetime.strptime(stamp, _stamp_format)
    for stamp in ('20000101000000', '20241001000000')
)
# Ask threddsclient for the OPeNDAP URLs exposed by the EBAS THREDDS catalog.
ebas_catalog_url = 'https://thredds.nilu.no/thredds/catalog/ebas/catalog.xml'
all_opendap_urls = threddsclient.opendap_urls(ebas_catalog_url)


In [None]:
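# NOTE: superseded draft of the URL filter in the next cell, kept for reference only.
# Unlike the active version, this draft also keeps only files whose station code
# is listed in `station_ids`.
# filtered_urls = []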
# def parse_ebas_filename(filename):
#     # 移除查询参数和扩展名
#     filename = filename.split('?')[0]
#     if filename.endswith('.nc'):
#         filename = filename[:-3]
#     parts = filename.split('.')
#     if len(parts) < 3:
#         return None, None, None
#     station_id = parts[0]
#     start_date_str = parts[1]
#     end_date_str = parts[2]
#     return station_id, start_date_str, end_date_str
# 
# # 筛选符合条件的URL
# for url in all_opendap_urls:
#     filename = url.split('/')[-1]
#     station_id, start_date_str, end_date_str = parse_ebas_filename(filename)
#     if not station_id or not start_date_str or not end_date_str:
#         continue
#     if station_id not in station_ids:
#         continue
#     try:
#         start_date = datetime.strptime(start_date_str, '%Y%m%d%H%M%S')
#         end_date = datetime.strptime(end_date_str, '%Y%m%d%H%M%S')
#     except ValueError:
#         continue
#     # 检查时间范围是否重叠
#     if end_date < user_start_date or start_date > user_end_date:
#         continue
#     filtered_urls.append(url)

In [4]:
filtered_urls = []


def parse_ebas_filename(filename):
    """Split an EBAS file name into its first three dot-separated fields.

    Any ``?query`` suffix and a trailing ``.nc`` extension are dropped before
    splitting on ``.``.

    Args:
        filename: Last path component of a dataset URL.

    Returns:
        ``(station_id, start_date_str, end_date_str)`` taken from the first,
        second and third fields, or ``(None, None, None)`` when fewer than
        three fields are present. Fields are returned as raw strings and may
        be empty.

    NOTE(review): in EBAS file naming the third field looks like the file's
    revision timestamp rather than the end of the measurement period --
    confirm against the EBAS naming convention before relying on it as an
    end date.
    """
    stem = filename.partition('?')[0]
    stem = stem[:-3] if stem.endswith('.nc') else stem
    fields = stem.split('.')
    if len(fields) >= 3:
        station_id, start_date_str, end_date_str = fields[:3]
        return station_id, start_date_str, end_date_str
    return None, None, None

# Keep the URLs whose file-name time span overlaps the user-selected window.
# NOTE(review): the "end" stamp is the third file-name field, which may be a
# revision timestamp rather than a true end date -- confirm; if so this filter
# is looser than intended.
for url in all_opendap_urls:
    filename = url.rsplit('/', 1)[-1]
    station_id, start_date_str, end_date_str = parse_ebas_filename(filename)
    if not (station_id and start_date_str and end_date_str):
        # Name did not yield three non-empty fields.
        continue
    try:
        start_date, end_date = (
            datetime.strptime(stamp, '%Y%m%d%H%M%S')
            for stamp in (start_date_str, end_date_str)
        )
    except ValueError:
        # One of the stamps is not a YYYYMMDDhhmmss timestamp.
        continue
    # Two intervals overlap when each one starts no later than the other ends.
    if end_date >= user_start_date and start_date <= user_end_date:
        filtered_urls.append(url)

In [6]:
# Number of OPeNDAP URLs that passed the date-window filter.
len(filtered_urls)

13169

In [None]:
# Accumulators for the extraction loop: per-dataset isoprene frames,
# per-dataset temperature frames, and URLs that could not be processed.
isoprene_data_list, temperature_data_list, failed_urls = [], [], []

In [None]:

def open_dataset_with_retry(url, retries=5, delay=2):
    """Open ``url`` with ``netCDF4.Dataset``, retrying on any exception.

    Args:
        url: Dataset location passed straight to ``netCDF4.Dataset``.
        retries: Maximum number of open attempts.
        delay: Seconds to sleep between a failed attempt and the next one.

    Returns:
        The opened dataset, or ``None`` once every attempt has failed (also
        when ``retries`` allows no attempt at all). Failures are reported
        with ``print`` rather than raised.
    """
    final_attempt = retries - 1
    for attempt in range(retries):
        try:
            return netCDF4.Dataset(url)
        except Exception as e:
            if attempt >= final_attempt:
                # Out of attempts: report and give up on this URL.
                print(f"无法访问数据集 {url}，已跳过。错误信息: {e}")
                return None
            print(f"访问数据集 {url} 失败，重试 {attempt + 1}/{retries} 次... 错误信息: {e}")
            time_module.sleep(delay)

# Walk the filtered URLs and extract isoprene / temperature records.
#
# Every opened dataset is closed in a single ``finally`` clause, so the remote
# handle is released even when processing raises part-way through.

# Variables that must ALL be present for a dataset to count as carrying
# isoprene / temperature measurements (loop-invariant, so defined once).
isoprene_vars = [
    'isoprene_ng_per_m3_amean',
    'isoprene_ng_per_m3_amean_qc',
    'isoprene_ng_per_m3_precision'
]
temperature_vars = ['temperature', 'temperature_qc']


def _as_1d(values):
    """Return ``values`` flattened to one dimension when it has more than one."""
    return values.flatten() if values.ndim > 1 else values


def _decode_time_axis(ds, time_var):
    """Convert a dataset's ``time`` variable into pandas datetimes.

    Strategies are tried in order:
      1. decode with ``netCDF4.num2date`` when the variable has non-empty
         ``units`` (calendar defaults to ``'standard'``);
      2. convert directly when the first element is already a datetime;
      3. treat the values as second offsets from the dataset's
         ``time_coverage_start`` attribute.

    Args:
        ds: Open dataset, consulted only for ``time_coverage_start``.
        time_var: The dataset's ``time`` variable.

    Returns:
        The result of ``pd.to_datetime`` on the decoded values.

    Raises:
        ValueError: When none of the strategies applies. Errors raised by the
            underlying decoders propagate unchanged.
    """
    time_units = getattr(time_var, 'units', '').strip()
    time_calendar = getattr(time_var, 'calendar', 'standard')
    if time_units:
        decoded = netCDF4.num2date(
            time_var[:],
            units=time_units,
            calendar=time_calendar,
            only_use_cftime_datetimes=False,
            only_use_python_datetimes=True
        )
        if isinstance(decoded[0], cftime.datetime):
            # Go through ISO strings so pandas can parse cftime objects.
            return pd.to_datetime([t.isoformat() for t in decoded])
        return pd.to_datetime(decoded)
    if isinstance(time_var[0], (np.datetime64, datetime)):
        return pd.to_datetime(time_var[:])
    if hasattr(ds, 'time_coverage_start'):
        base_time = pd.to_datetime(getattr(ds, 'time_coverage_start'))
        return pd.to_datetime([base_time + timedelta(seconds=float(t)) for t in time_var[:]])
    raise ValueError("无效的时间单位，且无法获取基准时间。")


for url in tqdm(filtered_urls, desc="Processing datasets"):
    try:
        ds = open_dataset_with_retry(url)
        if ds is None:
            failed_urls.append(url)  # could not be opened after all retries
            continue
        try:
            variables = ds.variables
            has_isoprene = all(name in variables for name in isoprene_vars)
            has_temperature = all(name in variables for name in temperature_vars)
            if not has_isoprene and not has_temperature:
                continue

            # Global attributes describing the station and instrument.
            matrix = getattr(ds, 'ebas_matrix', '')
            instrument = getattr(ds, 'ebas_instrument_type', '')
            station_name = getattr(ds, 'ebas_station_name', '')
            station_id = getattr(ds, 'ebas_station_code', '')
            longitude = getattr(ds, 'ebas_station_longitude', None)
            latitude = getattr(ds, 'ebas_station_latitude', None)
            time_resolution = getattr(ds, 'ebas_resolution_code', '')
            instrument_name = getattr(ds, 'ebas_instrument_name', '')
            # Read from the same attribute as `instrument`.
            monitoring_equipment = getattr(ds, 'ebas_instrument_type', '')

            # Metadata columns shared by both output frames, in output order.
            station_columns = {
                'longitude': longitude,
                'latitude': latitude,
                'monitoring_equipment': monitoring_equipment,
                'time_resolution': time_resolution,
                'instrument_type': instrument,
                'instrument_name': instrument_name,
                'station_name': station_name,
                'station_id': station_id
            }

            # Decode the time axis.
            time_var = variables.get('time')
            if time_var is None:
                print(f"数据集 {url} 中不存在 'time' 变量，已跳过。")
                failed_urls.append(url)
                continue
            try:
                time_values = _decode_time_axis(ds, time_var)
            except Exception as e:
                print(f"数据集 {url} 的时间变量无法解析，已跳过。错误信息: {e}")
                failed_urls.append(url)
                continue
            # Make sure the time axis is one-dimensional.
            if np.array(time_values).ndim > 1:
                time_values = np.array(time_values).flatten()

            # Isoprene records.
            if has_isoprene:
                # NOTE(review): these two early exits also skip the temperature
                # section below for the same dataset -- confirm that dropping
                # its temperature data is intended.
                if matrix != 'air':
                    continue
                if instrument not in ['online_gc', 'steel_canister', 'ads_tube', 'PTR-MS']:
                    continue
                isoprene = _as_1d(variables['isoprene_ng_per_m3_amean'][:])
                isoprene_qc = _as_1d(variables['isoprene_ng_per_m3_amean_qc'][:])
                isoprene_precision = _as_1d(variables['isoprene_ng_per_m3_precision'][:])
                # Trim every column to the shortest one so the frame is rectangular.
                min_length = min(len(time_values), len(isoprene), len(isoprene_qc), len(isoprene_precision))
                df_iso = pd.DataFrame({
                    'Date': time_values[:min_length],
                    'isoprene_ng_per_m3_amean': isoprene[:min_length],
                    'isoprene_ng_per_m3_amean_qc': isoprene_qc[:min_length],
                    'isoprene_ng_per_m3_precision': isoprene_precision[:min_length],
                    **station_columns
                })
                isoprene_data_list.append(df_iso)

            # Temperature records.
            if has_temperature:
                if matrix not in ['aerosol', 'air', 'instrument', 'met', 'pm1', 'pm10', 'pm2.5']:
                    continue
                temperature_var = variables['temperature']
                temperature = _as_1d(temperature_var[:])
                temperature_qc = _as_1d(variables['temperature_qc'][:])
                temp_units = getattr(temperature_var, 'units', '').lower()
                # Normalise to degrees Celsius based on the lower-cased units text.
                if 'fahrenheit' in temp_units:
                    temperature = (temperature - 32) * 5.0 / 9.0
                    temp_units = 'deg c'
                elif 'k' in temp_units:
                    temperature = temperature - 273.15
                    temp_units = 'deg c'

                # Trim every column to the shortest one so the frame is rectangular.
                min_length = min(len(time_values), len(temperature), len(temperature_qc))
                df_temp = pd.DataFrame({
                    'Date': time_values[:min_length],
                    'temperature': temperature[:min_length],
                    'temperature_qc': temperature_qc[:min_length],
                    'temperature_units': temp_units,
                    **station_columns
                })
                temperature_data_list.append(df_temp)
        finally:
            # Single exit point: runs for every `continue`, normal completion
            # and exception path once the dataset has been opened.
            ds.close()
    except Exception as e:
        print(f"处理数据集 {url} 时出错: {e}")
        failed_urls.append(url)  # record the URL that failed
        continue

# Report the outcome; when anything failed, persist the URLs one per line.
if not failed_urls:
    print("所有数据集均处理成功。")
else:
    with open('failed_urls.txt', 'w') as f:
        f.writelines(failed + '\n' for failed in failed_urls)
    print(f"共有 {len(failed_urls)} 个数据集处理失败，已写入 failed_urls.txt。")

In [None]:
# Merge the per-dataset isoprene frames, keep rows inside the user window
# (both bounds inclusive) and write the result as CSV and pickle.
if isoprene_data_list:
    isoprene_df = (
        pd.concat(isoprene_data_list, ignore_index=True)
        # Normalise 'Date' to datetime before comparing against the window.
        .assign(Date=lambda frame: pd.to_datetime(frame['Date']))
        .loc[lambda frame: frame['Date'].between(user_start_date, user_end_date)]
    )
    isoprene_df.to_csv('isoprene_data.csv', index=False)
    isoprene_df.to_pickle('isoprene_data.pkl')

In [None]:
# Merge the per-dataset temperature frames, keep rows inside the user window
# (both bounds inclusive) and write the result as CSV and pickle.
if temperature_data_list:
    temperature_df = (
        pd.concat(temperature_data_list, ignore_index=True)
        # Normalise 'Date' to datetime before comparing against the window.
        .assign(Date=lambda frame: pd.to_datetime(frame['Date']))
        .loc[lambda frame: frame['Date'].between(user_start_date, user_end_date)]
    )
    temperature_df.to_csv('temperature_data.csv', index=False)
    # NOTE(review): the pickle is named 'temperature_df.pkl' while the CSV is
    # 'temperature_data.csv' -- confirm the differing stem is intended.
    temperature_df.to_pickle('temperature_df.pkl')