# Delta Water Unavailability Methodology Module 2: Demand Processing

In [None]:
# This script calls input datasets of water rights and demands in the Sacramento-San Joaquin Delta Watershed
# to prepare them for the water unavailability analysis.
# Further documentation is available on the Delta Water Unavailability Methodology website:
# https://www.waterboards.ca.gov/waterrights/water_issues/programs/drought/drought_tools_methods/delta_method.html

# Import necessary packages
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import glob
import calendar

import os

In [None]:
# Function to check if output folders exist
def check_folder(folder_path):
    """Report whether ``folder_path`` exists, printing a warning when it is missing.

    This only prints a message; it never creates the folder and always returns None.
    """
    if os.path.exists(folder_path):
        print(f"Folder '{folder_path}' already exists.")
        return
    print(f"Warning: Required folder '{folder_path}' does not exist. Please ensure 1)Supply has been run successfully.")

# Confirm the output folders used by this notebook are present
for required_folder in ('./intermediate-outputs', './output-data'):
    check_folder(required_folder)

## A) Water Rights Demands
### i) User-Specified Analysis Inputs

In [None]:
# Import Start and End Date, and Exceedance variables from 1.Supply script
# Restores start_date, end_date and user_excd from IPython's %store database; they are
# expected to have been stored by the 1)Supply notebook.
# NOTE(review): depending on the IPython version, `%store -r` for a variable that was never
# stored may raise a UsageError (or only print a message) instead of a NameError, in which
# case this handler does not run and the failure appears later — confirm.
try:
    %store -r start_date
    %store -r end_date
    %store -r user_excd
except NameError as e:
    print(f"Warning: {str(e)}. Please ensure 1)Supply has been run successfully.")

In [None]:
# Ask which year of reported demand data to use (only 2018 and 2019 are available),
# re-prompting until one of those two values is entered.
demand_year = input("Enter a Demand Year (2018 or 2019): \n")
while demand_year not in ("2018", "2019"):
    print("Invalid input. Please enter 2018 or 2019.")
    demand_year = input("Enter a Demand Year (2018 or 2019): \n")

print("You've selected Demand Year", demand_year)

In [None]:
# Ask whether Enhanced Reporting demands are used. Answers are normalized (stripped and
# upper-cased) and re-prompted until they are YES or NO.
use_enhanced_reporting = None
while use_enhanced_reporting not in ('YES', 'NO'):
    if use_enhanced_reporting is not None:
        print("Invalid input. Please try again.")
    use_enhanced_reporting = input("Use Enhanced Reporting? Please enter 'yes' or 'no': ").strip().upper()
print("You've selected", use_enhanced_reporting, "on using Enhanced Reporting data")

In [None]:
# Record every user-specified input for this run as a one-row table
scenario_values = {
    'Start Date': start_date,
    'End Date': end_date,
    '% Exceedance': user_excd,
    'Demand Year': demand_year,
    'Use Enhanced Reporting?': use_enhanced_reporting,
}
scenario_inputs = pd.DataFrame({label: [value] for label, value in scenario_values.items()})

# Export all user-specified variables
scenario_inputs.to_csv('./output-data/Scenario_Inputs.csv', index=False)

### ii) Import Water Rights dataset

In [None]:
# Load the water rights list (any set of rights is allowed, but the headers must match the
# sample input) and discard rows that are entirely blank.
water_rights = pd.read_csv('../user-inputs/WaterRights.csv').dropna(how='all')

### iii) Import Demands dataset

In [None]:
# Load the monthly demand data. It must cover the same water rights as the Water Rights
# dataset and use the expected headers.
demand_csv_path = '../user-inputs/Demands.csv'
demand = pd.read_csv(demand_csv_path)

#### Select Demand Data for the Chosen Demand Year (2018 or 2019)

In [None]:
def select_and_rename_demand_columns(demand_df, year):
    """Return APPL_ID plus the twelve direct and twelve storage monthly columns for ``year``.

    Source columns are named ``DIR_QAQC_<year>_<MM>_AF`` and ``STOR_QAQC_<year>_<MM>_AF``;
    they are renamed to ``'<Mon> Direct (AF)'`` and ``'<Mon> Storage (AF)'``
    (e.g. ``'Jan Direct (AF)'``). Direct columns come first, then storage, each Jan..Dec.
    The result is a copy, so modifying it leaves ``demand_df`` untouched.
    """
    renames = {}
    for source_prefix, label in (('DIR', 'Direct'), ('STOR', 'Storage')):
        for month_number in range(1, 13):
            source_name = f'{source_prefix}_QAQC_{year}_{month_number:02d}_AF'
            renames[source_name] = f'{calendar.month_abbr[month_number]} {label} (AF)'

    selected = demand_df[['APPL_ID', *renames]].copy()
    return selected.rename(columns=renames)

