In [1]:
## Data handlers
from dask_jobqueue import *
from dask.distributed import *
import xarray as xr
import numpy as np
import pandas as pd
import datetime
import netCDF4 
from dask import delayed
from dask import compute
from dask.diagnostics import*
from tqdm import tqdm
import dask
## Plots
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
import matplotlib.dates as mdates

## PDF generator 
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import A4
from reportlab.lib.pagesizes import portrait
from reportlab.platypus import Image
from reportlab.platypus import Table
from reportlab.lib import colors
## Global config
import os, sys, glob
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
import config 
import gc
import warnings
warnings.filterwarnings('ignore')


In [68]:
## Request dask workers using PBS for 3 hour##
#cluster = PBSCluster(cores=6,memory='64GB',queue='low',project='civil',interface='ib0',walltime='02:00:00')
#cluster.scale(2)

In [82]:
## Keep refreshing this cell till you see the workers and cores after job has started. ##
client = Client(cluster) 
client

0,1
Client  Scheduler: tcp://172.20.9.177:32808  Dashboard: http://172.20.9.177:36159/status,Cluster  Workers: 2  Cores: 4  Memory: 42.66 GB


In [83]:
## Build the list of files in LIS output directory
lsm_files = []
route_files = []

for file in tqdm(sorted(glob.glob('/home/civil/phd/cez198621/msaharia/LDAS/02-ILDAS/OUTPUT/EXP002/SURFACEMODEL/*/*HIST*'))) :
    lsm_files.append(file)
for file in tqdm(sorted(glob.glob('/home/civil/phd/cez198621/msaharia/LDAS/02-ILDAS/OUTPUT/EXP002/ROUTING/*/*HIST*'))) :
    route_files.append(file)

100%|██████████| 24471/24471 [00:00<00:00, 2130408.34it/s]
100%|██████████| 24471/24471 [00:00<00:00, 2119279.25it/s]


In [50]:
 ## Find the list of valid stations having more than 5x365 observations and also find the earliest year of observation.##
 ## Using Dask Delayed to parallel execution ##

def valid_stations (read_dir,min_values,key) :
        try :
            key_4 =  format(key, "04")
            gauge_id = 'IWM-gauge-'+str(format(key_4))
            station = pd.read_csv(read_dir+gauge_id+'.csv').dropna(subset=['Streamflow (cumecs)'])
            value_count = station['Streamflow (cumecs)'].count()
            if value_count >= min_values :
                return gauge_id
        except Exception as e:
            return
read_dir = ('/home/civil/phd/cez198621/projects/IWM_verification/IMDAA_runs/observed_data/IFI-Observations/flow/')        
min_values = 5*365
n_stations = 3900
dask_results = []
for key in range(0,n_stations) :
    dask_result = dask.delayed(valid_stations)(read_dir,min_values,key)
    dask_results.append(dask_result)
dask_compute = dask.compute(*dask_results)
valid_stations = [] 
for val in dask_compute: 
    if val != None : 
        valid_stations.append(val) 
print('Number of valid stations:',str(len(valid_stations)))
# Save the valid guage files in a separate directory for later use ##
for i in tqdm(range(0,len(valid_stations))):
    key = valid_stations[i]
    station = pd.read_csv('/home/civil/phd/cez198621/projects/IWM_verification/IMDAA_runs/observed_data/IFI-Observations/flow/'+key+'.csv')
    station.set_index('Date',inplace=True)
    station.to_csv('/home/civil/phd/cez218606/LISF1/Results/PRINCETON/processed_data/'+key+'-obs.csv')
## Save the list of valid gaugeIDs ##
valid_df = pd.DataFrame(data={"stations": valid_stations})
valid_df.to_csv('/home/civil/phd/cez218606/LISF1/Results/PRINCETON/processed_data/valid_stations_list.csv',index=False)

Number of valid stations: 284


