# Investigate the impacts of all realizations on all users 

In [1]:
import pandas as pd
import numpy as np
import os
import glob
import duckdb
import ast
import matplotlib.pyplot as plt
import itertools

## Load drought counts and classify each realization

In [None]:
# Load in all drought counts for all realizations
droughts_df = pd.read_csv('../rival_framings_demand/drought_counts_all_realizations_non_stationary.csv', index_col=0)
droughts_df.index = np.arange(1, len(droughts_df) + 1)

In [None]:
# We will use these counts to classify whether a realization belongs 
# in history, paleo variability, or all-encompassing experiments

# create a list of our conditions
conditions = [
    (droughts_df['Decadal counts']/105*100 <= 21) & (droughts_df['Multidecadal counts']/105*100 == 0), #history criteria
    ((droughts_df['Decadal counts']/105*100 > 21) | (droughts_df['Multidecadal counts']/105*100 > 0)) & #paleo criteria
    (droughts_df['Decadal counts']/105*100 <= 57) & (droughts_df['Multidecadal counts']/105*100 <= 42),
    (droughts_df['Decadal counts']/105*100 > 57) | (droughts_df['Multidecadal counts']/105*100 > 42)] #all-encompassing critera

# create a list of the values we want to assign for each condition
values = ['History', 'Paleo', 'Encompassing']

# create a new column and use np.select to assign values to it using our lists as arguments
droughts_df['Classification'] = np.select(conditions, values)

# display updated DataFrame
droughts_df.head()

In [None]:
droughts_df['Classification'].value_counts()

In [None]:
# directory where the experiment data is stored
flow_data_dir = '../rival_framings_demand/xdd_parquet_flow'

## Loop through every realization and calculate summary impact metrics

Function converting order number to sample and realization for file retrieval

In [None]:
no_to_realization = lambda x: (int((x-1)/10)+1, (x-1)%10+1)

Create empty dataframe to store impacts summary per realization

In [None]:
dtypes = np.dtype(
    [
        ("Duration", int),
        ("Magnitude", float),
        ("%_users", float),
        ("Outflow_drought_min", float),
        ("Outflow_10th", float)
    ]
)
df_impacts = pd.DataFrame(np.zeros(1000, dtype=dtypes))
df_impacts.index = np.arange(1, len(df_impacts) + 1)

Define function to calculate duration of continuous occurences

In [None]:
def shortage_duration(sequence, threshold):
    cnt_shrt = [sequence[i]>=threshold for i in range(len(sequence))] # Returns a list of True values when there's a shortage
    shrt_dur = [ sum( 1 for _ in group ) for key, group in itertools.groupby( cnt_shrt ) if key ] # Counts groups of True values
    return shrt_dur

Historic rolling sum deliveries (calculated in annual_outflow.txt)

In [None]:
deliveries_thresholds = [35000, 43220, 44896] #Hypothetical, Hist. Min, Hist. 5th

Define function that identifies the percentile violated

In [None]:
def percentile(x):
    for i, per in enumerate(deliveries_thresholds):
        if x<= per:
            violation = i
            break
        else:
            violation = 999
    return violation

Loop through all realizations and calculate user impacts and effects on basin outflows

In [None]:
WDs = [36, 37, 38, 39, 45, 50, 51, 52, 53, 70, 72]

