In [1]:
import sys
import os
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from subprocess import check_output

import xarray as xr

PACKAGE_DIR = os.path.dirname(os.getcwd())
sys.path.append(PACKAGE_DIR)

from ensabc.parse import parse_ecmwf_index_detail, parse_gfs_index_detail
from ensabc.fetch import grib_detail_download, merge_grib

## 1. Fetch Ensemble Forecast Data

> The ensemble forecast data ususally contain 1 control forecast & multi pertubation forecast <br>
> data in a single step. We can use the 'dataType' and 'number'/'perturbationNumber' keys <br>
> of message to indentify the specific forecast run. <br>
> 
> In this section, the code will download and merge the ensemble data of one variable at <br>
> one step as a single grib file.

In [2]:
batch = datetime.utcnow() - timedelta(hours=13)
batch = batch.replace(hour=int(batch.hour/12)*12)
step = 240
var = '2t'

### 1.1 ECMWF - ENS

> ECMWF open ensemble forecast data include 1 control run & 50 perturbation runs. All the runs <br>
> are included in a single grib file. The following code will only download a single variable <br>
> of all runs. 

In [3]:
grib_url = f'https://data.ecmwf.int/forecasts/{batch:%Y%m%d}/{batch.hour:02}z/0p4-beta/enfo/{batch:%Y%m%d%H}0000-{step}h-enfo-ef.grib2'
index_url = grib_url.replace('.grib2', '.index')
local_grib =  os.path.join(PACKAGE_DIR, f'data/ecmwf-enfo-{batch:%Y%m%d%H}-{step}-{var}.grb')
grib_detail = parse_ecmwf_index_detail(index_url)
select_message = grib_detail[grib_detail['param']==var]
select_message

Unnamed: 0,domain,date,time,expver,class,type,stream,step,levtype,number,param,start,_offset,end,_length,levelist
0,g,20230730,1200,1,od,pf,enfo,240,sfc,49.0,2t,0,0,239281,239281,2
1,g,20230730,1200,1,od,pf,enfo,240,sfc,38.0,2t,239281,239281,526311,287030,2
2,g,20230730,1200,1,od,pf,enfo,240,sfc,41.0,2t,526311,526311,812899,286588,2
3,g,20230730,1200,1,od,pf,enfo,240,sfc,18.0,2t,812899,812899,1099664,286765,2
4,g,20230730,1200,1,od,pf,enfo,240,sfc,4.0,2t,1099664,1099664,1386922,287258,2
5,g,20230730,1200,1,od,pf,enfo,240,sfc,45.0,2t,1386922,1386922,1626482,239560,2
6,g,20230730,1200,1,od,pf,enfo,240,sfc,20.0,2t,1626482,1626482,1915671,289189,2
7,g,20230730,1200,1,od,pf,enfo,240,sfc,32.0,2t,1915671,1915671,2201404,285733,2
8,g,20230730,1200,1,od,pf,enfo,240,sfc,10.0,2t,2201404,2201404,2439437,238033,2
9,g,20230730,1200,1,od,pf,enfo,240,sfc,12.0,2t,2439437,2439437,2678631,239194,2


In [4]:
result = grib_detail_download(
    grib_url, 
    local_grib,
    select_message, 
)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['start'] = df['start'].astype(int)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['end'] = df['end'].astype(int)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['group'].iloc[int(i[0]-1)]=group
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user

In [5]:
grib_ls_result = check_output(f'grib_ls -p date,stepRange,dataType,number,perturbationNumber,shortName {local_grib}', shell=True)
grib_ls_result.decode().split('\n')[1:-4]

