In [1]:
# Imports
import json
import numpy as np
import holoviews as hv
import panel as pn
import pandas as pd
import geopandas as gpd
import geoviews as gv
from geoviews import feature as gf
from shapely.geometry import Polygon
import cartopy.crs as crs  # Has to be imported after geoviews. Why? God only knows.

import multiprocessing as mp
from io import StringIO
from dask.diagnostics.progress import ProgressBar, format_time, default_timer
import inspect
import xarray as xr
import xclim as xc
from xclim import subset as xcsub
from xclim import ensembles as xcens
import threddsclient as tds

import tempfile
from pathlib import Path

gv.extension('bokeh')
#hv.extension('bokeh')
pn.extension()

In [2]:
# Initial state and constants
state = {'region': None}
crs_merc_pp = crs.Mercator().proj4_params

season_label = {1: 'Annuel', 3: 'Printemps', 4: 'SaisonChaude',
                6: 'Ete', 7: 'Annuel', 9: 'Automne', 10: 'SaisonFroide', 12: 'Hiver'}

def load_regions(filename):
    regions = {}
    regions['hidef'] = filename
    with filename.open('r', encoding='utf-8') as f:
        d = json.load(f)
    regions.update(name_fr=d['name_fr'], name_en=d['name_en'], identifier=d['identifier'])
    if (filename.parent / (filename.stem + '_simp.json')).is_file():
        regions['lowdef'] = filename.parent / (filename.stem + '_simp.json')
    else:
        regions['lowdef'] = regions['hidef']
    return regions


def load_extracted(folder):
    region = gpd.read_file(folder / 'region_bounds.geojson')
    return region


predef_regions = [load_regions(file)
                  for file in Path('nc_data/regions/').iterdir()
                  if file.suffix == '.json' and '_simp' not in file.stem]

In [3]:
# Task/Dask callback and progress.
class Progress(ProgressBar):
    """Object to control a progress panel row with a title, progress bar and elapsed time"""
    def __init__(self, bar):
        """bar is a panel.Row with Markdown, panel.widgets.Progress and Markdown"""
        self.pb = bar[1]
        self.bar = bar
        super().__init__(out=StringIO(), dt=2)
    
    def _finish(self, dsk, state, errored):
        self.finish(errored)
        super()._finish(dsk, state, errored)
    
    def _draw_bar(self, frac, elapsed):
        """Change the bar's value and the elapsed time string"""
        self.pb.value = int(frac * 100)
        self.bar[2] = format_time(elapsed)
    
    def new_task(self, name):
        """Not needed for dask tasks, initialize the progress bar with a new title."""
        self.pb.bar_color = 'primary'
        self.pb.value = None
        self.pb.active = True
        self.bar[2]  = ""
        self.bar[0] = f'*{name}*'
        self._start_time = default_timer()

    def update(self, frac):
        """When not using dask tasks, update the bar to a given frac (0..1)"""
        elapsed = default_timer() - self._start_time
        self._draw_bar(frac, elapsed)
    
    def finish(self, errored=False):
        """Useless with dask's task, mark the task as finished."""
        self.update(1)
        if errored:
            self.pb.bar_color = 'danger'
            self.bar[0] = f'*Last task exited with errors*'
        else:
            self.pb.bar_colors = 'success'
        self.pb.active = False

w_progbar = pn.widgets.Progress(name='Tasks Progress', active=False, width=300, bar_color='primary')
pbar = Progress(pn.Row('*No tasks to compute.*', w_progbar, ''))
pbar.register() #<-- This makes Firefox slow down a lot. Any use with dask does that...

class Loading(object):
    def __enter__(self):
        self.old_state = pbar.pb.active
        pbar.pb.active = True
    def __exit__(self, *args, **kwargs):
        pbar.pb.active = self.old_state

In [11]:
# Data management
data_config = {
    'baseurl_scenarios': 'https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/birdhouse/ouranos/cb-oura-1.0/catalog.html',
    'url_obs': 'https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/birdhouse/nrcan/nrcan_canada_daily/nrcan_canada_daily-allvars-agg.ncml',
    'data_folder': None,
    'datasets': None
}