### iv) Import Enhanced Reporting Data

In [None]:
# Build the working demand table (`period`) for the selected Demand Year, then optionally
# overlay Enhanced Reporting data.
#
# All months of Enhanced Reporting data that are available are imported. If a water right
# appears in a month's Enhanced Reporting dataset and its "No Changes?" value is FALSE, the
# Enhanced Reporting value (direct or storage) is used for that month's demand. If the value is
# TRUE (or the column is absent), that month is reset to the right's 2018 reported demand.

# Grab demand data based on specified Demand Year
period = select_and_rename_demand_columns(demand, demand_year)

if use_enhanced_reporting == 'YES':
    # Define available months of Enhanced Reporting data (November 2022-March 2023)
    update_columns = ['Direct (AF)', 'Storage (AF)']
    update_months = ['Nov', 'Dec', 'Jan', 'Feb', 'Mar']
    all_update_cols = [f'{month} {col}' for month in update_months for col in update_columns]

    # Every calendar month touched by the analysis window, including partial first and last
    # months, restricted to months with Enhanced Reporting data. (A month-end date_range misses
    # a partial final month and is empty when both dates fall within the same month.)
    touched_months = {p.strftime('%b') for p in pd.period_range(start_date, end_date, freq='M')}
    analysis_months = [month for month in update_months if month in touched_months]

    # Baseline for rights that report no changes. Built once here rather than once per row and
    # column; the values are identical.
    # NOTE(review): this baseline is always the 2018 reported demand, even when Demand Year
    # 2019 was selected — confirm that is intended.
    demand_2018 = select_and_rename_demand_columns(demand, "2018")

    # Path to quality-controlled Enhanced Reporting data
    path = '../user-inputs/Enhanced Reporting/*.csv'

    # Loop through each available Enhanced Reporting dataset in a fixed (sorted) order so the
    # result does not depend on the order in which the file system lists the files
    for file_path in sorted(glob.glob(path)):
        df = pd.read_csv(file_path, encoding='cp1252')

        # Only update month columns that this file actually contains and that fall inside the
        # analysis window. Updating every analysis month from every file raised a KeyError when a
        # file lacked another month's columns, and let one file overwrite another file's months
        # with baseline values.
        file_update_cols = [col for col in all_update_cols
                            if col in df.columns and col.split(' ')[0] in analysis_months]
        if not file_update_cols:
            continue  # Skip this file if none of its months are within analysis_months

        # Replace demand data with Enhanced Reporting data where appropriate
        for update_col in file_update_cols:
            for _, row in df.iterrows():
                appl_id = row['Application ID']
                in_period = period['APPL_ID'] == appl_id
                if not row.get('No Changes?', True):
                    # Change reported: use the Enhanced Reporting value for this month
                    period.loc[in_period, update_col] = row[update_col]
                else:
                    # No change reported (or no flag column): fall back to the 2018 baseline
                    baseline = demand_2018.loc[demand_2018['APPL_ID'] == appl_id]
                    if not baseline.empty:
                        period.loc[in_period, update_col] = baseline[update_col].values


### v) Calculate Period Direct and Period Storage Values

In [None]:
# Split the monthly demand columns by type, using their '<Mon> Direct (AF)' /
# '<Mon> Storage (AF)' naming convention.
direct_columns = list(filter(lambda name: 'Direct' in name, period.columns))
storage_columns = list(filter(lambda name: 'Storage' in name, period.columns))

# Keep APPL_ID alongside each group of monthly columns
period_direct = period.loc[:, ['APPL_ID', *direct_columns]]
period_storage = period.loc[:, ['APPL_ID', *storage_columns]]

In [None]:
def rename_to_month_abbr(df):
    """Return a copy of ``df`` whose columns, except APPL_ID, are cut to their first word.

    For example ``'Jan Direct (AF)'`` becomes ``'Jan'``; ``df`` itself is not modified.
    """
    def _first_word(column):
        return column if column == 'APPL_ID' else column.split(' ')[0]

    return df.rename(columns=_first_word)

