In [1]:
# import functions 
import pandas as pd
import matplotlib.pyplot as plt
import xarray as xr
import glob
import numpy as np
import geopandas as gpd
from shapely.geometry import Point
from pylab import MaxNLocator
import rioxarray
from shapely.geometry import Point
from shapely.geometry import mapping

In [2]:
def geo_idx(dd, dd_array):
   """
     search for nearest decimal degree in an array of decimal degrees and return the index.
     np.argmin returns the indices of minium value along an axis.
     so subtract dd from all values in dd_array, take absolute value and find index of minium.
    """
   geo_idx = (np.abs(dd_array - dd)).argmin()
   return geo_idx

In [3]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from multiprocessing import Pool
from inspect import signature


#--------------------------------------------------------------
# @profile
def parallel(function, it, nbrCores, noInteract=False):
    #evaluates in parallel all the results, not lazy

    #disable interactive window for plots, also kills existing figures
    if noInteract: plt.switch_backend('Agg')

    #generate pool of workers
    with Pool(processes = nbrCores) as pool:

        #single argument for function
        if len(signature(function).parameters)==1:
            results = pool.map(function, it)

        #mutliple arguments for function
        else:
            results = pool.starmap(function, it)

    #kill all processes
    pool.close()
    pool.join()

    #re enable interactive window for plots
    if noInteract: 
        try: plt.switch_backend('Qt5Agg')
        except ImportError: pass   #pass if non-interactive shell 

    if type(results[0])==tuple:
       
        return (np.asarray(r) for r in zip(*results)) 
    #convert directly to numpy array, merge each proc times
    else:
        return np.asarray(results)


#--------------------------------------------------------------
def distrib_task(begin, end, division) :

    #job distribution
    size   = end - begin + 1    #total number of indexes
    segm   = size // division   #number of iteration to perform per division
    remain = size % division    #remaining indexes to calculate after division

    #handles case if less time elements than division
    if segm == 0: division = remain

    #initialization
    lower = begin
    jobs = [None]*division

    #loop over divisions
    for i in range(division):

        #distribute indexes into the divisions, accounting for remaining loops
        upper = lower + segm    #upper index
        if remain > 0:
            upper  += 1      #added +1 if additional index to process
            remain -= 1      #keep track of the remaining loops to distribute

        jobs[i] = slice(lower,upper,1)
        lower = upper             #next pair lower index
        #next division

    return jobs

In [4]:
from rich import print
import pandas as pd
from datetime import datetime
from xlsx2csv import Xlsx2csv
from io import StringIO

def timer(name, startTime = None):
    if startTime:
        print(f"Timer: Elapsed time for [{name}]: {datetime.now() - startTime}")
    else:
        startTime = datetime.now()
        print(f"Timer: Starting [{name}] at {startTime}")
        return startTime


def read_excel(path: str, sheet_index: int) -> pd.DataFrame:
    print(path)
    buffer = StringIO()
    Xlsx2csv(path, outputencoding="utf-8").convert(buffer,sheetid=sheet_index)
    buffer.seek(0)
    df = pd.read_csv(buffer,low_memory=False)
    return df

In [5]:
def natural_keys_urban(text):
    start = text.split('w_')[-1].index('t')
    end  = text.split('w_')[-1].index('_')
    mo = text.split('w_')[-1][start+1:end]
    year = text.split('w_')[-1][-4]
    return int(year)*12+int(mo)
def natural_keys(text):
    start = text.split('_y')[-1].index('t')
    end  = text.split('_y')[-1].index('/')
    mo = text.split('_y')[-1][start+1:end]
    year = text.split('_y')[-1][0]
    return int(year)*12+int(mo)
def natural_keys_ag(text):
    start = text.split('w_')[-1].index('m')
    end  = text.split('w_')[-1].index('_')
    mo = text.split('w_')[-1][start+1:end]
    year = text.split('w_')[-1][-4]
    return int(year)*12+int(mo)

In [6]:
# file_path = r"/oak/stanford/groups/gorelick/bhima_08262024/modules/hydro/sixyears_bau4year/hydro_outputs"
basepath = r"/scratch/users/awan005/bhima_mas_code_publication"
file_path = basepath + r"/outputs/data_raw/sixyears_bau4year"

In [7]:
sim = ['scenario1_donothing', 'scenario2_donothing'] 
# scenario1: moderate cliamte change, high urbanization
# scenario2: extreme climate change, high urbanization

### Reservoir storage

In [8]:
## reservoir location
waterbody = pd.read_csv(basepath + r"/modules/hydro/hydro_files/excel/Copy of Waterbodies.csv")
waterbody_water = waterbody[waterbody['lat']>0]
Reservoirwater = xr.open_dataarray(file_path  + "_" + sim[0] + "/" +'final_y1_t1' +'/'+ "lakeResStorage_daily.nc")

In [9]:
Res = {}
for i in range(waterbody_water.shape[0]):
    Res[waterbody_water['Lake_name'][waterbody_water.index[i]]]=(waterbody_water['lat'][waterbody_water.index[i]],waterbody_water['long'][waterbody_water.index[i]])

In [10]:
lats = Reservoirwater['lat'][:]
lons = Reservoirwater['lon'][:]
lat_idx={}
lon_idx={}

for keys in Res:
    outlet = Res[keys]

    in_lat = outlet[0]
    in_lon = outlet[1]

    lat_idx[keys] = geo_idx(in_lat, lats)
    lon_idx[keys] = geo_idx(in_lon, lons)

In [11]:
List = ['Bhama Askhed', 'Andra', 'Pawana', 'Mulshi', 'Temghar', 'Warasgaon', 'Panshet', 'Khadakwasla','Ujjani','Veer','Bhatghar']

### reservoir 

In [12]:
res_path_tt = []