In [None]:
## Using hit and trial, match the LIS file index with observation start date (1959-04-06) and build index for batches  of 2 years##
batch_index = [4112,5843,9496,13148,16801,20453,24472]
## Open gauge metadata file and set index as GaugeId ##
meta_file = pd.read_csv('/home/civil/phd/cez218606/LISF1/Results/PRINCETON/observed_data/gaugemetadata.csv')
meta_file.reset_index(drop = True,inplace=True)
meta_file.set_index('GaugeID',inplace=True)
## Create the list of desired variables to be extracted. SoilMoist will be extracted separately for different profiles. ##
lsm_vars = ['Evap_tavg','TotalPrecip_tavg']
route_vars = ['FloodedFrac_tavg','RiverDepth_tavg','SWS_tavg','Streamflow_tavg']
## Create the empty containers to store gauge-wise extractions.## 
batch_vars = [None]*len(valid_stations)
merged_vars = [None]*len(valid_stations)

In [None]:
%%time
## Load LIS data in the batches using "open_mfdataset" and then load it in RAM using ".compute" ##
for n in tqdm(range (0,(len(batch_index)-1))):
    lsmdat = xr.open_mfdataset(lsm_files[batch_index[n]:batch_index[n+1]],combine='by_coords',parallel=True)
    routedat=xr.open_mfdataset(route_files[batch_index[n]:batch_index[n+1]],combine='by_coords',parallel = True)
    lsmdat = config.reformat_LIS_output(lsmdat)
    routedat = config.reformat_LIS_output(routedat)
    lsmdat  = lsmdat.chunk({'time':365})
    routedat = routedat.chunk({'time':365})
    lsmdat = lsmdat.compute()
    routedat = routedat.compute()
    
    ## Iterate over the gauge stations and extract required simulated data ##
    for i in tqdm(range(0,len(valid_stations))):
        gauge_id = valid_stations[i]
        gauge_lat = meta_file.loc[gauge_id,'Latitude']
        gauge_lon = meta_file.loc[gauge_id,'Longitude']
        lsmdat_sel = lsmdat.sel(lat=gauge_lat,lon=gauge_lon,method='nearest')
        routedat_sel = routedat.sel(lat=gauge_lat,lon=gauge_lon,method='nearest')
        ext_lsm = lsmdat_sel[lsm_vars].to_dataframe()
        ext_soil_1 = lsmdat_sel['SoilMoist_tavg'].sel(SoilMoist_profiles=0).to_dataframe()
        ext_soil_1 = ext_soil_1.rename(columns={'SoilMoist_tavg': "SM_L1"})
        ext_soil_2 = lsmdat_sel['SoilMoist_tavg'].sel(SoilMoist_profiles=1).to_dataframe()
        ext_soil_2 = ext_soil_2.rename(columns={'SoilMoist_tavg': "SM_L2"})
        ext_soil_3 = lsmdat_sel['SoilMoist_tavg'].sel(SoilMoist_profiles=2).to_dataframe()
        ext_soil_3 = ext_soil_3.rename(columns={'SoilMoist_tavg': "SM_L3"})
        ext_soil_4 = lsmdat_sel['SoilMoist_tavg'].sel(SoilMoist_profiles=3).to_dataframe()
        ext_soil_4 = ext_soil_4.rename(columns={'SoilMoist_tavg': "SM_L4"})
        ext_route = routedat_sel[route_vars].to_dataframe()
        batch_vars[i] = pd.concat([ext_lsm,ext_soil_1,ext_soil_2,ext_soil_3,ext_soil_4,ext_route],axis=1)
        batch_vars[i] = batch_vars[i].loc[:,~batch_vars[i].columns.duplicated()]
        batch_vars[i] = batch_vars[i].drop(['lat','lon'], axis=1)
        merged_vars[i] = pd.concat([merged_vars[i],batch_vars[i]])
        
    ## Purge the variables to free up RAM ##
    del lsmdat
    del routedat
    del lsmdat_sel
    del routedat_sel
    gc.collect()
    