# Strip the type suffix from the monthly column names of both tables
period_direct, period_storage = map(rename_to_month_abbr, (period_direct, period_storage))

#### Prorate both Period Direct and Period Storage

In [None]:
# Function that intakes the specified Start and End Dates to calculate a multiplier.
# The multiplier is used to prorate multiple months of data down to a single value for the specified dates.
def calculate_prorated_constant(start_date, end_date, constants):
    """Prorate twelve monthly values down to one value for an inclusive date range.

    Every calendar month touched by ``[start_date, end_date]`` contributes its monthly value
    weighted by the fraction of that month's days that lie inside the range. Whole months
    contribute their full value; partial first and last months contribute a proportional share.

    Args:
        start_date: First day of the period (any object with ``year``/``month``/``day``,
            e.g. ``datetime`` or ``pd.Timestamp``).
        end_date: Last day of the period, counted inclusively.
        constants: Twelve monthly values ordered January..December (list, array or Series);
            values are read by position.

    Returns:
        The prorated total as a float, or 0.0 when ``end_date`` precedes ``start_date``.
    """
    # Read by position. A Series indexed by month names ('Jan', ...) would otherwise depend on
    # pandas' deprecated integer-position fallback for ``series[i]``.
    monthly = list(constants)

    if (end_date.year, end_date.month, end_date.day) < (start_date.year, start_date.month, start_date.day):
        return 0.0

    first_month = (start_date.year, start_date.month)
    last_month = (end_date.year, end_date.month)

    prorated_value = 0.0
    year, month = first_month
    while (year, month) <= last_month:
        days_in_month = calendar.monthrange(year, month)[1]
        # Clip this month to the requested range; both ends are inclusive. Comparing
        # (year, month) keeps ranges longer than a year correct.
        first_day = start_date.day if (year, month) == first_month else 1
        last_day = end_date.day if (year, month) == last_month else days_in_month
        prorated_value += (last_day - first_day + 1) / days_in_month * monthly[month - 1]
        # Advance to the next calendar month, rolling over the year
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)

    return prorated_value

In [None]:
# Collapse each water right's monthly Direct values into one Period Direct value covering the
# specified Start and End Dates
period_direct_dict = period_direct.set_index('APPL_ID').T.to_dict('list')
period_direct_prorated = pd.DataFrame([
    {'APPL_ID': appl_id, 'DIRECT_PERIOD': calculate_prorated_constant(start_date, end_date, monthly_values)}
    for appl_id, monthly_values in period_direct_dict.items()
])

#display(period_direct_prorated)

In [None]:
# Collapse each water right's monthly Storage values into one Period Storage value covering the
# specified Start and End Dates
period_storage_dict = period_storage.set_index('APPL_ID').T.to_dict('list')
period_storage_prorated = pd.DataFrame([
    {'APPL_ID': appl_id, 'STORAGE_PERIOD': calculate_prorated_constant(start_date, end_date, monthly_values)}
    for appl_id, monthly_values in period_storage_dict.items()
])

#display(period_storage_prorated)

## B) POD Demands
### i) Import POD dataset

In [None]:
# Load the points of diversion (PODs). The file must cover the same water rights as the Water
# Rights dataset and use the expected headers.
pods_csv_path = '../user-inputs/PODs.csv'
pods = pd.read_csv(pods_csv_path)

### ii) Import Consumptive Use Factors (CUF) dataset

In [None]:
# Consumptive Use Factors (CUFs) reduce Direct demands to account for return flows, by month
# and subwatershed.
use_factors = pd.read_csv('../user-inputs/ConsumptiveUseFactors.csv')

# Rename the monthly ratio columns ('01_RATIO' .. '12_RATIO') to month abbreviations ('Jan' .. 'Dec')
ratio_to_month = {f'{month_number:02d}_RATIO': calendar.month_abbr[month_number] for month_number in range(1, 13)}
use_factors = use_factors.rename(columns=ratio_to_month)

### iii) Calculate a Period CUF based on user-specified Start/End Dates

In [None]:
# Collapse each subwatershed's monthly CUF values into one Period CUF value covering the
# specified Start and End Dates.
# Index by subwatershed so each row holds only that subwatershed's monthly ratios.
cuf_by_subwatershed = use_factors.set_index('SUBWATERSHED')

