In [1]:
import sys
from pathlib import Path
from tqdm.auto import tqdm

import numpy as np
import pandas as pd
import geopandas as gpd
import torch
#from imports import *

sys.path.insert(0, "./DiffHydro")
sys.path.insert(0, "./DiffRoute")

from diffhydro import (TimeSeriesThDF, CatchmentInterpolator, StagedCatchmentInterpolator,
                       RivTree, RivTreeCluster)
from diffhydro.utils import nse_fn
from diffhydro.pipelines import LearnedRouter

from diffroute.io import _read_rapid_graph
from diffroute.graph_utils import define_schedule

In [2]:
root = Path('/data_prediction005/SYSTEM/prediction002/home/tristan/data/geoflow')
root_discharge = root / "retro_parquet"

def load_rapid_graph_with_attributes(root, vpu, plength_thr=None, node_thr=None):

    g = _read_rapid_graph(root / 'data' / 'configs' / vpu)[0]
    df = gpd.read_file(root / "tdxhydro" / f"streams_{vpu}.gpkg").set_index("LINKNO")
    
    params = pd.DataFrame({ 
        "is_lake": df["musk_x"]==.01,
        "dist": df["LengthGeodesicMeters"],
        "upa": np.sqrt(df["DSContArea"])
    }).astype("float32")

    # Standardize
    params[["dist", "upa"]] = (params[["dist", "upa"]] \
                             - params[["dist", "upa"]].mean()) \
                             / params[["dist", "upa"]].std()
    
    if (plength_thr is not None) and (node_thr is not None):
        clusters_g, node_transfer = define_schedule(g, plength_thr=plength_thr, 
                                                    node_thr=node_thr)
        g = RivTreeCluster(clusters_g, 
                           node_transfer,
                           include_index_diag=True,
                           param_df=params)
        for g_ in g: g_.irf_fn = "muskingum"
    else:
        g = RivTree(g,
                    include_index_diag=True,
                    param_df=params)
        g.irf_fn = "muskingum"
    return g

def load_vpu(root, vpu, 
             runoff=None, 
             interp_df=None, 
             device="cpu", 
             plength_thr=10**4, 
             node_thr=10**4):
    if interp_df is None:
        interp_df = pd.read_pickle(root / "data" / "interp_weight.pkl").set_index("river_id")
    if runoff is None:
        runoff = pd.read_feather(root / "data" / "daily_sparse_runoff.feather").loc[:"2019"] / (3600. * 24)

    g = load_rapid_graph_with_attributes(vpu, 
                                         plength_thr=plength_thr, 
                                         node_thr=node_thr).to(device)

    interp_df = interp_df.loc[g.nodes]
    pix_idxs  = interp_df["pixel_idx"].unique()

    runoff = TimeSeriesThDF.from_pandas(runoff[pix_idxs]).to(device)
    ci = CatchmentInterpolator(g, runoff, interp_df).to(device)
    runoff = ci.interpolate_runoff(runoff)
    
    q = TimeSeriesThDF.from_pandas(pd.read_parquet(root_discharge / f"{vpu}.pqt")).to(device)[g.nodes]
    
    return g, q, runoff

In [None]:
class RoutingDataset():
    def __init__(self, runoff, discharge, sample_len):
        self.runoff = runoff
        self.discharge = discharge 
        self.sample_len = sample_len
        self.total_len = self.runoff.shape[0]

    def sample(self):
        """
        """
        pass

### New

In [3]:
plength_thr=10**4
node_thr=10**4

model_name="muskingum"
time_window = max_delay = 30
dt = 1/24
epochs = 500

device = "cuda:0"
tr_vpu = "603"
te_vpu = "602"
n_iter_per_cluster = 100
n_clusters = 20



In [4]:
%time interp_df = pd.read_pickle(root / "data" / "interp_weight.pkl").set_index("river_id")
%time runoff = pd.read_feather(root / "data" / "daily_sparse_runoff.feather").loc[:"2019"] / (3600. * 24)
runoff = TimeSeriesThDF.from_pandas(runoff).to(device)

CPU times: user 159 ms, sys: 792 ms, total: 951 ms
Wall time: 951 ms
CPU times: user 56.7 s, sys: 2min 8s, total: 3min 4s
Wall time: 23.6 s


In [5]:
tr_g, tr_discharge, tr_runoff = load_vpu(tr_vpu, runoff, interp_df, device,
                                         plength_thr=plength_thr, node_thr=node_thr)
te_g, te_discharge, te_runoff = load_vpu(te_vpu, runoff, interp_df, device,
                                         plength_thr=plength_thr, node_thr=node_thr)

#### Upstream stats computations ... ####


Computing breakpoints:   0%|          | 0/55576 [00:00<?, ?it/s]

#### Segmentation into subgraphs ... ####
Removing edges...


  0%|          | 0/55576 [00:00<?, ?it/s]

Segment graph into connected components....
Build subgraphs for each cluster and node-cluster map...


  0%|          | 0/689 [00:00<?, ?it/s]

