# CAMS50 VRA2016: Collocated model results
CAMS50 runs a reanalysis with validated obrvations 2 years after the fact.

In [1]:
from glob import glob
from os.path import isfile, basename, dirname
from os import remove

import numpy as np
import pandas as pd
import xarray as xr
import xarray.ufuncs as xu
from dask.diagnostics import ProgressBar

# only 3 decimal points on df.head() and df.describe()
pd.options.display.float_format = '{:,.3f}'.format

## Datasets
- `eeaVRA`: validated surface obs for data assimilation
- `eeaVAL`: validated surface obs for model evaluataion
- `cifsBC`: CIFS boundary conditions
- `emepHC`: hindcast run (no DA), operasional version (CAMS50.201801)
- `emepSS`: hindcast run (no DA), operasional version (CAMS50.201801)
- `emepAN`: (re)analysis run (DA: NO2,O3,SO2), operasional version (CAMS50.201801; DA16)
- `emepCO`: (re)analysis run (DA: NO2,O3,SO2,CO), operasional version (CAMS50.201801; DA16)
- `emepPM`: (re)analysis run (DA: NO2,O3,SO2,PM25,PM10), development version (CAMS50.201801; DA17 wo/PM feedback)

In [2]:
lustre = "/lustre/storeA/users/alvarov/CAMS50/%s"
files = dict(
    eeaVRA=glob(lustre%'obs/VRA_2016/assimilation_*.nc'),
    eeaVAL=glob(lustre%'obs/VRA_2016/validation_*.nc'),
    cifsBC=glob(lustre%'2016_VRA/VRA_2016????_EU_EVA.nc'),
    emepHC=glob(lustre%'VRA-2016/BM_CAMS50.201801/VRA00-2016.nc'),
    emepSS=glob(lustre%'VRA-2016/BM_CAMS50.201801/VRA00SS-2016.nc'),
    emepAN=glob(lustre%'VRA-2016/BM_CAMS50.201801/VRA00AN-2016Q?.nc'),
    emepCO=glob(lustre%'VRA-2016/BM_CAMS50.201801/VRA00CO-2016Q?.nc'),
    emepPM=glob(lustre%'VRA-2016/BM_CAMS50.201801/VRA00PM-2016.nc'),
)
for k,v in files.items():
    print("%s: %3d files"%(k,len(v)))

eeaVRA:   6 files
eeaVAL:   6 files
cifsBC: 366 files
emepHC:   1 files
emepSS:   0 files
emepAN:   4 files
emepCO:   4 files
emepPM:   0 files


In [3]:
# save collocated datasets
def save2nc(ds=None, f=lustre%'vra2016colloc.nc'):
    if isfile(f):
        data = xr.open_dataset(f, autoclose=True).load()
        if ds:
            data = data.combine_first(ds)
            for param in ds.data_vars: 
                if 'units' not in data[param].attrs:
                    data[param].attrs.update(ds[param].attrs)
            data.to_netcdf(f, mode='w')
            del(ds)
        return data
    elif ds:
        ds.to_netcdf(f, mode='w')
        return ds
    else:
        return xr.Dataset()

# Validated Observations
The processing of the observatiuon datasets is dealt in a separate [notebook](stations.ipynb)

## Unique stations

In [4]:
%time stat = save2nc()[['lon','lat','alt','cls']]
%time stat = stat.sel(dataset='eeaVRA').combine_first(stat.sel(dataset='eeaVAL'))
stat

CPU times: user 312 ms, sys: 1.14 s, total: 1.45 s
Wall time: 26 s
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 8.07 ms


<xarray.Dataset>
Dimensions:  (poll: 6, station: 2331)
Coordinates:
    cls      (poll, station) object 'background/urban' '' '' ...
  * station  (station) object 'AD0942A' 'AD0944A' 'AD0945A' 'AL0203A' ...
  * poll     (poll) object 'CO' 'NO2' 'O3' 'PM10' 'PM25' 'SO2'
Data variables:
    lon      (station) float32 1.539138 1.56525 1.716986 20.78018 19.4862 ...
    lat      (station) float32 42.509693 42.516945 42.53488 40.62593 ...
    alt      (station) float32 1080.0 1637.0 2515.0 848.0 25.0 13.0 525.0 ...