## Save the extracted variables in gauge-wise CSVs ##
for i in tqdm(range(0,len(valid_stations))):
    key = valid_stations[i]
    merged_vars[i].to_csv('/home/civil/phd/cez198621/projects/IWM_verification/IMDAA_runs/processed_data/'+key+'-sim.csv')


In [84]:
# For GRDC Data
#grdc_filelist =sorted(glob.glob('./../data/input/grdc_monthly/GRDC_csv/*monthly*'))
## Using hit and trial, match the LIS file index with observation start date (1959-04-06) and build index for batches  of 10 years##
#batch_index = [0,4112,5843,9496,11687]
## Open gauge metadata file and set index as GaugeId ##
#meta_file = pd.read_csv('../data/input/grdc_monthly/GRDC_csv/metafile.csv')
# meta_file.reset_index(drop = True,inplace=True)
# meta_file.set_index('GRDC_No',inplace=True)
## Create the list of desired variables to be extracted. SoilMoist will be extracted separately for different profiles. ##
#lsm_vars = ['Evap_tavg','TotalPrecip_tavg']
#route_vars = ['FloodedFrac_tavg','RiverDepth_tavg','SWS_tavg','Streamflow_tavg']
## Create the empty containers to store gauge-wise extractions.## 
#batch_vars = [None]*len(grdc_filelist)
#merged_vars = [None]*len(grdc_filelist)

In [85]:
#%%time
# For GRDC Data
## Load LIS data in the batches using "open_mfdataset" and then load it in RAM using ".compute" ##
#for n in tqdm(range (0,(len(batch_index)-1))):
#    lsmdat = xr.open_mfdataset(lsm_files[batch_index[n]:batch_index[n+1]],combine='by_coords',parallel=True)
#    routedat=xr.open_mfdataset(route_files[batch_index[n]:batch_index[n+1]],combine='by_coords',parallel = True)
#    lsmdat = config.reformat_LIS_output(lsmdat)
#    routedat = config.reformat_LIS_output(routedat)
#    lsmdat  = lsmdat.chunk({'time':365})
#    routedat = routedat.chunk({'time':365})
#    lsm_monthly =lsmdat.resample(time="1M").mean()
#    route_monthly = routedat.resample(time="1M").mean()
#    lsmdat = lsm_monthly.compute()
#    routedat = route_monthly.compute()
#    
#    ## Iterate over the gauge stations and extract required simulated data ##
#    for i in tqdm(range(0,len(grdc_filelist))):
#        gauge_id = meta_file.loc[i,'GRDC_No']
#        gauge_lat = meta_file.loc[i,'Latitude']
#        gauge_lon = meta_file.loc[i,'Longitude']
#        lsmdat_sel = lsmdat.sel(lat=gauge_lat,lon=gauge_lon,method='nearest')
#        routedat_sel = routedat.sel(lat=gauge_lat,lon=gauge_lon,method='nearest')
#        ext_lsm = lsmdat_sel[lsm_vars].to_dataframe()
#        ext_soil_1 = lsmdat_sel['SoilMoist_tavg'].sel(SoilMoist_profiles=0).to_dataframe()
#        ext_soil_1 = ext_soil_1.rename(columns={'SoilMoist_tavg': "SM_L1"})
 #       ext_soil_2 = lsmdat_sel['SoilMoist_tavg'].sel(SoilMoist_profiles=1).to_dataframe()
 #       ext_soil_2 = ext_soil_2.rename(columns={'SoilMoist_tavg': "SM_L2"})
 #       ext_soil_3 = lsmdat_sel['SoilMoist_tavg'].sel(SoilMoist_profiles=2).to_dataframe()
 #       ext_soil_3 = ext_soil_3.rename(columns={'SoilMoist_tavg': "SM_L3"})
 #       ext_soil_4 = lsmdat_sel['SoilMoist_tavg'].sel(SoilMoist_profiles=3).to_dataframe()
 #       ext_soil_4 = ext_soil_4.rename(columns={'SoilMoist_tavg': "SM_L4"})
 #       ext_route = routedat_sel[route_vars].to_dataframe()
 #       batch_vars[i] = pd.concat([ext_lsm,ext_soil_1,ext_soil_2,ext_soil_3,ext_soil_4,ext_route],axis=1)
 #       batch_vars[i] = batch_vars[i].loc[:,~batch_vars[i].columns.duplicated()]
 #       batch_vars[i] = batch_vars[i].drop(['lat','lon'], axis=1)
 #       merged_vars[i] = pd.concat([merged_vars[i],batch_vars[i]])
 #       
  #  ## Purge the variables to free up RAM ##
 #   del lsmdat
 #   del routedat
 #   del lsmdat_sel
 #   del routedat_sel
 #   gc.collect()
 #   