# A repeated subwatershed would make the later SUBWATERSHED merge duplicate POD rows (and
# previously failed with an obscure KeyError), so stop with a clear message instead.
dup_subwatersheds = cuf_by_subwatershed.index[cuf_by_subwatershed.index.duplicated()].unique()
if len(dup_subwatersheds):
    raise ValueError(
        f"ConsumptiveUseFactors.csv lists these subwatersheds more than once: {list(dup_subwatersheds)}"
    )

prorated_cuf = []
for subwatershed, ratios in cuf_by_subwatershed.iterrows():
    # Pass a plain list so the twelve ratios are read by position. Indexing a month-labelled
    # Series with integers relies on deprecated pandas behavior.
    constant = calculate_prorated_constant(start_date, end_date, ratios.tolist())
    prorated_cuf.append({'Period_CUF_Constant': constant, 'SUBWATERSHED': subwatershed})

period_cuf = pd.DataFrame(prorated_cuf)

#display(period_cuf)

### iv) POD Calculations

###### 1) POD Period Direct

In [None]:
# Determine Direct demands for each POD by merging in its water right's Period Direct demand by
# Application ID. The left merge keeps exactly one row per POD, in the same order as `pods`
# (validate= ensures APPL_ID is unique on the demand side). This table therefore lines up
# row-for-row with `pods`, which the POD Demand step relies on when it assigns columns by index.
# An inner merge dropped PODs whose water right had no demand record, shifting every later row
# onto the wrong POD. PODs without a demand record now get a NaN demand.
merged_df = pd.merge(pods, period_direct_prorated[['APPL_ID', 'DIRECT_PERIOD']],
                     on='APPL_ID', how='left', validate='many_to_one')

# Attach the Period CUF for each POD's subwatershed
final_df = pd.merge(merged_df, period_cuf, how='left', on='SUBWATERSHED')

# Weight the water right's CUF-reduced direct demand to the individual POD
final_df['DIRECT_PERIOD_POD'] = final_df['WEIGHT_DIR'] * final_df['DIRECT_PERIOD'] * final_df['Period_CUF_Constant']

# Create the final POD Direct demand dataframe with the desired columns
direct_period_pod = final_df[['APPL_ID', 'POD_ID', 'WATERSHED', 'SUBWATERSHED', 'LEGAL_DELTA', 'WEIGHT_DIR', 'DIRECT_PERIOD', 'Period_CUF_Constant', 'DIRECT_PERIOD_POD']]

###### 2) POD Period Storage

In [None]:
# Determine Storage demands for each POD by merging in its water right's Period Storage demand by
# Application ID. The left merge keeps exactly one row per POD, in the same order as `pods`
# (validate= ensures APPL_ID is unique on the demand side). This table therefore lines up
# row-for-row with `pods`, which the POD Demand step relies on when it assigns columns by index.
# An inner merge dropped PODs whose water right had no demand record, shifting every later row
# onto the wrong POD. PODs without a demand record now get a NaN demand.
merged_df2 = pd.merge(pods, period_storage_prorated[['APPL_ID', 'STORAGE_PERIOD']],
                      on='APPL_ID', how='left', validate='many_to_one')

# Storage demands are not multiplied by CUF factors since diversions to storage do not produce return flows

# Weight the water right's storage demand to the individual POD
merged_df2['STORAGE_PERIOD_POD'] = merged_df2['WEIGHT_STOR'] * merged_df2['STORAGE_PERIOD']

# Create the final POD Storage demand dataframe with the desired columns
storage_period_pod = merged_df2[['APPL_ID', 'POD_ID', 'WATERSHED', 'SUBWATERSHED', 'STORAGE_PERIOD', 'WEIGHT_STOR', 'STORAGE_PERIOD_POD']]

###### 3) POD Demand

In [None]:
# Combine the POD Direct and Storage tables into one POD Period Demand table.
# The demand columns are attached by index alignment with `pods`.
# NOTE(review): this assumes direct_period_pod and storage_period_pod have one row per POD in
# the same order as `pods` — confirm.
direct_pod = direct_period_pod['DIRECT_PERIOD_POD']
storage_pod = storage_period_pod['STORAGE_PERIOD_POD']
demand_period_pod = pods[['APPL_ID', 'POD_ID', 'WATERSHED', 'SUBWATERSHED', 'LEGAL_DELTA']].copy().assign(
    DIRECT_PERIOD_POD=direct_pod,
    STORAGE_PERIOD_POD=storage_pod,
    DEMAND_PERIOD_POD=direct_pod + storage_pod,
)

