In [None]:
# | default_exp modeling_options_utils

In [None]:
# | hide
from nbdev.showdoc import *
from fastcore.test import *
from fastcore.utils import *

In [None]:
# | export
import os
import random
import shutil
import datetime
import collections
import numpy as np
import pandas as pd
from typing import Dict
from pathlib import Path, PosixPath
from pydantic import ValidationError
from pysureau.pysureau_utils import dict_to_csv
from pysureau.parameter_validators import ModelingOptionsParameterValidator

In [None]:
# | export


def create_empty_modeling_options_file(
    path: Path,  # Path (str or path-like object) to an existing folder where the modeling options CSV will be saved
) -> None:  # Nothing is returned; the template is handed to dict_to_csv for writing
    "Function for creating the CSV templates necessary for the modeling options file"

    # Assert parameters ---------------------------------------------------------
    # Any str or os.PathLike is accepted, so every pathlib flavour (not only
    # PosixPath) works. AssertionError is kept for backward compatibility and
    # is raised explicitly so the check is not stripped under `python -O`.
    if not isinstance(path, (str, os.PathLike)):
        raise AssertionError(
            f'Input path must be a str or Path, not a {type(path).__name__}'
        )

    # Convert string to Path if provided ----------------------------------------
    path = Path(path)

    # The target location must already exist; it is not created here
    if not os.path.exists(path):
        raise ValueError('Failed creating empty modeling options file')

    # Default template values. 'NA' marks the entries the user has to fill in
    # before the file can be used. The insertion order is preserved from the
    # original template.
    modeling_options_params = {
        'eord': 1,
        'lcav': 1,
        'scav': 1,
        'latitude': 'NA',
        'longitude': 'NA',
        'year_end': 'NA',
        'year_start': 'NA',
        'time_step_for_evapo': 1,
        'threshold_mortality': 90,
        'transpiration_granier_arg_c': 0,
        'custom_small_time_step_in_sec': 600,
        'transpiration_granier_arg_b': 0.134,
        'transpiration_granier_arg_a': -0.006,
        'print_prog': True,
        'defoliation': False,
        'constant_climate': False,
        'output_overwrite': False,
        'output_type': 'simple',
        'pet_formulation': 'pt',
        'rn_formulation': 'linacre',
        'numerical_scheme': 'implicit',
        'resolution_output': 'subdaily',
        'transpiration_model': 'jarvis',
        'comp_options_for_evapo': 'normal',
        'stomatal_reg_formulation': 'sigmoid',
        'output_path': '~/pysureau_project_EDITME/2_model_outputs',
    }

    # Write to CSV file
    dict_to_csv(
        path=path,
        filename='modeling_options.csv',
        dictionary=modeling_options_params,
    )

In [None]:
# | export