['date                stepRange           dataType            number              perturbationNumber  shortName           ',
 '20230730            240                 pf                  49                  49                  2t                 ',
 '20230730            240                 pf                  38                  38                  2t                 ',
 '20230730            240                 pf                  41                  41                  2t                 ',
 '20230730            240                 pf                  18                  18                  2t                 ',
 '20230730            240                 pf                  4                   4                   2t                 ',
 '20230730            240                 pf                  45                  45                  2t                 ',
 '20230730            240                 pf                  20                  20                  2t                 ',
 '20230

### 1.2 NOAA - GEFS

> NOAA GEFS forecast data include 1 control run & 30 perturbation runs. Each run exist in <br>
> a separate grib file. The following code will only download single variable of all <br>
> runs and merge as one single grib file.

In [6]:
# all forecast url & detail
url_grib_detail_dict = dict({})

# perturbation forecast
for number in range(1,31,1):
    pn_grib_url = f'noaa-gefs-pds/gefs.{batch:%Y%m%d}/{batch.hour:02}/atmos/pgrb2bp5/gep{number:02}.t{batch.hour:02}z.pgrb2b.0p50.f{step:03}'
    pn_index_url = pn_grib_url+'.idx'
    url_grib_detail_dict[f'{number:02}'] = {
        'url': pn_grib_url,
        'detail': parse_gfs_index_detail(pn_index_url), 
    }
# control forecast
ctrl_grib_url = f'noaa-gefs-pds/gefs.{batch:%Y%m%d}/{batch.hour:02}/atmos/pgrb2bp5/gec00.t{batch.hour:02}z.pgrb2b.0p50.f{step:03}'
ctrl_index_url = ctrl_grib_url+'.idx'

url_grib_detail_dict['cf'] = {
    'url': ctrl_grib_url,
    'detail': parse_gfs_index_detail(ctrl_index_url)
}

In [7]:
# download surface temperature grib files
local_grib =  os.path.join(PACKAGE_DIR, f'data/noaa-gefs-{batch:%Y%m%d%H}-{step:02}-{var}.grb')

download_list = []
for k, v in url_grib_detail_dict.items():
    detail_ = v['detail']
    download_list.append(
        (
            v['url'], 
            f'{local_grib}.{k}',
            detail_[(detail_['shortName']=='TMP') & (detail_['level']=='surface')]
        )
    )

files = []
futures = []
with ThreadPoolExecutor(max_workers=3) as exec:
    for d in download_list:
        futures.append(
            exec.submit(
                grib_detail_download,
                download_url=d[0],
                local_fp=d[1], 
                grib_detail=d[2],  
                typing='s3'
            )
        )
        files.append(d[1])

    for f in as_completed(futures):
        f.result()

# merge
merge_grib(files, local_grib)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  grib_detail.loc[:,('group')]=0
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  grib_detail.loc[:,('group')]=0
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  grib_detail.loc[:,('group')]=0
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = val

In [8]:
grib_ls_result = check_output(f'grib_ls -p date,stepRange,dataType,number,perturbationNumber,shortName {local_grib}', shell=True)
grib_ls_result.decode().split('\n')[1:-4]

['date                stepRange           dataType            number              perturbationNumber  shortName           ',
 '20230730            240                 pf                  1                   1                   t                  ',
 '20230730            240                 pf                  2                   2                   t                  ',
 '20230730            240                 pf                  3                   3                   t                  ',
 '20230730            240                 pf                  4                   4                   t                  ',
 '20230730            240                 pf                  5                   5                   t                  ',
 '20230730            240                 pf                  6                   6                   t                  ',
 '20230730            240                 pf                  7                   7                   t                  ',
 '20230

### 1.3 CMC-GEPS

> CMC GEPS forecast data include 1 control run & 20 perturbation runs. All the runs <br>
> are included in a single grib file. The following code will download a single variable <br>
> of all runs. 

In [9]:
grib_url = f'https://dd.weather.gc.ca/ensemble/geps/grib2/raw/{batch.hour:02}/{step:03}/CMC_geps-raw_{"TMP"}_{"TGL"}_{"2m"}_latlon0p5x0p5_{batch:%Y%m%d%H}_P{step:03}_allmbrs.grib2'
local_grib =  os.path.join(PACKAGE_DIR, f'data/cmc-geps-{batch:%Y%m%d%H}-{step:02}-{var}.grb')

result = grib_detail_download(
    grib_url,
    local_grib
)

In [10]:
grib_ls_result = check_output(f'grib_ls -p date,stepRange,dataType,number,perturbationNumber,shortName {local_grib}', shell=True)
grib_ls_result.decode().split('\n')[1:-4]

['date                stepRange           dataType            number              perturbationNumber  shortName           ',
 '20230730            240                 cf                  0                   0                   2t                 ',
 '20230730            240                 pf                  1                   1                   2t                 ',
 '20230730            240                 pf                  2                   2                   2t                 ',
 '20230730            240                 pf                  3                   3                   2t                 ',
 '20230730            240                 pf                  4                   4                   2t                 ',
 '20230730            240                 pf                  5                   5                   2t                 ',
 '20230730            240                 pf                  6                   6                   2t                 ',
 '20230

## 2. Convert and Standardize

In [11]:
# unify dataType(mark cf as pf and set perturbuationNumber/number as 0)
for source in ['ecmwf-enfo', 'noaa-gefs', 'cmc-geps']:
    local_grib = os.path.join(PACKAGE_DIR, f'data/{source}-{batch:%Y%m%d%H}-{step:02}-{var}.grb')
    check_output(f'grib_set -s number=0,perturbationNumber=0,dataType=pf -w dataType=cf {local_grib} {local_grib.replace(".grb", ".grib2")}', shell=True)
    os.remove(local_grib)

In [12]:
for source in ['ecmwf-enfo', 'noaa-gefs', 'cmc-geps']:
    local_grib = os.path.join(PACKAGE_DIR, f'data/{source}-{batch:%Y%m%d%H}-{step:02}-{var}.grib2')
    ds = xr.open_dataset(local_grib, indexpath=None)
    for v in ds.data_vars:
        nds = ds.expand_dims(dim='time', axis=0).reset_coords()[v].to_dataset(name='t2m')
        nds.to_netcdf(local_grib.replace(".grib2", ".nc"), format='NetCDF4',encoding={"t2m":{"zlib": True, "complevel": 4}})
    ds.close()

In [21]:
source ='ecmwf-enfo'
local_nc = os.path.join(PACKAGE_DIR, f'data/{source}-{batch:%Y%m%d%H}-{step:02}-{var}.nc')
info = check_output(f'cdo sinfo {local_nc}', shell=True)
info.decode().split('\n')



['   File format : NetCDF4 zip',
 '    -1 : Institut Source   T Steptype Levels Num    Points Num Dtype : Parameter ID',
 '     1 : unknown  unknown  v instant      51   1    405900   1  F32z : -1            ',
 '   Grid coordinates :',
 '     1 : lonlat                   : points=405900 (900x451)',
 '                        longitude : -180 to 179.6 by 0.4 degrees_east  circular',
 '                         latitude : 90 to -90 by -0.4 degrees_north',
 '   Vertical coordinates :',
 '     1 : generic                  : levels=51',
 '                           number : 0 to 50 by 1 1',
 '   Time coordinate :',
 '                             time : 1 step',
 '     RefTime =  1970-01-01 00:00:00  Units = seconds  Calendar = proleptic_gregorian',
 '  YYYY-MM-DD hh:mm:ss  YYYY-MM-DD hh:mm:ss  YYYY-MM-DD hh:mm:ss  YYYY-MM-DD hh:mm:ss',
 '  2023-07-30 12:00:00',
 '']

In [23]:
source ='noaa-gefs'
local_nc = os.path.join(PACKAGE_DIR, f'data/{source}-{batch:%Y%m%d%H}-{step:02}-{var}.nc')
info = check_output(f'cdo sinfo {local_nc}', shell=True)
info.decode().split('\n')



['   File format : NetCDF4 zip',
 '    -1 : Institut Source   T Steptype Levels Num    Points Num Dtype : Parameter ID',
 '     1 : unknown  unknown  v instant      31   1    259920   1  F32z : -1            ',
 '   Grid coordinates :',
 '     1 : lonlat                   : points=259920 (720x361)',
 '                        longitude : 0 to 359.5 by 0.5 degrees_east  circular',
 '                         latitude : 90 to -90 by -0.5 degrees_north',
 '   Vertical coordinates :',
 '     1 : generic                  : levels=31',
 '                           number : 0 to 30 by 1 1',
 '   Time coordinate :',
 '                             time : 1 step',
 '     RefTime =  1970-01-01 00:00:00  Units = seconds  Calendar = proleptic_gregorian',
 '  YYYY-MM-DD hh:mm:ss  YYYY-MM-DD hh:mm:ss  YYYY-MM-DD hh:mm:ss  YYYY-MM-DD hh:mm:ss',
 '  2023-07-30 12:00:00',
 '']

In [24]:
source ='cmc-geps'
local_nc = os.path.join(PACKAGE_DIR, f'data/{source}-{batch:%Y%m%d%H}-{step:02}-{var}.nc')
info = check_output(f'cdo sinfo {local_nc}', shell=True)
info.decode().split('\n')



['   File format : NetCDF4 zip',
 '    -1 : Institut Source   T Steptype Levels Num    Points Num Dtype : Parameter ID',
 '     1 : unknown  unknown  v instant      21   1    259920   1  F32z : -1            ',
 '   Grid coordinates :',
 '     1 : lonlat                   : points=259920 (720x361)',
 '                        longitude : 0 to 359.5 by 0.5 degrees_east  circular',
 '                         latitude : -90 to 90 by 0.5 degrees_north',
 '   Vertical coordinates :',
 '     1 : generic                  : levels=21',
 '                           number : 0 to 20 by 1 1',
 '   Time coordinate :',
 '                             time : 1 step',
 '     RefTime =  1970-01-01 00:00:00  Units = seconds  Calendar = proleptic_gregorian',
 '  YYYY-MM-DD hh:mm:ss  YYYY-MM-DD hh:mm:ss  YYYY-MM-DD hh:mm:ss  YYYY-MM-DD hh:mm:ss',
 '  2023-07-30 12:00:00',
 '']