In [None]:
%pylab inline
%run ../lib/preprocess_fluxnet.py
%run ../lib/classify_soil.py

import os
import time
import intake
import pandas as pd
from dask.distributed import Client
import seaborn as sns
from numba import jit
import matplotlib as mpl
import xarray as xr
import warnings
from glob import glob
from tqdm.notebook import tqdm
warnings.filterwarnings('ignore')
from joblib import Parallel, delayed

# Name of the current user, taken from the environment (raises KeyError if USER is unset).
user = os.environ['USER']
# Limit OpenMP to one thread per process; parallelism comes from the NPROCS workers instead.
os.environ['OMP_NUM_THREADS'] = '1'
# Written into the 'Contact' attribute of every forcing file produced by write_forcings.
contact = 'andrbenn@uw.edu'
# Worker count shared by the dask client, get_longest_sequence and the joblib pool.
NPROCS=4

In [None]:
# NOTE(review): this cell reads `all_sites`, whose only visible assignments are
# in later cells. Unless one of the %run scripts above defines it, the cell
# fails on a fresh kernel. It also repeats the post-processing cell further
# down verbatim -- confirm whether it is still needed here.

# Read the template fully into memory so its file handle is released right
# away instead of staying open for the rest of the kernel session.
with xr.open_dataset('../summa_setup_template_3/forcings/test.nc') as opened_template:
    template_ds = opened_template.load()

for site in tqdm(all_sites):
    forcing_path = f'../../fluxnet/netcdf_processed/{site}.nc'

    # Load and close before writing: the same path is overwritten below.
    with xr.open_dataset(forcing_path) as test_ds:
        ds = test_ds.load()

    # Copy the bookkeeping variables from the template, then force the
    # time step to 1800 (presumably seconds, i.e. half-hourly -- TODO confirm).
    ds['data_step'] = template_ds['data_step']
    ds['data_step'].values = 1800.
    ds['hruId'] = template_ds['hruId']

    # Control how the time axis is encoded on disk.
    ds['time'].encoding['dtype'] = dtype('float64')
    ds['time'].encoding['units'] = 'hours since 1990-01-01'

    ds.to_netcdf(forcing_path)

In [None]:
# Start a dask.distributed client with NPROCS single-threaded workers.
client = Client(n_workers=NPROCS, threads_per_worker=1)
# Bare expression so the notebook displays the client summary as the cell output.
client

# Load in data and merge together

In [None]:
# Intake catalogs describing the tower data and the ERA data
# (presumably ERA-Interim, judging by the `erai_data` directory name).
cat = intake.Catalog('../fluxnet/fluxnet_data/catalog.yml')
era_cat = intake.Catalog('../fluxnet/erai_data/catalog.yml')

# Site metadata as a Series of DATAVALUE entries keyed by (SITE_ID, VARIABLE).
all_site_meta = (
    pd.read_excel("../fluxnet/fluxnet_data/FLX_AA-Flx_BIF_LATEST.xlsx")
    .set_index(["SITE_ID", "VARIABLE"])
    ["DATAVALUE"]
)

# Sites selected for study
all_sites =  ['BE-Vie', 'RU-Fyo', 'CA-Qfo', 'BE-Lon', 'US-Prr', 'NL-Hor',
              'IT-MBo', 'IT-Tor', 'IT-SRo', 'AU-Cpr', 'AT-Neu', 'ES-LJu',
              'US-NR1', 'US-Var', 'US-Los', 'FI-Hyy', 'CA-TP3', 'DE-Hai',
              'DE-Gri', 'FI-Let', 'CZ-wet', 'DK-Eng', 'DE-Tha', 'US-Whs',
              'CA-TPD', 'IT-Lav', 'FR-LBr', 'US-KS2', 'US-Goo', 'US-WCr',
              'US-IB2', 'CA-Gro', 'IT-Noe', 'US-Blo', 'AU-Wac', 'AU-Wom',
              'CH-Cha', 'AU-ASM', 'DE-Kli', 'US-Ton', 'FI-Sod', 'CA-TP1',
              'DE-Obe', 'US-CRT', 'AU-DaS', 'IT-Cpz', 'US-Syv', 'IT-Ro2',
              'FR-Pue', 'DE-Geb', 'US-AR2', 'AU-How', 'US-GLE', 'AU-Stp',
              'IT-Ren', 'ES-Amo', 'CH-Fru', 'FI-Jok', 'CN-HaM', 'US-ARM']