def read_modeling_options_file(
    file_path: Path,  # Path to the modeling options CSV file, i.e. path/to/sureau_parameter_files/file_name.csv
    sep: str = ',',  # CSV file separator can be ',' or ';'
) -> Dict:  # Dictionary mapping each parameter name to its type-converted value
    "Create a dictionary containing modeling options that can be used as an input in run.SurEauR"

    # Assert parameters ---------------------------------------------------------
    # AssertionError is kept for backward compatibility; it is raised explicitly
    # so the check is not stripped under `python -O`.
    if not os.path.exists(file_path):
        raise AssertionError(
            f'sureau_parameter_files folder not found at {file_path}. Save the CSV inside the parameter_files folder in the pysureau project'
        )

    # Read and validate dataframe -----------------------------------------------

    # Read CSV
    modelling_options_data = pd.read_csv(file_path, sep=sep, header=0)

    # The columns must be exactly parameter_name, parameter_value (in that
    # order) so the CSV can be turned into a {name: value} dictionary below
    if modelling_options_data.columns.tolist() != [
        'parameter_name',
        'parameter_value',
    ]:
        raise ValueError(
            'Column names in modelling options file must be called parameter_name and parameter_value'
        )

    # Transform dataframe into dictionary ---------------------------------------
    raw_options = modelling_options_data.set_index('parameter_name').to_dict()[
        'parameter_value'
    ]

    # Convert every value to its expected data type -----------------------------
    # If this is not done the values keep whatever type the CSV reader gave them

    parameters_of_class_str = {
        'print_prog',
        'defoliation',
        'constant_climate',
        'output_overwrite',
        'output_type',
        'pet_formulation',
        'rn_formulation',
        'numerical_scheme',
        'resolution_output',
        'transpiration_model',
        'comp_options_for_evapo',
        'stomatal_reg_formulation',
        'output_path',
    }

    # Flags that may only be 0 or 1. They are converted to int rather than bool
    # because bool() would turn any non-zero value (e.g. 2) into True and hide
    # an invalid entry.
    parameters_of_class_bool = {'eord', 'lcav', 'scav'}

    modelling_options_dict = {}
    for param_name, raw_value in raw_options.items():
        if param_name in parameters_of_class_str:
            converted = str(raw_value)

        # Special case: this parameter is either the literal "variable" or an
        # integer
        elif param_name == 'time_step_for_evapo':
            if raw_value == 'variable':
                converted = str(raw_value)
            else:
                converted = int(raw_value)

        elif param_name in parameters_of_class_bool:
            converted = int(raw_value)

        else:
            # Every remaining parameter is treated as a float
            converted = float(raw_value)

        modelling_options_dict[param_name] = converted

    # Validate against the schema; a pydantic ValidationError raised here
    # propagates unchanged to the caller
    ModelingOptionsParameterValidator.model_validate(modelling_options_dict)

    # Check that year_start is not larger than year_end --------------------------
    # The values are bound to local names so the f-string does not reuse its own
    # quote character inside a replacement field (a SyntaxError before Python
    # 3.12). `not (a <= b)` is used on purpose: it also fails when either value
    # is NaN.
    year_start = modelling_options_dict['year_start']
    year_end = modelling_options_dict['year_end']
    if not (year_start <= year_end):
        raise AssertionError(
            f'year_start ({year_start}) is larger than year_end ({year_end})'
        )

    return modelling_options_dict

In [None]:
#read_modeling_options_file(
#    '/tmp/pysureau_project_fG7GxSSG/1_parameter_files/modeling_options.csv'
#)