Attributes:
    source:   /home/alvarov/obs4cwf/2016_AirBase/data.background.assimilation...

# Collocation
For point-wise collocation, the lon/lat indexers need to be xarray.DataArrays.

In [5]:
def collocate(ds, lon=stat.lon, lat=stat.lat, dlon=1/4, dlat=1/8):
    """
    collocate dataset to coordinates
      for point-wise selection lon/lat need to be DataArrays (and ds.load())
      .sel(.., tolerance=max(dlat,dlon)) raise a KeyError for points outside domain
    """
    col = ds.load().sel(lon=lon, lat=lat, method='nearest')
    return col.where(abs(col.lon-lon)<dlon*0.5)\
              .where(abs(col.lat-lat)<dlat*0.5)\
              .reset_coords()

# Boundary conditions
From CIFS reanalysis. Daily files with 3-hourly records. 
- 366 files ~333M each, total 119Gb.

In [6]:
%time ds = xr.open_dataset(files['cifsBC'][0])
ds

CPU times: user 60 ms, sys: 4 ms, total: 64 ms
Wall time: 617 ms


<xarray.Dataset>
Dimensions:    (ilevel: 61, latitude: 65, level: 60, longitude: 207, time: 8)
Coordinates:
  * longitude  (longitude) float32 -115.875 -114.75 -113.625 -112.5 -111.375 ...
  * latitude   (latitude) float32 81.0 79.875 78.75 77.625 76.5 75.375 74.25 ...
  * level      (level) int32 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ...
  * time       (time) datetime64[ns] 2016-02-09 2016-02-09T03:00:00 ...