# Filter out sites missing half hourly data.
# Build a new list instead of calling all_sites.remove() while looping over
# all_sites: removing during iteration shifts the remaining items, so the
# element right after each removed site was never checked and could slip
# through the filter.
all_sites = [
    site for site in all_sites
    if glob(f'../../fluxnet/fluxnet_data/FLX_{site}_FLUXNET2015_FULLSET_HH_*.csv')
]

# Keep only the metadata rows of the sites that survived the filter.
all_site_meta = all_site_meta.loc[all_sites]

In [None]:
# Load both data sources for the selected sites.
df = load_fluxnet(cat, all_site_meta)
era_df = load_era(era_cat, all_site_meta)

# Site ids are the first index level of each frame; keep only the sites
# that are present in both sources.
all_sites = list(df.index.get_level_values(0).unique())
era_sites = list(era_df.index.get_level_values(0).unique())
all_sites = list(set(all_sites).intersection(set(era_sites)))

# One merged frame per site, keyed by site id.
merged = {
    s: merge_fluxnet_era(df.loc[s], era_df.loc[s])
    for s in tqdm(all_sites)
}

In [None]:
# Keep only the rows that have a value in LE_CORR and save them as the
# training table for the ML model.
ebc_filter = df['LE_CORR'].notna()
ml_df = df.loc[ebc_filter]
ml_df.to_csv('../data/ml_summa_all_training.csv')

# Filter data for SUMMA runs

In [None]:
%%time
# min_length is 3 * 17520 records -- presumably three years of half-hourly
# steps (48 * 365 = 17520); TODO confirm against get_longest_sequence.
out = get_longest_sequence(
    df,
    all_sites,
    n_workers=NPROCS,
    min_length=3 * 17520,
    good_frac=0.85,
)

# Sites that made it through the selection, and one frame per such site
# in the same order.
selected_sites = np.unique(out.index.get_level_values(0))
test_dfs = [out.loc[site_id] for site_id in selected_sites]

In [None]:
def write_forcings(site, test_df):
    """Gap-fill one site's data and write it out as a forcing NetCDF file.

    The input is passed through ``gap_fill``, ``to_summa_ds`` and
    ``populate_metadata`` (helpers defined outside this cell, presumably by
    the ``%run`` scripts at the top of the notebook). ``data_step`` and
    ``hruId`` are then copied from the template forcing file, the time
    encoding is set, and the result is written to
    ``../netcdf_processed/{site}.nc``.

    Parameters
    ----------
    site : str
        Site identifier; stored in the ``Site name`` file attribute and used
        as the output file name.
    test_df
        Data for this site, as sliced from the ``get_longest_sequence``
        result by the calling cell (presumably a pandas DataFrame).

    Returns
    -------
    None
        The function is called for its side effect of writing the file.
    """
    # Read just the two template variables that are needed and close the file
    # immediately; previously the handle was left open on every call.
    with xr.open_dataset('../../fluxnet/summa_setup_template_3/forcings/test.nc') as template_ds:
        template_data_step = template_ds['data_step'].load()
        template_hru_id = template_ds['hruId'].load()

    filled_df = gap_fill(test_df)
    raw_ds = to_summa_ds(filled_df)

    file_attrs = {
        'Site name': site,
        'Contact': contact,
        'Production time': time.ctime(),
    }
    attrs_ds = populate_metadata(raw_ds, file_attrs)

    # Take the variable layout from the template, then overwrite the value
    # with 1800 (presumably seconds, i.e. half-hourly -- TODO confirm).
    attrs_ds['data_step'] = template_data_step
    attrs_ds['data_step'].values = 1800.
    attrs_ds['hruId'] = template_hru_id

    # Control how the time axis is encoded on disk.
    # NOTE(review): a later cell rewrites these files with
    # 'hours since 1990-01-01'; confirm which units are intended.
    attrs_ds['time'].encoding['dtype'] = dtype('float64')
    attrs_ds['time'].encoding['units'] = 'minutes since 2000-01-01'

    attrs_ds.to_netcdf(f'../netcdf_processed/{site}.nc')