def get_folder_name(name):
    if name is None:
        n = len(list(Path('nc_data').glob('tmp*')))
        return Path(tempfile.mktemp(prefix='tmp', dir='nc_data')).stem, n
    else:
        if name.startswith('user'):
            n = len(list(Path('nc_data').glob(name + '*')))
            return Path(tempfile.mktemp(prefix=name, dir='nc_data')).stem, n
        else:
            return name, None

def _parse_data_url(url):
    return url.split('/')[11:13] + [url]

def get_all_datasets():
    if data_config['datasets'] is None:
        pbar.new_task('Crawling the server to find all datasets.')
        datasets = []
        i = 0
        for ds in tds.crawl(data_config['baseurl_scenarios'], depth=10):
            if '.ncml' in ds.name:
                datasets.append(_parse_data_url(ds.opendap_url()))
                i += 1
                pbar.update(i / 23)

        datasets.append(('NRCAN', 'obs', data_config['url_obs']))
        pbar.finish()
        data_config['datasets'] = datasets
    return data_config['datasets']

def extract_region(region):
    datasets = get_all_datasets()
    folder = Path('nc_data') / region.id[0]
    if not folder.is_dir():
        folder.mkdir()
    region_mask = None
    
    manager = mp.Manager()
    queue = manager.Queue()
    pool = mp.Pool(8)
    
    def extract(model, rcp, url, folder, region):
        filepath = folder / f'sub_{model}_{rcp}.nc'
        if filepath.is_file():
            return filepath
        ds = xr.open_dataset(url, drop_variables=['ts', 'time_vectors'],
                             chunks={'time': (360 if 'HadGEM2' in url else 365),
                                     'lat': 56, 'lon': 50})
        mask = xcsub.create_mask(x_dim=ds.lon, y_dim=ds.lat, poly=region)
        ds.where(mask.notnull(), drop=True).to_netcdf(filepath, compute=True)
        return filepath

    jobs = []
    for model, rcp, url in datasets:
        job = pool.apply_async(extract, (model, rcp, url, folder, region))
        jobs.append(job)
    
    for i, job in enumerate(jobs):
        filepth = job.get()
        print(filepth)
        
    region.to_file(folder / 'region_bounds.geojson', driver='GeoJSON')

def compute_indicator(ind, params):
    invars = []
    for name, param in ind._sig.parameters.items():
        if param.default is inspect._empty:
            invars.append(name)

    for reg in list_available().keys():
        pbar.new_task(f'Computing indicator {ind.title} on region {reg}')
        obs = xr.open_dataset(Path('nc_data') / reg / 'sub_NRCAN_obs.nc', chunks={})
        rcp45 = xcens.create_ensemble(list((Path('nc_data') / reg).glob('sub_*_rcp45.nc')), chunks={})
        rcp85 = xcens.create_ensemble(list((Path('nc_data') / reg).glob('sub_*_rcp85.nc')), chunks={})
        
        dobs = unstack_yr_season(ind(*[obs[vr] for vr in invars], **params))
        drcp45 = unstack_yr_season(ind(*[rcp45[vr] for vr in invars], **params))
        drcp85 = unstack_yr_season(ind(*[rcp85[vr] for vr in invars], **params))
        
        dsim = xr.concat((drcp45, drcp85), xr.DataArray(['rcp45', 'rcp85'], dims=('rcp',), name='rcp'))
        dsim.name = ind.var_name
        dsim_perc = xcens.ensemble_percentiles(dsim.to_dataset())
        
        dobs.to_netcdf(Path('nc_data') / reg / f'ind_{dobs.name}_obs.nc')
        dsim_perc.to_netcdf(Path('nc_data') / reg / f'ind_{dsim.name}_sim.nc')
        
def unstack_yr_season(ds):
    ds['time'] = pd.MultiIndex.from_arrays([ds.time.dt.year.values + (ds.time.dt.month.values == 12).astype(int),
                                            [season_label[m] for m in ds.time.dt.month.values]],
                                           names=['year', 'season'])
    return ds.unstack('time').rename(year='time')
    