Dimensions without coordinates: ilevel
Data variables:
    t          (time, level, latitude, longitude) float32 ...
    aermr01    (time, level, latitude, longitude) float32 ...
    aermr02    (time, level, latitude, longitude) float32 ...
    aermr03    (time, level, latitude, longitude) float32 ...
    aermr04    (time, level, latitude, longitude) float32 ...
    aermr05    (time, level, latitude, longitude) float32 ...
    aermr06    (time, level, latitude, longitude) float32 ...
    aermr07    (time, level, latitude, longitude) float32 ...
    aermr08    (time, leve

In [7]:
surfBCs = lambda ds: ds.rename(dict(
    longitude='lon',
    latitude='lat',
    co='CO',
    no2='NO2',
    so2='SO2',
    go3='O3',
)).sel(level=60).drop('level')
""" PM*
    aermr01='SEASALT_F',
    aermr02='SEASALT_C',
   #aermr03='SEASALT_C',    # not used
    aermr04='DUST_SAH_F',
    aermr05='DUST_SAH_F',
    aermr06*.15='DUST_SAH_F',
    aermr06*.35='DUST_SAH_C',
   #aermr07*1.7='FFIRE_OM', # not used
   #aermr08*1.7='FFIRE_OM', # not used
    aermr09='FFIRE_BC',     # not used
    aermr10='FFIRE_BC',     # not used
    aermr11='SO4',
   #aermr12='SO2',          # not used
"""

dropBCs = "aermr01 aermr02 aermr03 aermr04 aermr05 aermr06 aermr07 aermr08 aermr09 aermr10 aermr11 aermr12 hno3 pan no hcho ch4 c5h8 oh n2o5 c2h6 c3h8 hyai hybi".split()

%time collocate(surfBCs(ds.drop(dropBCs)), dlon=1.125, dlat=1.125)

CPU times: user 32 ms, sys: 4 ms, total: 36 ms
Wall time: 405 ms


<xarray.Dataset>
Dimensions:  (station: 2331, time: 8)
Coordinates:
  * time     (time) datetime64[ns] 2016-02-09 2016-02-09T03:00:00 ...
  * station  (station) object 'AD0942A' 'AD0944A' 'AD0945A' 'AL0203A' ...
Data variables:
    t        (time, station) float32 279.74927 279.74927 281.19107 278.21692 ...
    NO2      (time, station) float32 1.5324733e-08 1.5324733e-08 ...
    SO2      (time, station) float32 3.3162166e-09 3.3162166e-09 ...
    CO       (time, station) float32 2.3641746e-07 2.3641746e-07 ...
    O3       (time, station) float32 2.1464075e-08 2.1464075e-08 ...
    lnsp     (time, station) float32 11.428396 11.428396 11.4627495 ...
    lon      (station) float32 1.125 1.125 2.25 20.25 19.125 19.125 13.5 ...
    lat      (station) float32 42.75 42.75 42.75 40.5 40.5 42.75 48.375 ...

In [8]:
%%time
ds = xr.open_mfdataset(   
    files['cifsBC'], chunks={'time':10}, concat_dim='time', autoclose=True,
    preprocess=surfBCs, drop_variables=dropBCs,
).assign_coords(dataset='cifsBC').expand_dims('dataset')

CPU times: user 15.7 s, sys: 2.85 s, total: 18.6 s
Wall time: 5min 58s


In [9]:
%%time
with ProgressBar():
    cifs = collocate(ds)

[########################################] | 100% Completed | 28min  9.6s
CPU times: user 2min 16s, sys: 1min 32s, total: 3min 49s
Wall time: 28min 15s


In [10]:
cifs

<xarray.Dataset>
Dimensions:  (dataset: 1, station: 2331, time: 2928)
Coordinates:
  * time     (time) datetime64[ns] 2016-02-09 2016-02-09T03:00:00 ...
  * dataset  (dataset) <U6 'cifsBC'
  * station  (station) object 'AD0942A' 'AD0944A' 'AD0945A' 'AL0203A' ...
Data variables:
    t        (dataset, time, station) float32 nan nan nan nan nan nan nan ...
    NO2      (dataset, time, station) float32 nan nan nan nan nan nan nan ...
    SO2      (dataset, time, station) float32 nan nan nan nan nan nan nan ...
    CO       (dataset, time, station) float32 nan nan nan nan nan nan nan ...
    O3       (dataset, time, station) float32 nan nan nan nan nan nan nan ...
    lnsp     (dataset, time, station) float32 nan nan nan nan nan nan nan ...
    lon      (station) float32 1.125 1.125 2.25 20.25 19.125 19.125 13.5 ...
    lat      (station) float32 42.75 42.75 42.75 40.5 40.5 42.75 48.375 ...

## Unit conversion
CIFS concentrations come in `kg/kg`, observations are in `ug/m3`

In [11]:
def unitConv(ds):
    rho = xu.exp(ds.lnsp)/(287.05 * ds.t)    
    for param in ds.data_vars: 
        if ds[param].attrs.get('units',None) == 'kg kg**-1':
            ds[param] *= 1e9*rho
            ds[param].attrs['units'] = 'ug/m3'
    return ds.drop(['t','lnsp'])
    
%time cifs = unitConv(cifs)

cifs.drop(['lon','lat']).to_dataframe().describe()

CPU times: user 132 ms, sys: 240 ms, total: 372 ms
Wall time: 371 ms


Unnamed: 0,NO2,SO2,CO,O3
count,204960.0,204960.0,204960.0,204960.0
mean,12.166,4.201,218.72,49.309
std,10.857,6.482,118.87,27.202
min,0.118,-0.0,62.659,-0.001
25%,3.425,0.954,146.016,27.812
50%,8.627,1.954,185.37,50.665
75%,18.483,4.49,252.259,70.593
max,65.431,96.754,1648.183,159.906


## Save collocated dataset

In [12]:
%time data = save2nc(cifs)
data.sel(dataset='cifsBC').drop(['lon','lat','alt']).to_dataframe().describe()

  """


CPU times: user 7.1 s, sys: 29.9 s, total: 37 s
Wall time: 54.4 s


Unnamed: 0,CO,NO2,PM10,PM25,SO2,O3
count,1229760.0,1229760.0,0.0,0.0,1229760.0,1229760.0
mean,218.727,12.166,,,4.201,49.302
std,118.833,10.855,,,6.48,27.196
min,62.659,0.118,,,-0.0,-0.001
25%,146.016,3.425,,,0.954,27.812
50%,185.37,8.627,,,1.954,50.665
75%,252.259,18.483,,,4.49,70.593
max,1648.183,65.431,,,96.754,159.906


# Model runs
The EMEP domain has 3 times the records and ~8 times more grid points than the CIFS domain.
- `emepHC`,  `emepSS`:
  Single hindcast run, producing one **29Gb** hourly output file.
- `emepAN`, `emepCO`: 
  4 overlaping analysis runs, each producing **~8G** hourly output files.
- `emepPM`:
  Single analysis run, producing one **29Gb** hourly output file.

In [13]:
def readRun(run):   
    ds = xr.Dataset()
    for fname in files[run]:
        ds = ds.combine_first(xr.open_dataset(fname, chunks={'time':6}))
    return ds.assign_coords(dataset=run).expand_dims('dataset')

# Hindcast run

In [14]:
%time ds = readRun('emepHC')
ds

CPU times: user 1.82 s, sys: 1.27 s, total: 3.09 s
Wall time: 6min 5s


<xarray.Dataset>
Dimensions:            (dataset: 1, ilev: 9, lat: 369, lev: 8, lon: 301, time: 8785)
Coordinates:
  * lon                (lon) float64 -30.0 -29.75 -29.5 -29.25 -29.0 -28.75 ...
  * lat                (lat) float64 30.0 30.12 30.25 30.38 30.5 30.62 30.75 ...
  * lev                (lev) float64 0.9946 0.9838 0.9703 0.9509 0.8932 ...
  * ilev               (ilev) float64 0.9892 0.9784 0.9621 0.9396 0.8756 ...
  * time               (time) datetime64[ns] 2016-01-01 2016-01-01T01:00:00 ...
  * dataset            (dataset) <U6 'emepHC'
Data variables:
    P0                 (dataset) float64 1.013e+03
    hyam               (dataset, lev) float64 dask.array<shape=(1, 8), chunksize=(1, 8)>
    hybm               (dataset, lev) float64 dask.array<shape=(1, 8), chunksize=(1, 8)>
    hyai               (dataset, ilev) float64 dask.array<shape=(1, 9), chunksize=(1, 9)>
    hybi               (dataset, ilev) float64 dask.array<shape=(1, 9), chunksize=(1, 9)>
    SURF_ug_O3      

## Collocate

In [15]:
surfEMEP = dict(
    SURF_ug_O3='O3',
    SURF_ug_NO2='NO2',
    SURF_ug_SO2='SO2',
    SURF_ug_CO='CO',
    SURF_ug_PM25_rh50='PM25',
    SURF_ug_PM10_rh50='PM10',
)

dropEMEP = 'P0 lev ilev hyam hybm hyai hybi COLUMN_NO2_k20 COLUMN_O3_k20 AOD_550nm'.split()

In [15]:
with ProgressBar():
    %time emep = collocate(ds.drop(dropEMEP).rename(surfEMEP), dlon=1/4, dlat=1/8)

[########################################] | 100% Completed | 48min 24.0s
CPU times: user 12min 11s, sys: 1min, total: 13min 12s
Wall time: 48min 43s


In [16]:
emep

<xarray.Dataset>
Dimensions:  (dataset: 1, station: 2331, time: 8785)
Coordinates:
  * time     (time) datetime64[ns] 2016-01-01 2016-01-01T01:00:00 ...
  * dataset  (dataset) <U6 'emepHC'
  * station  (station) object 'AD0942A' 'AD0944A' 'AD0945A' 'AL0203A' ...
Data variables:
    O3       (dataset, time, station) float32 51.35521 51.35521 43.02802 ...
    NO2      (dataset, time, station) float32 1.6257424 1.6257424 2.5954952 ...
    PM25     (dataset, time, station) float32 0.8374309 0.8374309 1.9163927 ...
    PM10     (dataset, time, station) float32 1.0202171 1.0202171 2.105609 ...
    SO2      (dataset, time, station) float32 0.0035920602 0.0035920602 ...
    CO       (dataset, time, station) float32 115.959785 115.959785 ...
    lon      (station) float64 1.5 1.5 1.75 20.75 19.5 19.5 13.75 16.75 16.0 ...
    lat      (station) float64 42.5 42.5 42.5 40.62 40.38 42.38 48.38 47.75 ...

## Save collocated dataset

In [17]:
%time data = save2nc(emep)
data.sel(dataset='emepHC').drop(['lon','lat','alt']).to_dataframe().describe()

  """


CPU times: user 8.65 s, sys: 4.76 s, total: 13.4 s
Wall time: 18.9 s


Unnamed: 0,CO,NO2,PM10,PM25,SO2,O3
count,120547770.0,120547770.0,120547770.0,120547770.0,120547770.0,120547770.0
mean,76.621,5.231,8.092,5.943,2.65,17.988
std,144.101,9.478,13.68,10.781,8.363,34.15
min,0.482,0.0,0.401,0.401,0.0,0.0
25%,130.466,2.515,6.402,4.19,0.417,44.404
50%,161.577,5.465,11.631,8.088,1.224,59.66
75%,205.244,11.385,19.418,14.78,3.48,73.892
max,9503.524,139.917,536.673,456.186,431.704,253.486


# Sea salt corrected BCs
Hindcast run, same set-up as `emepHC`, but with SS correction factors

In [18]:
def processEMEP(run, drop=dropEMEP, surf=surfEMEP):
    if not files.get(run, None):
        return
    ds = readRun(run)
    emep = collocate(ds.drop(drop).rename(surf), dlon=1/4, dlat=1/8)
    data = save2nc(emep)
    return data.sel(dataset=run).drop(['lon','lat','alt']).to_dataframe().describe()

%time processEMEP('emepSS')

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 16.7 µs


# (Re)Analysis runs
Assimilate `O3`, `NO2` & `SO2` observations from surface stations and `NO2`  trop. columns from  OMI. Current operational setup (CAMS50.201801; DA16 modules).

In [19]:
%time processEMEP('emepAN')

  """


CPU times: user 13min 13s, sys: 5min 21s, total: 18min 35s
Wall time: 28min 39s


Unnamed: 0,CO,NO2,PM10,PM25,SO2,O3
count,120547770.0,120547770.0,120547770.0,120547770.0,120547770.0,120547770.0
mean,76.378,7.718,9.737,7.778,1.724,18.601
std,143.791,12.123,16.754,12.734,4.42,33.92
min,0.479,0.0,0.401,0.401,0.0,0.0
25%,127.283,5.284,8.309,5.445,0.613,25.436
50%,159.818,10.119,14.988,10.491,1.451,46.913
75%,204.279,18.223,24.88,18.68,3.165,66.72
max,9501.21,194.609,537.954,461.395,331.148,263.503


# Assimilate CO observations
Analysis run, same set-up as `emepAN`, but with addtional `CO` surface observations.  Same source code as operational set-up (CAMS50.201801; DA16 modules),
with minor modification to enhable `CO` assimilation.

In [20]:
%time processEMEP('emepCO')

  """


CPU times: user 13min 11s, sys: 5min 30s, total: 18min 41s
Wall time: 28min 39s


Unnamed: 0,CO,NO2,PM10,PM25,SO2,O3
count,120547770.0,120547770.0,120547770.0,120547770.0,120547770.0,120547770.0
mean,78.067,7.717,9.728,7.774,1.724,18.605
std,146.805,12.122,16.725,12.732,4.419,33.909
min,0.0,0.0,0.401,0.401,0.0,0.0
25%,111.473,5.283,8.317,5.45,0.613,25.433
50%,158.313,10.118,15.007,10.502,1.451,46.905
75%,219.576,18.223,24.897,18.683,3.164,66.701
max,9623.118,194.596,537.838,461.434,330.139,263.601


# Assimilate PM observations
Analysis run, same set-up as `emepAN`, but with addtional `PM2.5` and `PM10` surface observations.  Development version of the assimilation modules (CAMS50.201801; DA17 modules), configured for `PM` assimilation without feerback.

In [21]:
%%time

processEMEP(
    'emepPM',
     drop=dropEMEP+['SURF_ug_PM25_rh50','SURF_ug_PM10_rh50'],
     surf=dict(
        SURF_ug_O3='O3',
        SURF_ug_NO2='NO2',
        SURF_ug_SO2='SO2',
        SURF_ug_CO='CO',
        SURF_ug_PM25_OA='PM25', # no feedback output
        SURF_ug_PM10_OA='PM10', # no feedback output
    ),
)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 22.4 µs
