In [None]:
import sys
sys.path.append('../python')
from get_vs import *
from misc import *
from models import *

from mcmc import smc, dist
import random
import subprocess
import pickle
import pandas as pd
from pandas import DataFrame
import numpy as np
import os
from tqdm import tqdm
import xarray as xr
import gcsfs
from dask.distributed import Client

is_pangeo_data = True # True if in Pangeo binder, False if in laptop
if is_pangeo_data:
    from dask_kubernetes import KubeCluster as Cluster
    n_workers = 10
else:
    from dask.distributed import LocalCluster as Cluster
    n_workers = 4

%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
cluster = Cluster(n_workers=n_workers)
client = Client(cluster)
cluster

Wait for cluster to be up and running, then make some modules available on the cluster.

In [None]:
def set_worker_env():
    global mcmc
    import sys, os
    cwd = os.getcwd()
    sys.path.append(cwd + '/python')
    import mcmc
client.run(set_worker_env)

The coordinates of the virtual stations in [Hydroweb](http://hydroweb.theia-land.fr) don't match with the rivers in [HydroSHEDS](http://www.hydrosheds.org). In order to find the corresponding coordinates in HydroSHEDS, we look around the original position for the pixel with the biggest accumulated flow which is bigger than a minimum flow. If no such flow is found, we look further around, until we find one (but not too far away, in which case we just drop the virtual station). The new_lat/new_lon are the coordinates of this pixel, if found.

In [None]:
if not os.path.exists('../data/amazonas/amazonas.pkl'):
    df = locate_vs('../data/amazonas/amazonas.txt', pix_nb=20, acc_min=1_000_000)
    df.to_pickle('../data/amazonas/amazonas.pkl')
else:
    df = pd.read_pickle('../data/amazonas/amazonas.pkl')

In [None]:
sub_latlon = df[['new_lat', 'new_lon']].dropna().values
print(f'Out of {len(df)} virtual stations in Hydroweb, {len(sub_latlon)} could be found in HydroSHEDS.')

The following coordinates are duplicated because some virtual stations fall inside the same pixels.

In [None]:
rm_latlon = [(-4.928333333333334, -62.733333333333334), (-3.8666666666666667, -61.6775)]

In [None]:
df_ll = df[['new_lat', 'new_lon']].dropna()
duplicated = df_ll[df_ll.duplicated(keep=False)]
duplicated

In [None]:
# all the subbasins in the hydrologic partition (including virtual stations)
#gcs_get_dir('pangeo-data/gross/ws_mask/amazonas', 'ws_mask/amazonas', fs)
#gcs_w_token = gcsfs.GCSFileSystem(project='pangeo-data', token='browser')
if is_pangeo_data:
    fs = gcsfs.GCSFileSystem(project='pangeo-data')
    all_labels = [os.path.basename(path[:-1]) for path in fs.ls('pangeo-data/gross/ws_mask/amazonas')]
else:
    all_labels = os.listdir('ws_mask/amazonas')
print('Total number of subbasins:', len(all_labels))

In [None]:
label_pickle_path = '../data/amazonas/labels.pkl'
if not os.path.exists(label_pickle_path):
    labels_without_vs = list(labels)
    labels_with_vs = {}
    if is_pangeo_data:
        gcs_get_dir('pangeo-data/gross/ws_mask', 'ws_mask', fs)
    for label in tqdm(labels):
        ds = xr.open_zarr(f'ws_mask/amazonas/{label}')
        da = ds['mask']
        olat, olon = da.attrs['outlet']
        idx = df_ll[(olat-0.25/1200<df_ll.new_lat.values) & (df_ll.new_lat.values<olat+0.25/1200) & (olon-0.25/1200<df_ll.new_lon.values) & (df_ll.new_lon.values<olon+0.25/1200)].index.values
        if len(idx) > 0:
            labels_without_vs.remove(label)
            labels_with_vs[label] = list(df.iloc[idx].station.values)
    with open(label_pickle_path, 'wb') as f:
        pickle.dump((labels_with_vs, labels_without_vs), f)
else:
    with open(label_pickle_path, 'rb') as f:
        labels_with_vs, labels_without_vs = pickle.load(f)

In [None]:
labels_with_vs_tree = get_label_tree(list(labels_with_vs))

In [None]:
def dist_map(x, y):
    df = DataFrame({'x': x, 'y': y}).dropna()
    x_sorted = np.sort(df.x.values)
    y_sorted = np.sort(df.y.values)
    return np.interp(x, x_sorted, y_sorted)

def get_logp(gr4, warmup, peq, area_up, area_down, down_label):
    def logp(xs):
        q_sim = 0
        ## upstream basins
        #xs_up = {k:v for k, v in xs.items() if k != down_label}
        #for label, x in xs_up.items():
        #    # x includes the delay in the GR model
        #    q_sim += gr4(x).run([peq[f'p{label}'].values, peq[f'e{label}'].values]) * area_up[label]
        # downstream basin
        q_sim += gr4(xs).run([peq[f'p{down_label}'].values, peq[f'e{down_label}'].values]) * area_down
        q_sim /= sum(area_up.values()) + area_down
        # observation is measured water level
        h_obs = peq.h_obs.values
        h_err = peq.h_err.values
        h_sim = np.hstack((np.full(warmup, np.nan), dist_map(q_sim[warmup:], h_obs[warmup:])))
        df = DataFrame({'h_sim': h_sim, 'h_obs': h_obs, 'h_err': h_err})[warmup:].dropna()
        std2 = df.h_err * df.h_err
        # must not have zero error on observation
        min_std2 = np.max(std2) / 100
        std2 = np.clip(std2, min_std2, None)
        lp = np.sum(-np.square(df.h_sim.values - df.h_obs.values) / (2 * std2) - np.log(np.sqrt(2 * np.pi * std2))), q_sim
        return lp
    return logp

In [None]:
os.makedirs('tmp/precipitation', exist_ok=True)
os.makedirs('tmp/pet', exist_ok=True)
os.makedirs('ws_mask/amazonas', exist_ok=True)

d0, d1 = '2000-03-01 12:00:00', '2018-12-31'
x_range = ((0.1, 1e4), (-1, 1), (0.1, 1e3), (0.1, 1e2))
x_scale = [100, 0.1, 10, 1]
#sample_nb = 1_000 # number of samples generated by MCMC
#burnin = sample_nb // 10 # number of burnin samples
draws = 100
warmup = 12 * 30 * 24 * 2 # one year in 30min steps

q_ensemble = {}
for down_label in labels_with_vs_tree:
    # get whole basin's labels
    whole_labels = startswith_label(down_label, all_labels)
    # copy basin's masks locally
    for label in whole_labels:
        if not os.path.exists(f'ws_mask/amazonas/{label}'):
            fs = gcsfs.GCSFileSystem(project='pangeo-data')
            gcs_get_dir(f'pangeo-data/gross/ws_mask/amazonas/{label}', f'ws_mask/amazonas/{label}', fs)
    # get upstream basins' labels
    # also compute upstream basins' areas
    up_labels, area_up = {}, {}
    for label in labels_with_vs_tree[down_label]['up']:
        area_up[label] = 0
        up_labels[label] = startswith_label(label, all_labels)
        for label2 in up_labels[label]:
            area_up[label] += xr.open_zarr(f'ws_mask/amazonas/{label2}', auto_chunk=False)['mask'].attrs['area']
    # get downstream bassin's labels and compute its area
    down_labels = whole_labels
    for labels in up_labels.values():
        down_labels = subtract_label(labels, down_labels)
    area_down = 0
    for label in down_labels:
        area_down += xr.open_zarr(f'ws_mask/amazonas/{label}', auto_chunk=False)['mask'].attrs['area']
    area_whole = sum(area_up.values()) + area_down

    # get whole basin's mask
    da = get_mask('ws_mask/amazonas', whole_labels)
    subprocess.check_call('rm -rf tmp/mask'.split())
    da.to_dataset(name='mask').to_zarr('tmp/mask')
    # get basin's water level time series at virtual station (basin's outlet)
    he = get_waterlevel(d0, d1, labels_with_vs[down_label][0]) # there might be several stations
    dh0 = he.dropna().index[0] # first date of observation
    dh1 = he.dropna().index[-1] # last date of observation
    # start date which allows to warmup the model, but not more (warmup is in 30min)
    dh0 = max(str2datetime(d0), dh0 - timedelta(minutes=warmup*30))
    # get whole basin's precipitation and PET time series
    if os.path.exists(f'tmp/precipitation/{down_label}.pkl'):
        p = pd.read_pickle(f'tmp/precipitation/{down_label}.pkl')
        e = pd.read_pickle(f'tmp/pet/{down_label}.pkl')
    else:
        print(f'Getting precipitation and PET for {down_label}')
        p = get_precipitation(d0, d1, 'tmp/mask')
        p.to_pickle(f'tmp/precipitation/{down_label}.pkl')
        e = get_pet(d0, d1, 'tmp/mask')
        e.to_pickle(f'tmp/pet/{down_label}.pkl')
    p_up, e_up = {}, {}
    # precipitation and PET of upstream bassins already exist from previous iteration
    for label in labels_with_vs_tree[down_label]['up']:
        p_up[label] = pd.read_pickle(f'tmp/precipitation/{label}.pkl')
        e_up[label] = pd.read_pickle(f'tmp/pet/{label}.pkl')
    # compute precipitation and PET of downstream bassin
    p_down = p * area_whole
    e_down = e * area_whole
    for label in labels_with_vs_tree[label]['up']:
        p_down -= p_up[label] * area_up[label]
        e_down -= e_up[label] * area_up[label]
    p_down /= area_whole
    e_down /= area_whole

    # create a DataFrame for whole basin's precipitation, PET and water level
    peq = DataFrame()
    peq[f'p{down_label}'] = p_down.loc[dh0:dh1]
    peq[f'e{down_label}'] = e_down.loc[dh0:dh1]
    peq[f'h_obs'] = he.h.loc[dh0:dh1]
    peq[f'h_err'] = he.e.loc[dh0:dh1]

    ## initial model parameter values with no prior information
    #x_start = [[random.uniform(*rng) for rng in x_range] for _ in range(n_workers)]
    #d_start = [random.uniform(0, 100) for _ in range(n_workers)]

    if not up_labels:
        # this is a source basin (no basin flowing into it)
        area_up = {}
        area_down = 1
        # prior probability distribution is uniform for basin
        x_pdf = [dist.uniform_pdf(*r) for r in x_range]
        def prior_logp(values):
            logp = sum([dist.logp_from_pdf(pdf, v) for pdf, v in zip(x_pdf, values)])
            return logp
        model_logp = get_logp(gr4hh, warmup, peq, area_up, area_down, down_label)
    #else:
    #    # there are basins flowing into this basin
    #    x0 = []
    #    for _ in range(n_workers):
    #        # initial model parameter values are the most likely for head basins
    #        _x0 = [sample(xyz[2]) for xyz in x_head for x_head in x_heads] # GR parameters
    #        _x0 += [random.uniform(0, 100) for _ in x_heads] # delay (TODO: based on river length)
    #        # initial model parameter values with no prior information for tail basin
    #        _x0 += [random.uniform(*rng) for rng in x_range]
    #        x0.append(_x0)
    #    # prior probability distribution is uniform for tail basin
    #    lnprob_prior = [lnprob_from_density(p, *r) for p, r in zip(x_head, x_range) for x_head in x_heads]
    #    lnprob_prior += [lnprob_from_density(uniform_density(*d_range), *d_range) for _ in x_heads]
    #    x_tail = [uniform_density(*r) for r in x_range]
    #    lnprob_prior += [lnprob_from_density(p, *r) for p, r in zip(x_tail, x_range)]
    # run SMC
    posterior, q_sims = smc.smc(x_pdf, model_logp, prior_logp, draws=draws, dask_client=client)
    plt.figure(figsize=(20, 5))
    for q_sim in q_sims:
        plt.plot(dist_map(q_sim[warmup:], peq.h_obs.values[warmup:]), alpha=0.005, color='blue')
    plt.scatter(range(len(peq)-warmup), peq.h_obs.values[warmup:])
    plt.show()
    #lnprob = mcmc.get_lnprob(gr4hh, warmup, peq, lnprob_prior, area_up, area_down)
    ## run MCMC
    #if True:#is_pangeo_data:
    #    futures = [client.submit(mcmc.Sampler, x0[i], lnprob, scale=x_scale, actor=True) for i in range(n_workers)]
    #    samplers = [future.result() for future in futures]
    #    futures = [sampler.run(sample_nb, burnin) for sampler in samplers]
    #    results = [future.result() for future in futures]
    #    for i, result in enumerate(results):
    #        if i == 0:
    #            samples, q_sim = result
    #        else:
    #            samples = np.vstack((samples, result[0]))
    #            q_sim += result[1]
    #else:
    #    sampler = mcmc.Sampler(x0, lnprob, scale=x_scale)
    #    samples, q_sim = sampler.run(sample_nb, burnin)
    #sys.exit()
    # get simulated streamflow and uncertainty
    #q_sim = np.array(q_sim)
    #q_ensemble[f'f{label}'] = q_sim
    # plot updated streamflow
    #plot_series(ensemble=q_sim[:, -days:], true=df[f'q_true_{ws_i}'].values[-days:], title=f'Streamflow at the outlet of $B_{ws_i}$')
    #if up_labels:
    #    # reduce model
    #    # get P and E over the whole basin
    #    peq = df[['p', 'e']]
    #    x_prior = [uniform_density(*r) for r in x_range]
    #    lnprob_prior = [lnprob_from_density(p, *r) for p, r in zip(x_prior, x_range)]
    #    q_kde = np.empty((2, n_kde, q_sim.shape[1]))
    #    for i in range(q_kde.shape[2]):
    #        q_kde[:, :, i] = get_kde(q_sim[:, i], nb=n_kde)
    #    lnprob = get_lnprob(peq, lnprob_prior, 1, 0, q_kde)
    #    x0 = x_start
    #    sampler = mcmc.Sampler(x0, lnprob)
    #    samples, q_sim = sampler.run(sample_nb, burnin)
    #x_head = [get_kde(samples[:, i]) for i in range(4)]