#display(demand_period_pod)

In [None]:
# Attach each POD's water right type (WATER_RIGHT_TYPE_CUSTOM) from the water rights table
right_types = water_rights.loc[:, ['APPL_ID', 'WATER_RIGHT_TYPE_CUSTOM']]
demand_period_pod = pd.merge(demand_period_pod, right_types, how='left', on='APPL_ID')

# Export total POD Period Demands for the 3.Analysis script
demand_period_pod.to_csv('./intermediate-outputs/Demands_Period_POD.csv', index=False)

## C) Analysis Preparation

### Create the Analysis Dataset

In [None]:
# Build the Analysis dataset from the PODs table: one row per unique combination of APPL_ID,
# WATERSHED, SUBWATERSHED and LEGAL_DELTA, with the POD direct and storage weights summed.
group_keys = ['APPL_ID', 'WATERSHED', 'SUBWATERSHED', 'LEGAL_DELTA']
selected_columns = ['APPL_ID', 'POD_ID', 'WATERSHED', 'SUBWATERSHED', 'LEGAL_DELTA', 'WEIGHT_DIR', 'WEIGHT_STOR']
flat_df = pods[selected_columns].copy()
sum_weights = flat_df.groupby(group_keys, as_index=False)[['WEIGHT_DIR', 'WEIGHT_STOR']].sum()

# Drop combinations whose direct and storage weights are both zero: areas with only points of
# rediversion or inactive PODs are not considered when evaluating water unavailability.
has_weight = (sum_weights['WEIGHT_DIR'] != 0) | (sum_weights['WEIGHT_STOR'] != 0)
sum_weights = sum_weights.loc[has_weight]

In [None]:
# Attach the grouped POD weights to each water right. The left join keeps every water right,
# with its Priority Date, Primary Owner and Water Right Type from the WaterRights dataset.
analysis_df = water_rights.merge(sum_weights, how='left', on='APPL_ID')

# Riparian-priority records are evaluated collectively, so drop them from the Analysis dataset
is_riparian = analysis_df['PRIORITY_DATE_CUSTOM'] == 'Riparian'
analysis_df = analysis_df[~is_riparian].reset_index(drop=True)

In [None]:
# Sort ranks for (SUBWATERSHED, LEGAL_DELTA) pairs.
# Ranks 1-20 cover the pairs with LEGAL_DELTA False, in the order listed;
# ranks 21-27 cover the pairs with LEGAL_DELTA True.
_NON_LEGAL_DELTA_ORDER = [
    'Sacramento Bend', 'Upper Sacramento Valley', 'Stony', 'Cache', 'Upper Feather',
    'Yuba', 'Bear', 'Upper American', 'Sacramento Valley Floor', 'Upper San Joaquin',
    'Fresno', 'Chowchilla', 'Merced', 'Tuolumne', 'San Joaquin Valley Floor',
    'Putah', 'Stanislaus', 'Calaveras', 'Mokelumne', 'Cosumnes',
]
_LEGAL_DELTA_ORDER = [
    'Putah', 'Stanislaus', 'Calaveras', 'Mokelumne', 'Cosumnes',
    'Sacramento Valley Floor', 'San Joaquin Valley Floor',
]
sort_order = {
    pair: rank
    for rank, pair in enumerate(
        [(name, False) for name in _NON_LEGAL_DELTA_ORDER] + [(name, True) for name in _LEGAL_DELTA_ORDER],
        start=1,
    )
}


def custom_sort_key2(row):
    """Return the sort rank of a row's (SUBWATERSHED, LEGAL_DELTA) pair, or 0 if it is not listed."""
    pair = (row['SUBWATERSHED'], row['LEGAL_DELTA'])
    return sort_order[pair] if pair in sort_order else 0

# Order the rows by subwatershed/Legal Delta rank, then APPL_ID, using a temporary key column
# that is removed afterwards
analysis_df = (
    analysis_df.assign(SORT_KEY=analysis_df.apply(custom_sort_key2, axis=1))
    .sort_values(by=['SORT_KEY', 'APPL_ID'])
    .drop(columns=['SORT_KEY'])
    .reset_index(drop=True)
)