In [None]:
# | hide
#   
#   # Create array with time steps for the evapo --------------------------------
#   
#   if time_step_for_evapo == "variable":
#       time = np.array([0, 6, 12, 14, 16, 22])
#       raise ValueError('time_step_for_evapo set to "variable". This has not been implemented yet')#
#   elif time_step_for_evapo != "variable":
#       time = np.arange(0, 24, time_step_for_evapo, dtype=int)#
#   # Create comp_options -------------------------------------------------------
#   comp_options = collections.defaultdict(list)
#
#   # Every 10min, 6min, 3min, 1min
#   if comp_options_for_evapo == 'normal':
#           # Add key value pairs to the comp_dictionary
#           comp_options['numerical_scheme'] = numerical_scheme
#           comp_options['nsmalltimesteps'] = time_step_for_evapo * np.array(
#               [6, 10, 20, 60]
#           )
#           comp_options['lsym'] = 1
#           comp_options['ssym'] = 1
#           comp_options['clapo'] = 1
#           comp_options['ctapo'] = 1
#           comp_options['eord'] = eord
#           comp_options['lcav'] = lcav
#           comp_options['scav'] = scav#
#   # every 10 seconds
#   if comp_options_for_evapo == 'accurate':
#           comp_options['numerical_scheme'] = numerical_scheme
#           comp_options['nsmalltimesteps'] = time_step_for_evapo * np.array([600])
#           comp_options['lsym'] = 1
#           comp_options['ssym'] = 1
#           comp_options['clapo'] = 1
#           comp_options['ctapo'] = 1
#           comp_options['eord'] = eord
#           comp_options['lcav'] = lcav
#           comp_options['scav'] = scav#
#   # every hours, every 10 min
#   if comp_options_for_evapo == 'fast':
#           comp_options['numerical_scheme'] = numerical_scheme
#           comp_options['nsmalltimesteps'] = time_step_for_evapo * np.array([
#               1, 6]
#           )
#           comp_options['lsym'] = 1
#           comp_options['ssym'] = 1
#           comp_options['clapo'] = 1
#           comp_options['ctapo'] = 1
#           comp_options['eord'] = eord
#           comp_options['lcav'] = lcav
#           comp_options['scav'] = scav#
#   # every customSmallTimeStepInSec
#   if comp_options_for_evapo == 'custom':
#
#           comp_options['numerical_scheme'] = numerical_scheme#
#           comp_options['nsmalltimesteps'] = (
#               (time_step_for_evapo * 3600) / custom_small_time_step_in_sec
#           )
#           comp_options['lsym'] = 1
#           comp_options['ssym'] = 1
#           comp_options['clapo'] = 1
#           comp_options['ctapo'] = 1
#           comp_options['eord'] = eord
#           comp_options['lcav'] = lcav
#           comp_options['scav'] = scav#
#   # Create empty dictionary for storing modeling options ----------------------
#   modeling_options = collections.defaultdict(list)
#
#   # Append parameters to dictionary
#   modeling_options['year_start'] = year_start
#   modeling_options['year_end'] = year_end
#   modeling_options['resolution_output'] = resolution_output
#   modeling_options['output_type'] = output_type
#   modeling_options['constant_climate'] = constant_climate
#   modeling_options['pet_formulation'] = pet_formulation
#   modeling_options['rn_formulation'] = rn_formulation
#   modeling_options['time_step_for_evapo'] = time_step_for_evapo
#   modeling_options['time'] = time
#   modeling_options['latitude'] = latitude
#   modeling_options['longitude'] = longitude
#   modeling_options['comp_options'] = comp_options
#   modeling_options['stomatal_reg_formulation'] = stomatal_reg_formulation
#   modeling_options['defoliation'] = defoliation
#   modeling_options['threshold_mortality'] = threshold_mortality
#   modeling_options['transpiration_model'] = transpiration_model
#   modeling_options['print_prog'] = print_prog
#   modeling_options['stop_simulation_dead_plant'] = print_prog
#   modeling_options['transpiration_granier_args'] = collections.defaultdict(list,{"a":transpiration_granier_arg_a,
#                                                                                  "b":transpiration_granier_arg_b,
#                                                                                  "c":transpiration_granier_arg_c})
#   # Create folder for storing output ------------------------------------------
#   # Create random number for naming folder
#   random_number = random.randint(1, 10000)
#
#   # Join path for storing output
#
#   output_path = os.path.join(output_path, f'sureau_output_{datetime.datetime.now().strftime("%Y_%m_%d")}_{random_number}')
#   modeling_options['output_path'] = output_path #
#   # New folder
#   if not os.path.exists(output_path):
#       # Create folder
#       os.mkdir(output_path)
#       print(f'Directory for storing output created at {output_path}')#
#   # Overwrite folder
#   elif os.path.exists(output_path) and output_overwrite is True:
#       shutil.rmtree(output_path)
#       os.makedirs(output_path)
#       modeling_options['output_path'] = output_path
#       print(f'Directory ({output_path}) for storing output overwritten')#
#   # Errors
#   elif os.path.exists(output_path) and output_overwrite is False:
#       raise ValueError(
#           "File already exists and 'output_overwrite' option is set to False, change the 'output_path' or set 'overwrite' to True"
#       )#
#   else:
#       raise ValueError(
#           'Error creating folder in modeling_options function'
#       )
#
#   return modeling_options