In [1]:
import xarray as xr
import numpy as np
import pandas as pd
import glob
import os

In [2]:
arm = '/data/keeling/a/xx24/e/proj_ml/arm_data'
dir = '/data/keeling/a/xx24/e/proj_ml/code_ml_surfactant_ccn'
fp = '/data/keeling/a/xx24/e/proj_ml/code_ml_surfactant_ccn/data/'

In [3]:
site = 'MOS' # ASI BNF  COR  ENA  GUC  SGP

# CCN & CPC

In [17]:
# 1) 定义工作目录和文件列表
data_dir = f'{arm}/{site}/'
file_pattern = os.path.join(data_dir, f'{site.lower()}aosccn2colaavg*.nc')
files = sorted(glob.glob(file_pattern))

# 2) 指定我们关心的 supersaturation
targets = [0.1, 0.2, 0.4, 0.8, 1.0]

ccn_records = []
cpc_records = []

for fn in files:
    ds = xr.open_dataset(fn)
    try:
        ss_calc = ds['supersaturation_calculated'].values.astype(float)
        ss_rounded = np.round(ss_calc, 1)
        n_ccn = ds['N_CCN'].values
        n_cpc = ds['aerosol_number_concentration'].values
        times = pd.to_datetime(ds['time'].values)
        date = times[0].strftime('%Y-%m-%d')

        # 1. CCN 按 supersaturation 分组
        for tgt in targets:
            idx = np.where(ss_rounded == tgt)[0]
            if idx.size == 0:
                continue
            mean_ccn = np.nanmean(n_ccn[idx])
            ccn_records.append({
                'date': date,
                'supersaturation': tgt,
                'N_CCN': mean_ccn
            })
        
        # 2. CPC 直接每天一个均值
        mean_cpc = np.nanmean(n_cpc)
        cpc_records.append({
            'date': date,
            'N_CPC': mean_cpc
        })
    except Exception as e:
        print(f"Error in file {fn}: {e}")
    finally:
        ds.close()

# CCN pivot 表
df_ccn = pd.DataFrame(ccn_records)
df_ccn = df_ccn.groupby(['date', 'supersaturation'], as_index=False).mean()
df_ccn_pivot = df_ccn.pivot(index='date', columns='supersaturation', values='N_CCN')

# CPC 表
df_cpc = pd.DataFrame(cpc_records)
df_cpc = df_cpc.groupby('date', as_index=False).mean().set_index('date')

# 保存
df_ccn_pivot.to_csv(f'{fp}{site}_daily_N_CCN_by_supersaturation.csv')
df_cpc.to_csv(f'{fp}{site}_daily_N_CPC.csv')

# aerosol species

In [None]:
# 1) 定义工作目录及文件匹配
data_dir = f'{arm}/{site}/'
file_pattern = os.path.join(data_dir, f'{site.lower()}*acsm*.nc')
files = sorted(glob.glob(file_pattern))

# 2) 关注的 species 列表
base_species = [
    'total_organics',
    'sulfate',      
    'ammonium',  
    'nitrate',     
    'chloride'    
]

is_cdce = 'acsmcdce' in files[0]
# 动态决定变量名
species = [f"{s}_CDCE" if is_cdce else s for s in base_species]


records = []
for fn in files:
    ds = xr.open_dataset(fn)
    
    # 3) 提取日期：优先 time 维度第一个值，否则从文件名中解析 yyyymmdd 
    date = pd.to_datetime(ds['time'].values[0]).strftime('%Y-%m-%d')

    # 4) 对每个 species 过滤负值并计算平均
    rec = {'date': date}
    for sp in species:
        if sp not in ds:
            rec[sp] = np.nan
            continue
        data = ds[sp].values
        # 如果多维，以 time 维度 (假设为第一维) 为主 flatten
        flat = np.ravel(data)
        # 只保留非负值
        pos = flat[flat >= 0]
        rec[sp] = np.nan if pos.size == 0 else np.nanmean(pos)
    records.append(rec)
    ds.close()

# 5) 构造 DataFrame，并以日期排序
df_acsm = pd.DataFrame(records)
df_acsm['date'] = pd.to_datetime(df_acsm['date'])
df_acsm = df_acsm.set_index('date').sort_index()

# 6) 保存或查看
df_acsm.to_csv(f'{fp}{site}_daily_ACSM_species.csv')

# meteorology