def list_available():
    avail = {}
    for folder in Path('nc_data').iterdir():
        if folder.name == 'regions' or not folder.is_dir():
            continue
        if ((folder / 'region_bounds.geojson').is_file()
            and len(list(folder.glob('sub_*.nc'))) == 21):
            avail[folder.name] = {}
        for indfile in folder.glob('ind_*_sim.nc'):
            if (folder / indfile.name.replace('sim', 'obs')).is_file():
                ind = xr.open_dataset(indfile, chunks={})
                avail[folder.name][indfile.name] = list(ind.data_vars.values())[0].attrs['long_name']
    return avail


# Panel making and callbacks

In [12]:
# Region selection and data extraction.

reg_list = {'-- Select an option -- ' : 'none'}
reg_list.update({r['name_en']: [r] for r in predef_regions})
reg_list.update({'-- Import your own geojson file -- ': 'import', 
                 '-- Draw your own region --' : 'polydraw'})

w_regsel = pn.widgets.Select(name='Region',
                             options=reg_list)

w_dataexbutt = pn.widgets.Button(name='Extract data', button_type='warning')
C_dataextract = pn.Column(w_dataexbutt, '')

w_addregbutt = pn.widgets.Button(name='Add region', button_type='primary')
w_cancelbutt = pn.widgets.Button(name='Cancel', button_type='danger')

p = Polygon([(-89.04521, 40.04104), (-89.04521, 66.62331), (-54.46326, 66.62331), (-54.46326, 40.04104)])
gbase = gpd.GeoDataFrame({'id': [0], 'name': ['Scenario Generique v1 bounds']}, geometry=[p], crs='epsg:4326')
M_lims = gv.Polygons(gbase).opts(gv.opts.Polygons(fill_alpha=0))
base_proj = crs.Sinusoidal(central_longitude=-70)
lims = base_proj.transform_points(crs.PlateCarree(), np.array([-95, -30]), np.array([38, 68]))
M_base = (gf.land * gf.lakes * gf.ocean * gf.borders * gf.rivers)
M_tiles = gv.tile_sources.CartoEco

w_reg_avail = pn.widgets.MultiSelect(name='Available regions', value=[],
                                     options=list(list_available().keys()),
                                     size=8)
## (Add / title), choice dropdown, (SubmitButton | fileInput | Nothing | data extraction | title2), (cancel | list of available regions)
C_regopts = pn.Column(w_addregbutt, '', '## Already extracted regions', w_reg_avail)
P_reg = pn.Column(pbar.bar, pn.Row(
    C_regopts,
    (M_lims * M_base).opts(
        projection=base_proj, xlim=tuple(lims[:, 0]), ylim=tuple(lims[:, 1]), width=500, height=500 
    )
))

C_loading = pn.pane.GIF('https://upload.wikimedia.org/wikipedia/commons/b/b1/Loading_icon.gif')


def update_map(*regs, title=''):
    polys = []
    for reg in regs:
        if isinstance(reg, str):
            polys.append(gpd.read_file(Path('nc_data') / reg / 'region_bounds.geojson'))
        else:
            polys.append(reg)
    poly = pd.concat(polys)
    bnds = poly.to_crs(crs_merc_pp).total_bounds
    map_bounds = dict(xlim=(bnds[0], bnds[2]), ylim=(bnds[1], bnds[3]))
    C_map = pn.Column(
        title, # '### You selected this region',
        (M_tiles * gv.Polygons(poly, vname='name').opts(
            alpha=0.25, fill_color=None, line_width=2, width=500, height=500
        ))
    )
    P_reg[1][1] = C_map

def handle_addreg(event):
    P_reg[1][1] = (M_lims * M_base).opts(
        projection=base_proj, xlim=tuple(lims[:, 0]), ylim=tuple(lims[:, 1]), width=500, height=500 
    )
    C_regopts[0] = '## Select a region'
    C_regopts[1] = w_regsel
    C_regopts[2] = ''
    C_regopts[3] = w_cancelbutt

w_addregbutt.on_click(handle_addreg)

def handle_cancel(event):
    P_reg[1][1] = (M_lims * M_base).opts(
        projection=base_proj, xlim=tuple(lims[:, 0]), ylim=tuple(lims[:, 1]), width=500, height=500 
    )
    C_regopts[0] = w_addregbutt
    C_regopts[1] = ''
    C_regopts[2] = '## Already extracted regions'
    C_regopts[3] = w_reg_avail
    state['region'] = None