for k in range(len(sim)):
    res_path = glob.glob(file_path + '_' + sim[k] + r"/final_y*/lakeResStorage_daily.nc")
    res_path.sort(key=natural_keys)

    res_path_tt+=res_path

In [13]:
def res_storage(k, m):

    da_combine_res = xr.open_dataarray(res_path_tt[k])[:,lat_idx[m],lon_idx[m]]

    res_tt = da_combine_res[:-1].groupby('time').sum(...).mean().values * 1e-6  

    return res_tt

In [14]:
import psutil

nbrCores= psutil.cpu_count(logical=False)
print(str(nbrCores))

res_tt2 = {}
for i in range(len(List)):

    it = ((int(j), List[i]) for j in range(len(res_path_tt)))

    res_tt2[List[i]] = parallel(res_storage,it,nbrCores)

In [15]:
pd.DataFrame.from_dict(res_tt2).to_csv(basepath + r"/outputs/data_processed/resource_status_metrics/res_forpaper_extremeclimate_ssp1.csv")

### river

In [16]:
Khamgoan = (18.545,74.21861111)

In [17]:
def geo_idx(dd, dd_array):
   """
     search for nearest decimal degree in an array of decimal degrees and return the index.
     np.argmin returns the indices of minium value along an axis.
     so subtract dd from all values in dd_array, take absolute value and find index of minium.
    """
   geo_idx = (np.abs(dd_array - dd)).argmin()
   return geo_idx

In [18]:
da = Reservoirwater
outlet = Khamgoan

lats = da['lat'][:]
lons = da['lon'][:]

in_lat = outlet[0]
in_lon = outlet[1]

lat_idx = geo_idx(in_lat, lats)
lon_idx = geo_idx(in_lon, lons)

In [19]:
discharge_path_tt = []

for k in range(len(sim)):
    
    discharge_path = glob.glob(file_path + '_' + sim[k] + r"/final_y*/discharge_daily.nc")

    discharge_path.sort(key=natural_keys)

    discharge_path_tt+=discharge_path


In [20]:
def discharge_river(k, m, n):

    da_combine_res = xr.open_dataarray(discharge_path_tt[k])[:,m,n]

    res_tt = da_combine_res[:-1].values 

    return res_tt

In [21]:
import psutil

nbrCores= psutil.cpu_count(logical=False)
print(str(nbrCores))

river_tt2 = []


it = ((int(j), lat_idx, lon_idx) for j in range(len(discharge_path_tt)))

river_tt2 = parallel(discharge_river,it,nbrCores)

  return array(a, dtype, copy=False, order=order)


In [22]:
pd.DataFrame(np.hstack(river_tt2)).to_csv(basepath + r"/outputs/data_processed/resource_status_metrics/river_forpaper_extremeclimate_ssp1.csv")

### groundwater depletion

In [23]:
gw_path_tt = []

for k in range(0,len(sim),1):
    gw_path = glob.glob(file_path + '_' + sim[k] + r"/final_y*/gwdepth_adjusted_daily.nc")

    gw_path.sort(key=natural_keys)

    gw_path_tt+=gw_path

In [24]:
Pune_shp = gpd.read_file(basepath + r"/modules/hydro/hydro_files/shapefiles/PMC_Boundary/PMC_PCMC_Ward_Bounda_Outer.shp")
tanker_shp = gpd.read_file(basepath + r"/modules/hydro/hydro_files/shapefiles/tanker/tanker.shp")

In [25]:
def gw_depletion_basin(s):
    
    pp_mean = []
    tanker_mean = []

    gw_path = gw_path_tt[s]
    gw_nc = rioxarray.open_rasterio(gw_path,masked=True)[:-1].mean(dim=['time'])
    gw_nc.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=True)
    gw_nc.rio.write_crs("epsg:4326", inplace=True)
    
    pune_gpd = gw_nc.rio.clip(Pune_shp.geometry.apply(mapping), Pune_shp.crs)    
    pp_mean.append(pune_gpd.mean().values)
    
    tanker_gpd = gw_nc.rio.clip(tanker_shp.geometry.apply(mapping), tanker_shp.crs)    
    tanker_mean.append(tanker_gpd.mean().values)

    return pp_mean, tanker_mean

In [26]:
import psutil

nbrCores= psutil.cpu_count(logical=False)
print(str(nbrCores))

it = ((j) for j in range(len(gw_path_tt)))

pp_bb, pp_tanker = parallel(gw_depletion_basin,it,nbrCores)

In [27]:
pd.DataFrame(np.hstack(pp_bb)).to_csv(basepath + r"/outputs/data_processed/resource_status_metrics/gw_forpaper_pune_extremeclimate_ssp1.csv")

In [28]:
pd.DataFrame(np.hstack(pp_tanker)).to_csv(basepath + r"/outputs/data_processed/resource_status_metrics/gw_forpaper_tanker_extremeclimate_ssp1.csv")

### Groundwater map

In [29]:
xr.open_dataset(gw_path_tt[4+48]).to_netcdf(basepath + r"/outputs/data_processed/resource_status_metrics/gwdepth_adjusted_daily_avg2.nc")

In [30]:
xr.open_dataset(gw_path_tt[4+48+12]).to_netcdf(basepath + r"/outputs/data_processed/resource_status_metrics/gwdepth_adjusted_daily_yr1.nc")

In [31]:
xr.open_dataset(gw_path_tt[4+48+24]).to_netcdf(basepath + r"/outputs/data_processed/resource_status_metrics/gwdepth_adjusted_daily_yr2.nc")

In [32]:
xr.open_dataset(gw_path_tt[4+48+36]).to_netcdf(basepath + r"/outputs/data_processed/resource_status_metrics/gwdepth_adjusted_daily_yr3.nc")