In [None]:
# Sort records by Priority Date, then by Application ID. When two records share a priority
# date, the application that sorts first by APPL_ID is assumed to be senior.
# NOTE(review): PRIORITY_DATE_CUSTOM is sorted as read from the CSV; date strings only sort
# chronologically in a year-first format such as YYYY-MM-DD — confirm the input format.
priority_sort_columns = ['PRIORITY_DATE_CUSTOM', 'APPL_ID']
analysis_df = analysis_df.sort_values(by=priority_sort_columns)

In [None]:
# Rank priority groups so Project-priority records sort second-to-last and Pending-priority
# records sort last; every other priority date ranks first.
def custom_sort_key(date):
    """Return 2 for 'Project', 3 for 'Pending', and 1 for any other priority date."""
    group_rank = {'Project': 2, 'Pending': 3}
    return group_rank.get(date, 1)

# Group the records so that ordinary priority dates come first, then 'Project', then 'Pending';
# within each group, order by Priority Date and then Application ID.
analysis_df['SORT_KEY'] = analysis_df['PRIORITY_DATE_CUSTOM'].map(custom_sort_key)
sorted_analysis_df = (
    analysis_df.sort_values(by=['SORT_KEY', 'PRIORITY_DATE_CUSTOM', 'APPL_ID'])
    .drop(columns=['SORT_KEY'])
    .reset_index(drop=True)
)

In [None]:
def assign_headwater(subwatershed):
    """Return the Headwater flag for a subwatershed.

    False for 'Upper Sacramento Valley', 'Sacramento Valley Floor' and
    'San Joaquin Valley Floor'; True for every other value.
    """
    return subwatershed not in ('Upper Sacramento Valley', 'Sacramento Valley Floor', 'San Joaquin Valley Floor')
    
# Flag each record as Headwater (True/False) based on its subwatershed
sorted_analysis_df['HEADWATER'] = sorted_analysis_df['SUBWATERSHED'].map(assign_headwater)

# Keep only the analysis columns, in the conventional order
analysis_columns = ['WATERSHED', 'SUBWATERSHED', 'APPL_ID', 'PRIMARY_OWNER_NAME', 'WATER_RIGHT_TYPE_CUSTOM', 'PRIORITY_DATE_CUSTOM', 'LEGAL_DELTA', 'HEADWATER']
analysis_dataset = sorted_analysis_df[analysis_columns]

In [None]:
# The first 81 rows of the Analysis dataset are aggregations of Riparian-priority claims by
# sub-type and subwatershed; load that pre-defined aggregation list.
riparian_csv_path = '../user-inputs/AnalysisPreparationRiparian.csv'
prior = pd.read_csv(riparian_csv_path)

In [None]:
# Watershed lookup used to label the Riparian-priority aggregation records
def assign_watershed(subwatershed):
    """Return 'Sacramento' for the listed Sacramento subwatersheds and 'San Joaquin' otherwise."""
    sacramento_subwatersheds = (
        'Sacramento Bend', 'Stony', 'Cache', 'Upper Feather', 'Yuba', 'Bear',
        'Upper American', 'Putah', 'Upper Sacramento Valley', 'Sacramento Valley Floor',
    )
    return 'Sacramento' if subwatershed in sacramento_subwatersheds else 'San Joaquin'
    
# Derive the WATERSHED column of the Riparian aggregation records from their subwatershed
prior['WATERSHED'] = prior['SUBWATERSHED'].map(assign_watershed)

In [None]:
# Put the aggregated Riparian-priority records first, followed by the sorted Analysis records,
# keeping the standard column order. With ignore_index=True the result gets a fresh 0..n-1 index.
output_columns = ['WATERSHED', 'SUBWATERSHED', 'APPL_ID', 'PRIMARY_OWNER_NAME', 'WATER_RIGHT_TYPE_CUSTOM', 'PRIORITY_DATE_CUSTOM', 'LEGAL_DELTA', 'HEADWATER']
analysis_dataset = pd.concat([prior, analysis_dataset], ignore_index=True)[output_columns]

In [None]:
# Save the assembled Analysis dataset for use in the 3.Analysis script
analysis_output_path = './intermediate-outputs/Analysis_Dataset_Pre.csv'
analysis_dataset.to_csv(analysis_output_path, index=False)