In [None]:
# 1) 定义 MET 数据目录与文件模式
data_dir = f'{arm}/{site}/'
file_pattern = os.path.join(data_dir, f'{site.lower()}*met*')
files = sorted(glob.glob(file_pattern))

records = []
for fn in files:
    # 2) 打开数据集
    ds = xr.open_dataset(fn)
    
    # 3) 提取日期标签
    date = pd.to_datetime(ds['time'].values[0]).strftime('%Y-%m-%d')
    
    # 4) 读取 temp_mean 与 rh_mean，并计算日平均（跳过 NaN）
    rec = {'date': date}
    for var in ('temp_mean', 'rh_mean'):    
        data = ds[var].values
        flat = np.ravel(data)
        if var == 'rh_mean':
            flat = flat / 100
        if var == 'temp_mean':
            flat = flat + 273.15
        rec[var] = np.nanmean(flat)
    
    ds.close()
    records.append(rec)

# 5) 构造 DataFrame，并按日期排序
df_met = pd.DataFrame(records)
df_met['date'] = pd.to_datetime(df_met['date'])
df_met = df_met.set_index('date').sort_index()

# 6) 保存结果或查看
df_met.to_csv(f'{fp}{site}_daily_MET.csv')

  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)
  rec[var] = np.nanmean(flat)


# bulk diameter

In [8]:
# 1. 定义目录和文件列表
data_dir = f'{arm}/{site}/'
# 支持所有含smps或aps的文件，不区分大小写
smps_files = sorted(glob.glob(os.path.join(data_dir, '*[sS][mM][pP][sS]*')))
aps_files = sorted(glob.glob(os.path.join(data_dir, '*[aA][pP][sS]*')))

def get_var(ds, varnames):
    """依次尝试取变量名，返回第一个存在的变量"""
    for name in varnames:
        if name in ds.variables:
            return ds[name]
    raise KeyError(f"变量 {varnames} 在该文件中都找不到")

def merge_all_size_distributions(files, is_aps=False):
    """按bin结构分组，每组单独拼接返回，最终输出所有天的数据"""
    from collections import defaultdict
    bin_groups = defaultdict(list)
    for fn in files:
        ds = xr.open_dataset(fn)
        dNdlogDp = get_var(ds, ['merged_dN_dlogDp', 'dN_dlogDp'])
        if is_aps:
            Dp_nm = get_var(ds, ['diameter_aerodynamic'])
        else:
            Dp_nm = get_var(ds, ['merged_diameter_mobility', 'diameter_mobility'])
        times = pd.to_datetime(ds['time'].values)
        # 用bin长度作为分组依据
        bin_key = len(Dp_nm)
        bin_groups[bin_key].append((dNdlogDp.values, Dp_nm.values, times))
        ds.close()
    all_days = []
    # 每组bin独立拼接
    for group in bin_groups.values():
        arr_list, Dp_list, times_list = zip(*group)
        arr_cat = np.concatenate(arr_list, axis=0)
        Dp_cat  = Dp_list[0]
        times_cat = np.concatenate(times_list)
        df_daily = calc_bulk_diameter(times_cat, arr_cat, Dp_cat)
        all_days.append(df_daily)
    # 合并所有天（日均bulk diameter, 若有重复取均值）
    df_all = pd.concat(all_days).groupby('date', as_index=False).mean()
    return df_all

def merge_smps_aps(smps_files, aps_files):
    time_smps, arr_smps, Dp_smps = merge_all_size_distributions(smps_files, is_aps=False)
    time_aps,  arr_aps,  Dp_aps  = merge_all_size_distributions(aps_files, is_aps=True)
    # 时间对齐
    common_times = np.intersect1d(time_smps, time_aps)
    if len(common_times) == 0:
        raise ValueError("SMPS和APS没有共同时间，无法merge")
    idx_smps = np.where(np.isin(time_smps, common_times))[0]
    idx_aps  = np.where(np.isin(time_aps,  common_times))[0]
    arr_smps_sel = arr_smps[idx_smps]
    arr_aps_sel  = arr_aps[idx_aps]
    arr_merged = np.concatenate([arr_smps_sel, arr_aps_sel], axis=1)
    Dp_merged  = np.concatenate([Dp_smps, Dp_aps])
    return common_times, arr_merged, Dp_merged

