In [None]:
from pathlib import Path
import pandas as pd
import xarray as xr
import numpy as np
import time
import hashlib

In [None]:
# Every CSV below Archive/ (searched recursively), in sorted path order.
file_list = sorted(Path('Archive').rglob('*.csv'))

### 检查文件数
判断缺失的时间

In [None]:
# Current local time, truncated to the hour, as the end of the expected range.
systime = time.localtime()
nowDate = time.strftime("%Y%m%d %H", systime)

# One timestamp per hour from 2022-01-01 up to now. An explicit offset object
# is used instead of the 'H' string alias, which newer pandas deprecates.
should = pd.date_range('2022-1-1', nowDate, freq=pd.offsets.Hour())
# Each archived file is named after the hour it covers, so the stem parses
# directly as a timestamp.
exist = pd.to_datetime([f.stem for f in file_list])

# Expected hours that have no file. Index.difference returns them sorted,
# which makes gaps much easier to read than an unordered set.
missing = should.difference(exist)
print(missing.strftime('%Y-%m-%d %H:00').tolist())

### 合并数据

In [None]:
# Offset into file_list from which new files are read; 0 means no archive yet.
# NOTE(review): the loading loop slices file_list[START:], which assumes the
# first START files correspond one-to-one to the archived timepoints — confirm
# this still holds when hours are missing or files were skipped.
START = 0
if Path('cnemc.h5').exists():
    # load_dataset reads the whole archive into memory and closes the file,
    # so the file can later be replaced without a dangling open handle
    # (open_dataset would keep it open and lazily loaded).
    ds_archive = xr.load_dataset('cnemc.h5')
    START = len(ds_archive['timepoint'])
    print(START)

In [None]:
d_list = []  # files that still contain a repeated stationcode
dfs = []     # de-duplicated frames ready to be concatenated

for f in file_list[START:]:
    # Fully identical rows are harmless and removed straight away.
    df = pd.read_csv(f, parse_dates=['timepoint']).drop_duplicates()
    # A station that is still listed twice has rows that disagree;
    # set the file aside for manual inspection.
    if df.duplicated(subset='stationcode').any():
        d_list.append(f)
        continue
    dfs.append(df)

### 处理有重复的数据
检查这种情况：同一站点存在重复记录，但其中一条的部分字段缺失，导致两条记录不一致。此时需手动删除含缺失值的行，然后重新合并。

In [None]:
# Show, for every file set aside above, the rows whose stationcode occurs more
# than once (transposed so the conflicting records appear side by side).
for d in d_list:
    print(d)
    df = pd.read_csv(d, parse_dates=['timepoint']).drop_duplicates()
    clash_mask = df.duplicated(subset='stationcode', keep=False)
    print(df[clash_mask].T)

### 将拼接后的表转为 xarray 数据

In [None]:
# Concatenate all rows with a single pd.concat(dfs) call; appending frame by
# frame (df.append) is much slower.
if not dfs:
    # pd.concat would raise the same exception type with a generic message;
    # say what actually happened instead.
    raise ValueError('No new duplicate-free hourly files to merge (dfs is empty)')
df_all = pd.concat(dfs)

In [None]:
def convert_xarray(df: "pd.DataFrame") -> "xr.Dataset":
    """Turn the long table into a dataset indexed by (timepoint, stationcode).

    Args:
        df: table with at least the columns ``timepoint`` and ``stationcode``.

    Returns:
        The result of ``to_xarray()`` on the table after both columns have
        been made its (multi-)index.
    """
    return df.set_index(['timepoint', 'stationcode']).to_xarray()

In [None]:
ds = convert_xarray(df_all)