w_cancelbutt.on_click(handle_cancel)

def handle_regchange(event):
    P_reg[1][1] = C_loading
    def on_new_poly(reg):
        state['region'] = reg.to_crs(epsg=4326)
        area = reg.to_crs({'proj': 'cea'}).area[0] / 1e6
        update_map(reg, title='### You selected this region')
        C_regopts[2] = w_dataexbutt

    if event.obj.value == 'polydraw':
        
        poly = hv.Polygons([])
        poly_stream = hv.streams.PolyDraw(source=poly, drag=True, num_objects=1,
                                          show_vertices=True, styles={
                                          'fill_color': ['green']
                                          })

        M_regsel = (M_base * M_lims) * poly.opts(
            hv.opts.Polygons(fill_alpha=0.3, active_tools=['poly_draw'])
        ).opts(width=500, height=500)
        C_regsel = pn.Column('## Select a region on the map.\n'
                             'Double-click to start/stop drawing, click to add points.'
                             'The black square shows the base data\'s limits',
                             M_regsel)
        
        def on_submit_poly(event):
            raw_poly = [Polygon([(x, y) for x, y in zip(xs, ys)])
                        for xs, ys in zip(poly_stream.data['xs'], poly_stream.data['ys'])]
            poly = gpd.GeoDataFrame(geometry=raw_poly,
                                    crs=crs_merc_pp)
            folder_name, N = get_folder_name('userdrawn')
            poly = gpd.GeoDataFrame(
                {'id': [folder_name], 'name': [f'User drawn (#{N})']},
                geometry=[poly.unary_union],
                crs=poly.crs
            )
            
            on_new_poly(poly)

        w_regsubmit = pn.widgets.Button(name='Submit region', button_type='primary')
        w_regsubmit.on_click(on_submit_poly)
        C_regopts[2] = w_regsubmit
        P_reg[1][1] = C_regsel

    elif event.obj.value == 'import':
        def on_submit_file(event):
            event.obj.save('user_region.dat')
            poly = gpd.read_file('user_region.dat')
            folder_name, N = get_folder_name('userimported')
            poly = gpd.GeoDataFrame(
                {'id': [folder_name], 'name': [f'Imported region #{N}']},
                geometry=[poly.unary_union],
                crs=poly.crs
            )
            on_new_poly(poly)

        w_file = pn.widgets.FileInput(
            name='Choose a file containing one region (any fiona-supported files).'
        )
        w_file.param.watch(on_submit_file, 'value')
        C_regopts[2] = w_file
        P_reg[1][1] = M_lims * M_base 
    elif event.obj.value == 'none':
        pass
    else:
        regs = event.obj.value[0]
        df_low = gpd.read_file(regs['lowdef'])
        polys = gv.Polygons(df_low, vdims=['name'])
        sel = hv.streams.Selection1D(source=polys)
        M_regsel = M_base * polys.opts(
            tools=['hover', 'tap'], color_index=None, color='skyblue',
            selection_color='steelblue', selection_line_color='k',
            nonselection_color='skyblue', nonselection_alpha=1, nonselection_line_color='k',
            hover_color='deepskyblue', hover_line_color='k'
        ).opts(width=500, height=500)
        C_regsel = pn.Column('### Select a region on the map.\n',
                             'Click to select, hold shift to select multiple regions.',
                             M_regsel)

        def on_submit_selection(event):
            P_reg[1][1] = C_loading
            df_high = gpd.read_file(regs['hidef'])
            poly = gpd.GeoDataFrame(
                {'id': [regs['identifier'] + '_' + '_'.join(map(str, sel.index))],
                 'name': ['MRC ' + ','.join(df_high.iloc[sel.index].name)]},
                geometry=[df_high.iloc[sel.index].unary_union],
                crs=df_high.crs
            )
            on_new_poly(poly)
        
        w_regsubmit = pn.widgets.Button(name='Submit selection', button_type='primary')
        w_regsubmit.on_click(on_submit_selection)
        C_regopts[2] = w_regsubmit
        P_reg[1][1] = C_regsel