## Save the extracted variables in gauge-wise CSVs ##
#for i in tqdm(range(0,len(grdc_filelist))):
 #   key = meta_file.loc[i,'GRDC_No']
#    merged_vars[i].to_csv('../data/output/grdc_sim/'+str(key)+'_sim.csv')

  0%|          | 0/4 [00:00<?, ?it/s]
  0%|          | 0/44 [00:00<?, ?it/s][A
 11%|█▏        | 5/44 [00:00<00:00, 47.40it/s][A
 25%|██▌       | 11/44 [00:00<00:00, 50.35it/s][A
 36%|███▋      | 16/44 [00:00<00:00, 48.92it/s][A
 50%|█████     | 22/44 [00:00<00:00, 51.22it/s][A
 64%|██████▎   | 28/44 [00:00<00:00, 52.96it/s][A
 77%|███████▋  | 34/44 [00:00<00:00, 54.71it/s][A
100%|██████████| 44/44 [00:00<00:00, 55.49it/s][A
 25%|██▌       | 1/4 [17:13<51:39, 1033.23s/it]
  0%|          | 0/44 [00:00<?, ?it/s][A
 14%|█▎        | 6/44 [00:00<00:00, 54.69it/s][A
 27%|██▋       | 12/44 [00:00<00:00, 55.12it/s][A
 41%|████      | 18/44 [00:00<00:00, 55.92it/s][A
 55%|█████▍    | 24/44 [00:00<00:00, 56.80it/s][A
 68%|██████▊   | 30/44 [00:00<00:00, 57.16it/s][A
 82%|████████▏ | 36/44 [00:00<00:00, 57.49it/s][A
100%|██████████| 44/44 [00:00<00:00, 57.43it/s][A
 50%|█████     | 2/4 [23:39<27:58, 839.22s/it] 
  0%|          | 0/44 [00:00<?, ?it/s][A
  2%|▏         | 1/44 [00:00

CPU times: user 29min 14s, sys: 1min 15s, total: 30min 29s
Wall time: 47min 54s





In [17]:
## Append LIS extractions to observed data and save as new CSVs####
#meta_file = pd.read_csv('../data/input/grdc_monthly/GRDC_csv/metafile.csv')
#grdc_filelist =sorted(glob.glob('./../data/input/grdc_monthly/GRDC_csv/*monthly*'))
#for i in tqdm(range (0,len(grdc_filelist))):
#    key = meta_file.loc[i,'GRDC_No']
#    obs = pd.read_csv('../data/input/grdc_monthly/GRDC_csv/'+str(key)+'_monthly.csv')
#    sim = pd.read_csv('../data/output/grdc_sim/'+str(key)+'_sim.csv')
#    sim = sim.rename(columns={"time": "Date"})
#    obs = obs.join(sim.set_index('Date'), on='Date')
#    obs.to_csv('../data/output/grdc_sim/grdc_merged/'+str(key)+'_merged.csv')

100%|██████████| 44/44 [00:02<00:00, 18.57it/s]
