In [1]:
import xarray as xr
import numpy as np
# import libs.drawFogMap as drawTools
import os
import arrow
import json
import time
import pandas as pd
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime

In [None]:
def calLatestBaseTime() -> str:
    '''
    Work out the initialisation (base) time of the most recent model run
    that should already be available, based on the current UTC clock.

    Cut-off hours follow the ECMWF dissemination schedule:
    https://confluence.ecmwf.int/display/UDOC/Dissemination+schedule

    * 07 <= UTC hour < 19 : today's 00 run
    * UTC hour >= 19      : today's 12 run
    * UTC hour < 07       : yesterday's 12 run

    :return baseTime as a string formatted YYYYMMDDHH
    '''
    now_utc = arrow.utcnow()
    current_hour = now_utc.hour

    # Before 07 UTC today's 00 run is not used yet: fall back to the
    # previous day's 12 run.
    if current_hour < 7:
        return f"{now_utc.shift(days = -1).format('YYYYMMDD')}12"

    # From 07 UTC use today's 00 run; from 19 UTC switch to today's 12 run.
    cycle = '00' if current_hour < 19 else '12'
    return f"{now_utc.format('YYYYMMDD')}{cycle}"

def readFromTDS(initTime: str = '2022031700', modelId: str = 'ecmwfthin', area: list = [105, 125, 15, 28]) -> dict:
    '''
    Open the monthly THREDDS (OPeNDAP) files for one model run and return
    the fields for the requested initialisation time, cropped to a
    lon/lat box.

    Only the three fields that are actually returned are opened. On a
    failed open, every dataset opened so far is closed before the error
    is re-raised, so no remote handle is leaked.

    :params initTime: initialisation time in UTC, formatted YYYYMMDDHH
    :params modelId: model name, used as a path component of the TDS URL
    :params area: selection box as [west, east, south, north]; it is only
        read, never modified
    :return dict with keys 'td', 't2m' and 'sst', each holding the result
        of ``.sel(...)`` on the corresponding opened dataset
    :raises Exception: whatever ``xr.open_dataset`` raised, re-raised
        unchanged after the notice is printed
    '''
    # TODO: fields still to be read: t2mm, t2md, sstk, v100, v10m, u100,
    #       u10m, (rhum, temp) => (theta925, theta1000)
    # TODO: interpolate upper-air data onto the surface-field resolution
    year = initTime[0:4]
    month = initTime[4:6]
    day = initTime[6:8]
    hour = initTime[8:10]
    selectedTime = '{0}-{1}-{2} {3}:00:00'.format(year, month, day, hour)
    baseUrl = 'http://10.148.8.71:7080/thredds/dodsC/{0}/'.format(modelId)

    # Result key -> file stem on the server. Insertion order fixes both the
    # order of the remote opens and the key order of the returned dict.
    sources = {'td': 't2md', 't2m': 't2mm', 'sst': 'sstk'}

    opened = {}
    try:
        for key, stem in sources.items():
            opened[key] = xr.open_dataset(f'{baseUrl}{year}{month}/{stem}.nc')
    except Exception:
        print('无法获取数据源')
        # Release the handles that were opened before the failure.
        for dataset in opened.values():
            dataset.close()
        raise

    # Slices run south->north and west->east.
    # NOTE(review): assumes the lat/lon coordinates are stored ascending - confirm.
    lat_range = slice(area[2], area[3])
    lon_range = slice(area[0], area[1])
    return {
        key: dataset.sel(time=selectedTime, level=0.0,
                         lat=lat_range, lon=lon_range)
        for key, dataset in opened.items()
    }



