In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
import numpy as np
import xarray as xr
import intake

import config
import oco2mip_ff
import util

In [3]:
use_dask = False

if use_dask:
    cluster, client = util.get_ClusterClient(memory='64GB')
    cluster.scale(12) #adapt(minimum_jobs=2, maximum_jobs=5)
    cluster

ValueError: 'ext' is not a valid network interface. Valid network interfaces are: ['lo', 'bond0', 'hsn0', 'enp65s0']

In [None]:
version = 'v2020.1'
yr1, yr2 = 2000, 2020

date_range = pd.date_range(start=f'{yr1:04d}-01-01', end=f'{yr2:04d}-12-31', freq='1M')
date_range

In [None]:
if use_dask:
    ds_list_list = client.map(oco2mip_ff.retrieve_datasets, date_range, version=version)
    ds_list_list = client.gather(ds_list_list)
else:
    ds_list_list = []
    for date in date_range:
        ds_list_list.append(oco2mip_ff.retrieve_datasets(date, version=version))
        
ds_list = [ds for ds_list in ds_list_list for ds in ds_list]

ds = xr.concat([ds[['SFCO2_FF', 'time_bnds']] for ds in ds_list], dim='time', compat='override', coords='minimal').squeeze()
ds['area'] = ds_list[0].area
for v in ds.variables:
    ds[v].encoding = ds_list[0][v].encoding

ds

In [None]:
with xr.set_options(keep_attrs=True):
    sfco2_global = ds.SFCO2_FF.weighted(ds.area).sum(['lat', 'lon'])
    sfco2_global *= 12.01e-15 * 86400.0 * 365.0
sfco2_global.attrs['units'] = 'Pg C/yr'
sfco2_global['time'] = [np.datetime64(f'{d.year}-{d.month:02d}-{d.day:02d}') for d in sfco2_global.time.values]
sfco2_global.plot()

In [None]:
%%time
file_out = f"{config.flux_product_dir}/SFCO2_FF.OCO2-MIP-FF.{version}.{yr1:04d}0101-{yr2:04d}1231.nc"
ds.attrs['description'] = f'daily integral of Fossil Fuel CO2 Emissions for the OCO2 Model Intercomparison Project (MIP)'
ds.attrs['source'] = 'https://zenodo.org/record/4776925'
ds.attrs['authors'] = 'Basu, Sourish;  Nassar, Ray'
ds.attrs['note'] = 'daily integration and compilation by Matt Long (NCAR)'
util.to_netcdf_clean(ds, file_out, format='NETCDF4')

In [None]:
curator = util.curate_flux_products()
curator.add_source(
        key=f"SFCO2_FF.OCO2-MIP.{version}",
        urlpath=file_out,
        description='daily integral of Fossil Fuel CO₂ Emissions for the OCO2 Model Intercomparison Project (MIP)',
    )

In [None]:
if use_dask:
    del client
    del cluster