In [None]:
# Write one forcing file per selected site, using NPROCS joblib workers.
# selected_sites and test_dfs are parallel sequences, so pair them directly.
Parallel(n_jobs=NPROCS)(
    delayed(write_forcings)(site_id, site_df)
    for site_id, site_df in zip(selected_sites, test_dfs)
);

In [None]:
# NOTE(review): the write step above iterates `selected_sites`, but this loop
# walks `all_sites` -- confirm that a forcing file exists for every entry.

# Read the template fully into memory so its file handle is released right
# away instead of staying open for the rest of the kernel session.
with xr.open_dataset('../summa_setup_template_3/forcings/test.nc') as opened_template:
    template_ds = opened_template.load()

for site in tqdm(all_sites):
    forcing_path = f'../../fluxnet/netcdf_processed/{site}.nc'

    # Load and close before writing: the same path is overwritten below.
    with xr.open_dataset(forcing_path) as test_ds:
        ds = test_ds.load()

    # Copy the bookkeeping variables from the template, then force the
    # time step to 1800 (presumably seconds, i.e. half-hourly -- TODO confirm).
    ds['data_step'] = template_ds['data_step']
    ds['data_step'].values = 1800.
    ds['hruId'] = template_ds['hruId']

    # Re-encode the time axis; this replaces the units chosen in
    # write_forcings ('minutes since 2000-01-01').
    ds['time'].encoding['dtype'] = dtype('float64')
    ds['time'].encoding['units'] = 'hours since 1990-01-01'

    ds.to_netcdf(forcing_path)

In [None]:
# Create one run directory per site from the template setup and point it at
# the site's forcing, attribute and parameter files.
for site in tqdm(all_sites):
    # Only the time axis is needed; the context manager closes the file so a
    # handle is no longer left open for every site.
    with xr.open_dataset(f'../../fluxnet/netcdf_processed/{site}.nc') as test_ds:
        time_values = test_ds.time.values

    # First and last timestamps become the simulation start and end times.
    start = pd.to_datetime(time_values[0]).strftime('%Y-%m-%d %H:%M')
    finish = pd.to_datetime(time_values[-1]).strftime('%Y-%m-%d %H:%M')

    script = f"""
    mkdir -p ../sites
    cp -r ../summa_setup_template_3 ../sites/{site}
    
    # Move forcing files
    cp -r ../../fluxnet/netcdf_processed/{site}.nc ../sites/{site}/forcings/
    sed -i "s|test.nc|{site}.nc|g" ../sites/{site}/forcings/forcing_file_list.txt
    
    # Move attribute files
    cp -r ../../fluxnet/local_attrs_processed/{site}_local_attrs.nc ../sites/{site}/params/local_attributes.nc
    cp -r ../../fluxnet/trial_params_processed/{site}_trial_params.nc ../sites/{site}/params/parameter_trial.nc
    
    
    # Set output filename template and replace filename
    cd ../sites/{site}
    # Set start and finish times
    sed -i "s|simStartTime.*|simStartTime    '{start}'   ! simulation start time|g" ./template_file_manager.txt
    sed -i "s|simEndTime.*|simEndTime    '{finish}'   ! simulation end time|g" ./template_file_manager.txt
    ./install_local_setup.sh
    cd - 
    """
    # NOTE(review): the script has no `set -e`, so the status returned here
    # reflects only its final command; a failing `cp` or `sed` earlier on
    # will not trip the assert -- confirm whether that is intended.
    retval = os.system(script)
    assert retval == 0, site