def calc_bulk_diameter(times, arr, Dp_nm):
    Dp_um = Dp_nm / 1e3
    logDp = np.log10(Dp_um)
    dlog = np.empty_like(logDp)
    dlog[1:] = logDp[1:] - logDp[:-1]
    dlog[0] = dlog[1]
    Ni = arr * dlog
    Ntot = np.nansum(Ni, axis=1)
    weighted_sum = np.nansum(Ni * Dp_um, axis=1)
    valid = (Ntot > 0) & (~np.isnan(Ntot))
    D_bulk_t = np.full_like(weighted_sum, np.nan)
    D_bulk_t[valid] = weighted_sum[valid] / Ntot[valid]
    df = pd.DataFrame({'time': pd.to_datetime(times), 'D_bulk (μm)': D_bulk_t})
    df['date'] = df['time'].dt.strftime('%Y-%m-%d')
    df_daily = df.groupby('date')['D_bulk (μm)'].mean().reset_index()
    return df_daily

records = []

if len(aps_files) > 0:
    times, arr, Dp_merged = merge_smps_aps(smps_files, aps_files)
    df_daily = calc_bulk_diameter(times, arr, Dp_merged)
else:
    df_daily = merge_all_size_distributions(smps_files, is_aps=False)



df_daily = df_daily.sort_values('date').reset_index(drop=True)
df_daily.to_csv(f'{fp}{site}_daily_bulk_diameter.csv', index=False)
print(df_daily.head())

         date  D_bulk (μm)
0  2019-10-11     0.066771
1  2019-10-12     0.066085
2  2019-10-13     0.042541
3  2019-10-14     0.067755
4  2019-10-15     0.048528


# merge data

In [13]:
def merge_daily_data(set_point):
    sp_str = str(set_point)

    # 1) 读取各数据文件，各自只丢掉自己关心列的 NaN
    #    —— bulk、acsm、met、cpc 只有一列到几列，直接 dropna(how='any') 是可以的
    # acsm = (
    #     pd.read_csv(
    #         f'{dir}/data/{site}_daily_ACSM_species.csv',
    #         parse_dates=['date'], index_col='date'
    #     )
    #     .dropna(how='any')
    # )
    bulk = (
        pd.read_csv(
            f'{dir}/data/{site}_daily_bulk_diameter.csv',
            parse_dates=['date'], index_col='date'
        )
        .dropna(how='any')
    )
    met = (
        pd.read_csv(
            f'{dir}/data/{site}_daily_MET.csv',
            parse_dates=['date'], index_col='date'
        )
        .dropna(how='any')
    )
    cpc = (
        pd.read_csv(
            f'{dir}/data/{site}_daily_N_CPC.csv',
            parse_dates=['date'], index_col='date'
        )
        .dropna(how='any')
    )

    # 对 ccn 只选取我们关心的那一列，再 dropna
    raw_ccn = pd.read_csv(
        f'{dir}/data/{site}_daily_N_CCN_by_supersaturation.csv',
        parse_dates=['date'], index_col='date'
    )
    # 只对 sp_str 那一列做 dropna
    df_ccn_sp = (
        raw_ccn[[sp_str]]
        .rename(columns={sp_str: 'N_CCN'})
        .dropna(how='any')
    )

    # 2) 选择我们要的列名并重命名
    bulk_col = 'D_bulk (μm)'
    df_bulk_sp = bulk[[bulk_col]].rename(columns={bulk_col: 'bulk_diameter'})

    df_cpc_sp = cpc[['N_CPC']]  # 已经只有一列 “N_CPC”

    # 3) 依次 inner join —— 只保留所有表都有值的日期
    df = (
        met[['temp', 'rh']]
        .join(df_bulk_sp,    how='inner')
        .join(df_cpc_sp,     how='inner')
        # .join(acsm,          how='inner')
        .join(df_ccn_sp,     how='inner')
    )

    # 4) 最后再统一 drop 一次（以防 join 之后还有任意 NaN）
    df = df.dropna(how='any')

    # 5) 写出结果
    df = df.reset_index()
    outfile = f'{fp}{site}_daily_merged_setpoint_{sp_str}.csv'
    df.to_csv(outfile, index=False)

In [None]:
# [0.1, 0.2, 0.4, 0.8, 1.0]
if __name__ == '__main__':
    merge_daily_data(0.1)
    merge_daily_data(0.2)
    merge_daily_data(0.4)
    merge_daily_data(0.8)
    merge_daily_data(1.0)