w_regsel.param.watch(handle_regchange, 'value')


def handle_availsel(event):
    update_map(*event.obj.value, title='### Showing selected extracted regions')
    
w_reg_avail.param.watch(handle_availsel, 'value')

def handle_extract_data(event):
    extract_region(state['region'])
    w_reg_avail.options = list(list_available().keys())
    handle_cancel(None)

w_dataexbutt.on_click(handle_extract_data)

In [13]:
# Helpers for layout
def get_indicator_str(indicator=None, **opts):
    opts_str = ', '.join([f'{k}: {_parse(v)}' for k, v in opts.items()])
    return f'{indicator}({opts_str})'

In [14]:
# Indicator options
ind_list = {name : ind
            for name, ind in xc.atmos.__dict__.items()
            if isinstance(ind, xc.core.indicator.Indicator)}
w_ind_sel = pn.widgets.Select(name='Indicator',
                              options=ind_list)

w_ind_compute = pn.widgets.Button(name='Compute indicator', button_type='warning')

w_ind_cancel = pn.widgets.Button(name='Cancel', button_type='danger')

w_ind_add = pn.widgets.Button(name='Add indicator', button_type='primary')

## (add | title), dropdown, indicator desc, opts column, ComputeButton, cancel
C_indopts = pn.Column(w_ind_add, '', '', '', '', '')

L_indicator = pn.Column('', '')

# List of all indicators and regions
C_list = pn.Column(pn.pane.Markdown('## Available regions and indicators.'), '')

def update_ind_avail():
    C_list[1] = '\n'.join([f' - {reg} : {ind}' for reg, inds in list_available().items() for ind in inds.values()])
update_ind_avail()

def handle_ind_add(event):
    C_indopts[0] = '## Select an indicator'
    C_indopts[1] = w_ind_sel
    C_indopts[5] = w_ind_cancel

w_ind_add.on_click(handle_ind_add)

def handle_ind_cancel(event):
    C_indopts[0] = w_ind_add
    C_indopts[1] = ''
    C_indopts[2] = ''
    C_indopts[3] = ''
    C_indopts[4] = ''
    C_indopts[5] = ''

w_ind_cancel.on_click(handle_ind_cancel)

def handle_indicator_change(event):
    ind = event.obj.value
    
    opts = []
    for param in ind._sig.parameters.keys():
        defval = ind._sig.parameters[param].default
        if defval is inspect._empty:
            continue
        if param == 'freq':
            opts.append(pn.widgets.Select(name=param,
                                          options={'Annual': 'YS',
                                                   'Seasonal': 'QS-DEC'}))
        elif param == 'window':
            # intslider
            opts.append(pn.widgets.IntSlider(name=param, start=1, step=1, end=8, value=defval))
        else:
            opts.append(pn.widgets.TextInput(name=param,
                                             placeholder=str(defval)))
    C_indopts[2] = f'#### {ind.title}\n{ind.abstract}'
    C_indopts[3] = pn.Column(*opts)
    C_indopts[4] = w_ind_compute

w_ind_sel.param.watch(handle_indicator_change, 'value')

def handle_compute(event):
    indicator = w_ind_sel.value
    
    params = {}
    for opt in C_indopts[3]:
        value = opt.value
        if not value:
            value = opt.placeholder
        params[opt.name] = value
    
    compute_indicator(indicator, params)
    update_ind_avail()
    handle_cancel(None)

w_ind_compute.on_click(handle_compute)

In [15]:
P_ind = pn.Row(C_indopts, C_list)

In [16]:
# All options:
P_reg

In [10]:
P_ind

In [None]:
state['extracted_regions']

In [None]:
a = pn.pane.Markdown('bfjenkw')

In [None]:
a.se

In [None]:
# Map view
# The 2 maps + Horizon, RCP, Percentile
C_mapview = pn.Column(pn.Row('', ''), pn.Row('', '', ''))

# Graph view
C_graphview = pn.Column('')

# Summary view
C_summview = pn.Column('')


T = pn.Tabs()
T.append(('Maps', 'Please select a region and data first.'))
T.append(('Graph', 'Please select data first.'))
T.append(('Summary', 'Please select data first.'))