# These variables are expected to hold one fixed value per station, so collapse
# them from (timepoint, stationcode) to a single value per station.
for var in ['longitude', 'latitude', 'area', 'positionname']:
    # One column per station, one row per timepoint.
    per_station = pd.DataFrame(
        ds[var].transpose('timepoint', 'stationcode').values,
        columns=ds['stationcode'].values,
    )
    var_unique = []

    for code, column in per_station.items():
        # Distinct non-missing values of this station, most frequent first.
        counts = column.dropna().value_counts()
        if len(counts) != 1:
            # Conflicting (or entirely missing) values: report them for review.
            print(var, code, counts.to_dict())
        # Always record exactly one entry per station. Skipping stations here
        # would leave var_unique shorter than the station axis and make the
        # assignment below fail. Conflicts fall back to the most frequent
        # value, stations without any value to NaN.
        var_unique.append(counts.index[0] if len(counts) else np.nan)

    # Replace the time-varying variable with its per-station values.
    ds[var] = ('stationcode', np.asarray(var_unique, dtype=ds[var].dtype))

ds

### 导出文件

In [None]:
# Scale both CO variables by 1000.
# NOTE(review): presumably a mg/m3 -> µg/m3 conversion to match the dataset
# attribute below — confirm. Re-running this cell scales the values again.
for name in ('co', 'co_24h'):
    ds[name] = ds[name] * 1000

# Unit metadata for the coordinate variables and for the dataset as a whole.
for axis, unit in (('latitude', 'degree_north'), ('longitude', 'degree_east')):
    ds[axis].attrs = {'units': unit}
ds.attrs = {'units': 'All are µg/m3 except AQI which is unitless'}

# Promote the descriptive variables to coordinates so that they are no longer
# listed among the data variables.
coord_names = ['longitude', 'latitude', 'area', 'positionname', 'primarypollutant']
ds = ds.set_coords(coord_names)

In [None]:
# Data variables to save.
selc_vars = list(ds.keys())

# Per-variable encoding: zlib-compressed unsigned 16-bit integers, with 65535
# written as the fill value for missing data.
enco_vars = {
    name: {"zlib": True,
           "complevel": 9,
           "dtype": "uint16",
           '_FillValue': 65535
           }
    for name in selc_vars
}

if START != 0:
    # Combine the new hours with the existing archive and pull everything into
    # memory, so nothing still has to be read from the old file afterwards.
    ds = ds.merge(ds_archive).load()

# Write to a temporary file first and only then replace the archive. Deleting
# the old file before writing would lose all archived data if the write failed.
tmp_path = Path('cnemc.h5.tmp')
ds[selc_vars].to_netcdf(tmp_path,
                        engine='netcdf4',
                        encoding=enco_vars,
                        mode='w')

if START != 0:
    # Release any handle on the old archive before it is replaced.
    ds_archive.close()
tmp_path.replace('cnemc.h5')

### 利用服务器上的历史数据替补
需先检查一遍，可以合并后检查重复项或者看两个文件的 MD5 是否一致

In [None]:
# Two copies of the same hour: the archived file and a second download.
f1 = Path('2022-02-24T00.csv')
f2 = Path('2022-02-24T00(1).csv')
# read_bytes opens and closes the file itself, so no handle is left open
# (the bare open(...).read() never closed it).
h1 = hashlib.md5(f1.read_bytes()).hexdigest()
h2 = hashlib.md5(f2.read_bytes()).hexdigest()

In [None]:
if h1 == h2:
    # Identical content: keep f1 and drop the redundant copy.
    print('f1 == f2', h1)
    f2.unlink()
else:
    # pandas reads these CSV files as UTF-8 by default, so use the same codec
    # here instead of the platform's locale encoding.
    t1 = f1.read_text(encoding='utf-8')
    # Every line of f2 except the first, which is treated as the header row.
    rows2 = f2.read_text(encoding='utf-8').splitlines()[1:]
    # Make sure f1 ends with a line break; otherwise its last row and the
    # first row of f2 would be glued into one corrupt record.
    if t1 and not t1.endswith('\n'):
        t1 += '\n'
    # Overwrite f1 with the merged content first and delete f2 only after the
    # write succeeded, so a failed write cannot lose both files.
    f1.write_text(t1 + ''.join(row + '\n' for row in rows2), encoding='utf-8')
    f2.unlink()

    # Re-read the merged file and show stations that still have conflicting
    # rows after exact duplicates are removed.
    df = pd.read_csv(f1, parse_dates=['timepoint'])
    df.drop_duplicates(inplace=True)
    if df.duplicated(subset='stationcode').any():
        print(df[df.duplicated(subset='stationcode', keep=False)].T)