In [None]:
import pydsm
from pydsm import postpro

# Dask related functions

In [None]:
###############################################################################
# Dask uses parallel processing, which will run several times faster          #
# However, messages printed to stdout will not be displayed in the notebook.  #
# This includes messages indicating that plots will not be created for        #
# certain locations due to missing DSS data. These messages will be displayed #
# in the conda prompt window. The use of dask with network drives is not      #
# recommended--some processes may fail.                                       #
###############################################################################
use_dask = True

import dask
from dask.distributed import Client, LocalCluster

class DaskCluster:
    def __init__(self):
        self.client=None
    def start_local_cluster(self):
        cluster = LocalCluster(n_workers=8, threads_per_worker=1, memory_limit='6G') # threads_per_worker=1 needed if using numba :(
        self.client = Client(cluster)
    def stop_local_cluster(self):
        self.client.shutdown()
        self.client=None
        
def run_all(processors):
    tasks=[dask.delayed(postpro.run_processor)(processor,dask_key_name=f'{processor.study.name}::{processor.location.name}/{processor.vartype.name}') for processor in processors]
    dask.compute(tasks)

In [None]:
if use_dask:
    cluster = DaskCluster()
    cluster.start_local_cluster()
    cluster.client

# Setup for EC

In [None]:
dssfile='e:/dsm2v82_calibration/ObservedData/updated_sep-oct2020/cdec_ec_merged_sep2020.dss'
locationfile='e:/dsm2v82_calibration/LocationInfo/calibration_ec_stations.csv'
vartype='EC'
units='mmhos/cm'
study_name='Observed'
observed=True
processors=postpro.build_processors(dssfile, locationfile, vartype, units, study_name, observed)
if use_dask:
    run_all(processors)
else:
    for p in processors:
        postpro.run_processor(p)

# Setup for FLOW

In [None]:
dssfile='e:/dsm2v82_calibration/ObservedData/updated_sep-oct2020/cdec_flow_merged_sep2020.dss'
locationfile='e:/dsm2v82_calibration/LocationInfo/calibration_flow_stations.csv'
vartype='FLOW'
units='cfs'
study_name='Observed'
observed=True
processors=postpro.build_processors(dssfile, locationfile, vartype, units, study_name, observed)
for processor in processors:
    if processor.vartype.name == 'FLOW' and processor.location.name == 'VCU':
        processor.do_scale(-1)
if use_dask:
    run_all(processors)
else:
    for p in processors:
        postpro.run_processor(p)

# Setup for STAGE

In [None]:
dssfile='e:/dsm2v82_calibration/ObservedData/updated_sep-oct2020/cdec_stage_merged_sep2020.dss'
locationfile='e:/dsm2v82_calibration/LocationInfo/calibration_stage_stations.csv'
vartype='STAGE'
units='feet'
study_name='Observed'
observed=True
processors=postpro.build_processors(dssfile, locationfile, vartype, units, study_name, observed)
if use_dask:
    run_all(processors)
else:
    for p in processors:
        postpro.run_processor(p)

In [None]:
# Execute the command below to shutdown the cluster when done. 
if use_dask:
    cluster.stop_local_cluster()