Establish dependencies between clusters...


  0%|          | 0/625 [00:00<?, ?it/s]

#### Grouping subgraphs to cluster and infering dependencies ... ####
Initialize dependencies...
Associate clusters for remaining subgraphs...


0it [00:00, ?it/s]

Merging graphs...


  0%|          | 0/53 [00:00<?, ?it/s]

Computing merged graphs node idxs...


  0%|          | 0/53 [00:00<?, ?it/s]

Match breakpoint nodes across clusters...


  0%|          | 0/53 [00:00<?, ?it/s]

#### Upstream stats computations ... ####


Computing breakpoints:   0%|          | 0/29581 [00:00<?, ?it/s]

#### Segmentation into subgraphs ... ####
Removing edges...


  0%|          | 0/29581 [00:00<?, ?it/s]

Segment graph into connected components....
Build subgraphs for each cluster and node-cluster map...


  0%|          | 0/403 [00:00<?, ?it/s]

Establish dependencies between clusters...


  0%|          | 0/329 [00:00<?, ?it/s]

#### Grouping subgraphs to cluster and infering dependencies ... ####
Initialize dependencies...
Associate clusters for remaining subgraphs...


0it [00:00, ?it/s]

Merging graphs...


  0%|          | 0/17 [00:00<?, ?it/s]

Computing merged graphs node idxs...


  0%|          | 0/17 [00:00<?, ?it/s]

Match breakpoint nodes across clusters...


  0%|          | 0/17 [00:00<?, ?it/s]

### Small graph, single forward

In [7]:
model = LearnedRouter(max_delay, dt).to(device)
opt = torch.optim.Adam(model.parameters(), lr=.001)

In [8]:
te_y = te_discharge
te_x = te_runoff

tr_y = tr_discharge
tr_x = tr_runoff

In [9]:
pbar = tqdm(range(n_iter_per_cluster), desc="Training")

for _ in pbar:
    out = model(tr_x[:,"1940":"1950"], tr_g)
    
    tr_nse = nse_fn(out.values, tr_y[:,"1940":"1950"].values).mean()
    
    opt.zero_grad()
    loss = 1-tr_nse
    loss.backward()
    opt.step()
    
    out = model(te_x, te_g)
    te_nse = nse_fn(out.values, te_y.values).mean()
    
    pbar.set_postfix({"Tr NSE:": tr_nse.item(), "Te NSE":te_nse.item()})

Training:   0%|          | 0/100 [00:00<?, ?it/s]

OutOfMemoryError: CUDA out of memory. Tried to allocate 6.05 GiB. GPU 0 has a total capacity of 79.15 GiB of which 988.88 MiB is free. Process 4052476 has 702.00 MiB memory in use. Process 4052540 has 700.00 MiB memory in use. Process 4052521 has 700.00 MiB memory in use. Process 4052468 has 702.00 MiB memory in use. Process 4052491 has 702.00 MiB memory in use. Process 4052501 has 702.00 MiB memory in use. Process 4052545 has 700.00 MiB memory in use. Process 4052505 has 702.00 MiB memory in use. Process 4052510 has 700.00 MiB memory in use. Process 4052535 has 702.00 MiB memory in use. Process 4052525 has 702.00 MiB memory in use. Process 4052515 has 702.00 MiB memory in use. Process 4052552 has 700.00 MiB memory in use. Process 4052530 has 700.00 MiB memory in use. Process 4052601 has 702.00 MiB memory in use. Process 4052558 has 702.00 MiB memory in use. Process 4052576 has 702.00 MiB memory in use. Process 4052484 has 702.00 MiB memory in use. Process 4052597 has 702.00 MiB memory in use. Process 4052589 has 702.00 MiB memory in use. Process 749443 has 1.09 GiB memory in use. Including non-PyTorch memory, this process has 63.26 GiB memory in use. Of the allocated memory 59.24 GiB is allocated by PyTorch, and 3.51 GiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

### Large one

In [None]:
for cluster_idx in tqdm(range(n_clusters)):

    te_y = te_discharge
    te_x = te_runoff
    
    pbar = tqdm(range(n_iter_per_cluster), desc="Training")

    with torch.no_grad():
        transfer_bucket = model.init_upstream_discharges(tr_runoff, tr_g, cluster_idx)
        te_transfer_bucket = model.init_upstream_discharges(te_runoff, te_g, cluster_idx)
    
    g = tr_g[cluster_idx]
    tr_y = tr_discharge[g.nodes]
    tr_x = tr_runoff[g.nodes]

    for _ in pbar:
        out = model.route_one_cluster(tr_x, tr_g, cluster_idx, transfer_bucket)
        
        tr_nse = nse_fn(out.values, tr_y.values).mean()
        
        opt.zero_grad()
        loss = 1-tr_nse
        loss.backward()
        opt.step()
        
        out = model(te_x, te_g)
        te_nse = nse_fn(out.values, te_y.values).mean()
        
        pbar.set_postfix({"Tr NSE:": tr_nse.item(), "Te NSE":te_nse.item()})