In [None]:
def main(time):
    '''
    Build the fog diagnostic for every forecast time step of one model run,
    save each step as a netCDF file and draw a map for it.

    Steps visible in this block:
      1. Pick the initialisation time: ``time`` if truthy, otherwise the
         value from ``calLatestBaseTime()``.
      2. Read the dew point, 2 m temperature and sea-surface temperature
         datasets with ``readFromTDS``.
      3. Load the log via ``loadLog``; time steps already marked successful
         there are copied into ``infoList`` and skipped.
      4. For every remaining step in ``timeStrList``: check for missing
         data, compute the differences and the ``fog`` field, write the
         dataset to disk, draw the figure and append a status dict to
         ``infoList``.

    :params time: initialisation time string YYYYMMDDHH, or a falsy value
        to use the latest available run.
        NOTE(review): this name shadows the ``time`` module imported at the
        top of the notebook inside this function.
    :return the caught exception object (returned, not raised) when
        ``readFromTDS`` fails; no other return statement appears in the
        visible part of the function.

    NOTE(review): ``loadLog``, ``findLogDataBytimeStep``, ``convertDataArray``,
    ``timeStrList``, ``config``, ``current_dir``, ``dataBaseDir``,
    ``imgBaseDir`` and ``drawTools`` are not defined in the visible cells
    (the ``drawTools`` import is commented out) - confirm they exist before
    running on a fresh kernel.
    NOTE(review): ``infoList`` is filled but not consumed in the visible
    part of this function - confirm whether the definition continues.
    '''
    if(not time):
        initTime = calLatestBaseTime()  # e.g. '2022031500'
    else:
        initTime = time
    arrow_initTime = arrow.get(initTime, 'YYYYMMDDHH')
    try:
        selected_ds = readFromTDS(initTime=initTime)
    except Exception as e:
        # The exception is handed back to the caller as a return value.
        return e
    ds_td = selected_ds['td']
    ds_t2m = selected_ds['t2m']
    ds_sst = selected_ds['sst']
    infoList = []
    isLogExists = False
    ### Read the log file ###
    logInfo = loadLog(arrow_initTime)
    if(logInfo['success']):
        # From here on logInfo holds the decoded JSON payload.
        logInfo = json.loads(logInfo['data'])
        isLogExists = True

    for iTimeStep in timeStrList:
        # Look this time step up in the JSON log; skip it when a successful entry exists
        if(isLogExists):  # log exists: skip steps that already succeeded last time
            log_item = findLogDataBytimeStep(logInfo, iTimeStep)
            if(log_item and log_item['success']):
                infoList.append(log_item)
                continue

        # Variable names are the field prefix followed by the time-step string.
        tdVarName = f't2md{iTimeStep}'
        t2mVarName = f't2mm{iTimeStep}'
        sstVarName = f'sstk{iTimeStep}'
        td = ds_td[tdVarName]
        t2m = ds_t2m[t2mVarName]
        sst = ds_sst[sstVarName]
        # Any value below -999.0 is treated as "no data" for the whole step.
        if(td.min() < -999.0 or t2m.min() < -999.0 or sst.min() < -999.0):
            outputInfo = {
                'status': 'error',
                'success': False,
                'initTime': initTime,
                'message': 'No Data -999.9',
                'timeStep': iTimeStep,
            }
            print(outputInfo)
            infoList.append(outputInfo)
            continue
        else:
            # Keep values above 50 and replace everything else with -999.9.
            # NOTE(review): the threshold 50 presumably separates valid
            # Kelvin temperatures from fill values - confirm.
            td = xr.where(td > 50, td, -999.9)
            td = convertDataArray(td, config['t2md'])
            t2m = xr.where(t2m > 50, t2m, -999.9)
            t2m = convertDataArray(t2m, config['t2mm'])
            sst = xr.where(sst > 50, sst, -999.9)
            sst = convertDataArray(sst, config['sstk'])
            t2m_td = t2m - td  # temperature minus dew point
            td_sst = td - sst  # dew point minus sea-surface temperature
            # NOTE(review): the next two lines repeat the two above verbatim.
            t2m_td = t2m - td  # temperature minus dew point
            td_sst = td - sst  # dew point minus sea-surface temperature
            # Where t2m_td >= 0.8 and td_sst > 0 the field is set to -0.5;
            # elsewhere it keeps the td_sst value.
            fog = xr.where(np.logical_and(
                t2m_td >= 0.8, td_sst > 0), -0.5, td_sst)
            fog = convertDataArray(fog, config['fog'])
            ds = xr.Dataset()
            ds['fog'] = fog
            ds['td_sst'] = td_sst
            ds['t2m_td'] = t2m_td
            ds['sst'] = sst
            ds['td'] = td
            ds['t2m'] = t2m

            ####### Save file #######
            fileDir = os.path.join(current_dir, f'../{dataBaseDir}{arrow_initTime.format("YYYY/MM/DDHH/")}')
            fileDir = os.path.normpath(fileDir)
            try:
                drawTools.createDir(fileDir)
                filePath = os.path.join(
                    fileDir, f'fog_{initTime}_{iTimeStep}.nc')
                filePath = os.path.normpath(filePath)
                ds.to_netcdf(filePath)
                print('储存netCDF: '+filePath)
            except Exception as e:
                # A failed save is only printed; drawing is still attempted below.
                print(e)

            #####################
            # Draw the figure and record the outcome for this time step.
            try:
                imgPath = drawTools.drawFogMap(
                    ds, initTime, iTimeStep, imgBaseDir)
                outputInfo = {
                    'status': 'success',
                    'success': True,
                    'initTime': initTime,
                    'message': f'complete {imgPath}',
                    'timeStep': iTimeStep,
                }
                infoList.append(outputInfo)
            except Exception as e:
                # NOTE(review): the caught exception is not logged or stored;
                # only a generic message is recorded.
                outputInfo = {
                    'status': 'error',
                    'success': False,
                    'initTime': initTime,
                    'message': 'Draw Figure Error',
                    'timeStep': iTimeStep,
                }
                infoList.append(outputInfo)
            ds.close()