for k in df_impacts.index:
    print(k)
    
    realization_index = k
    # get sample and realization for file retrieval
    sample, real = no_to_realization(realization_index)
    # get years that are in drought in specific realization
    years = list(itertools.chain.from_iterable(ast.literal_eval(droughts_df.at[int(realization_index),'Drought years'])))
    years = tuple([x+1908 for x in years])
    
    # only do the following if there's at least one drought year
    # if there is no drought, values will stay default (zero) and we only need to calculate outflow percentile
    
    # target glob path
    glob_path = os.path.join(flow_data_dir, f'S{sample}_{real}.parquet')
    
    sql = f"""
    SELECT
        SUM(river_outflow)*1233.4818/1000000 AS annual_river_outflow
    FROM
        '{glob_path}'
    GROUP BY
        structure_id
        ,year
    HAVING
        structure_id = '09163500'
    ORDER BY
        year;
    """     
    # get query result as a data frame
    try:
        df_flows = duckdb.query(sql).df()
        df_impacts.at[realization_index, "Outflow_10th"] = np.nanpercentile(df_flows['annual_river_outflow'].rolling(10).sum(), 10)
    except RuntimeError:
        print(f'missing file S{sample}_{real}')
    if years:
        sql = f"""
        SELECT
            SUM(river_outflow)*1233.4818/1000000 AS annual_river_outflow
        FROM
            '{glob_path}'
        GROUP BY
            structure_id
            ,year
        HAVING
            year in {years} AND structure_id = '09163500'
        ORDER BY
            year;
        """        
        try:
            # get query result as a data frame
            df_flows_drought = duckdb.query(sql).df()

            #filter out gauge structures and keep only drought years
            # Calculate shortage to drought percentage
            sql = f"""
            SELECT 
                structure_id
                ,year
                ,shortage
                ,demand
                ,shortage/demand*100 as ratio
            FROM
                '{glob_path}'
            WHERE
                year in {years}
                AND structure_id NOT LIKE '09%' ; 
            """
            df = duckdb.query(sql).df()

            df['ratio'] = df['ratio'].fillna(0)

            # Get list of users
            users = df['structure_id'].unique()
            
            # Create dataframe to store impacts for each user (no of users = 343)
            dtypes = np.dtype(
                [
                    ("Duration", int),
                    ("Magnitude", float),
                    ("Experienced_shortage", int)
                ]
            )
            user_impacts_summary = pd.DataFrame(np.zeros(343, dtype=dtypes))
            user_impacts_summary.index = users

            # Loop through all users and calculate individual impacts
            for index, row in user_impacts_summary.iterrows():
                user_impacts = df[df['structure_id']==index]
                if np.sum(user_impacts['ratio']) > 0: # check if user experienced any impacts
                    user_impacts_summary.at[index, "Experienced_shortage"] = 1
                    # calculate mean user impacts
                    user_impacts_summary.at[index, "Magnitude"] = np.around(np.mean(user_impacts['ratio']), 
                                                                            decimals= 1) 
                    # calculate longest duration of at least mean
                    user_shrt_dur = shortage_duration(user_impacts['ratio'].values, 
                                                      user_impacts_summary.at[index, "Magnitude"]) 
                    if user_shrt_dur:
                        user_impacts_summary.at[index, "Duration"] = np.around(np.max(user_shrt_dur)/12, 
                                                                               decimals= 1) 
                        
            # Summarize impacts across all users 
            df_impacts.at[realization_index, "%_users"] = np.around(np.mean(user_impacts_summary["Experienced_shortage"])*100, 
                                                                    decimals= 0)
            df_impacts.at[realization_index, "Magnitude"] = np.around(np.mean(user_impacts_summary["Magnitude"]), 
                                                                      decimals= 0)
            df_impacts.at[realization_index, "Duration"] = np.around(np.mean(user_impacts_summary["Duration"]), 
                                                                     decimals= 0)
            df_impacts.at[realization_index, "Outflow_drought_min"] = np.min(df_flows_drought['annual_river_outflow'].rolling(10).sum())
            
            # Summarize impacts across WDs
            for wd in WDs:
                df_impacts.at[realization_index, f"%_users_{wd}"] = np.around(np.mean(user_impacts_summary["Experienced_shortage"].filter(regex=f'^{wd}', 
                                                                                                                                         axis=0))*100, 
                                                                              decimals= 0)
                df_impacts.at[realization_index, f"Magnitude_{wd}"] = np.around(np.mean(user_impacts_summary["Magnitude"].filter(regex=f'^{wd}', 
                                                                                                                                 axis=0)), 
                                                                                decimals= 0)
        except RuntimeError:
            print(f'missing file S{sample}_{real}')

In [None]:
df_impacts['Classification'] = droughts_df['Classification']
df_impacts.to_csv('drought_impacts_all_realizations.csv')

In [None]:
df_impacts

In [None]:
df_impacts['Classification'] = droughts_df['Classification']
df_impacts.to_csv('drought_impacts_all_realizations_non_stationary.csv')

In [13]:
df_impacts['Classification'] = droughts_df['Classification']
df_impacts.to_csv('drought_impacts_all_realizations_non_stationary.csv')