In [None]:
import premise as ps
import gc
import bw2data as bd

def premise_generator(
    base_db: str,
    base_db_version: str,
    scenarios: list[dict],
    overwrite_bw_db: bool = False,
    keep_uncertainty_data: bool = True,
    use_multiprocessing: bool = True,
    batch_size: int = 5,
    encryption_key: str = None,
) -> None:
    """
    This method generates a premise database for a list of scenarios.

    Parameters
    ----------
    base_db : str
        The name of the base database to use.
    base_db_version : str
        The version of the base database to use.
    scenarios : list
        A list of scenarios, where each scenario is a dictionary of the form:
        {
            "model": str,
            "pathway": str,
            "year": int
        }
    overwrite_bw_db : bool, optional
        Whether or not to overwrite existing databases, by default False
    keep_uncertainty_data : bool, optional
        Whether or not to keep uncertainty data, by default True
    use_multiprocessing : bool, optional
        Whether or not to use multiprocessing, by default True
    batch_size : int, optional
        The number of scenarios to process in each batch, by default 5
        Batches are processed sequentially to avoid memory issues, so the larger the batch size, the longer the processing time.
    """


    if not encryption_key:
        raise ValueError(
            "Encryption key not found. Please ask the maintainers for a .env file with an encryption key."
        )

    valid_scenarios = _validate_scenarios(scenarios, overwrite_bw_db)

    # Process scenarios in batches
    for i in range(0, len(valid_scenarios), batch_size):
        batch_scenarios = valid_scenarios[i : i + batch_size]
        _process_batch(
            batch_scenarios,
            base_db,
            base_db_version,
            encryption_key,
            use_multiprocessing,
            keep_uncertainty_data,
        )

def _validate_scenarios(
    scenarios: list, overwrite_bw_db: bool = False
) -> list:
    """
    This method takes a list of scenarios and checks if they are already in the Brightway database.
    If they are, it removes them from the list of scenarios.
    This is done to avoid overwriting existing databases. and to avoid re-calculating LCI databases that already exist.

    Parameters
    ----------
    scenarios : list
        A list of scenarios, where each scenario is a dictionary of the form:
        {
            "model": str,
            "pathway": str,
            "year": int
        }
    overwrite_bw_db : bool, optional
        Whether or not to overwrite existing databases, by default False

    Returns
    -------
    list
        A list of scenarios that are not already in the Brightway database.
    """
    valid_scenarios = []
    for scenario in scenarios:
        name = _get_scenario_names(scenario)
        if name in bd.databases:
            if overwrite_bw_db:
                print(f"Database {name} already exists, removing.")
                bd.Database(name).delete()
            else:
                print(f"Database {name} already exists, skipping.")
                continue
        valid_scenarios.append(scenario)
    return valid_scenarios

def _process_batch(
    batch_scenarios: list[dict],
    base_db: str,
    base_db_version: str,
    encryption_key: str,
    use_multiprocessing: bool,
    keep_uncertainty_data: bool,
) -> None:
    """
    Build the premise database for one batch and write it to Brightway.

    Parameters
    ----------
    batch_scenarios : list
        The scenarios of this batch, each a dictionary of the form:
        {
            "model": str,
            "pathway": str,
            "year": int
        }
    base_db : str
        Name of the source database passed to premise.
    base_db_version : str
        Version of the source database passed to premise.
    encryption_key : str
        Key passed to premise.
    use_multiprocessing : bool
        Forwarded to premise unchanged.
    keep_uncertainty_data : bool
        Forwarded to premise unchanged.
    """
    # One target database name per scenario, in the same order as the batch.
    target_db_names = _get_a_list_of_scenario_names(batch_scenarios)

    print("Generating premise database for the current batch.")
    premise_options = {
        "scenarios": batch_scenarios,
        "source_db": base_db,
        "source_version": base_db_version,
        "key": encryption_key,
        "use_multiprocessing": use_multiprocessing,
        "keep_uncertainty_data": keep_uncertainty_data,
    }
    new_database = ps.NewDatabase(**premise_options)
    new_database.update_all()

    print("Writing database to brightway")
    new_database.write_db_to_brightway(target_db_names)

    # Drop the only reference and force a collection before the next batch.
    del new_database
    gc.collect()

def _get_a_list_of_scenario_names(scenarios: list[dict]) -> list[str]:
    """
    Takes a list of dictionaries and returns a list of formatted strings.

    Each dictionary in the list should have the keys 'model', 'pathway', and 'year'.
    The function returns a list of strings in the format "pathway-model-year".

    Args:
    scenarios (list of dict): A list of dictionaries with keys 'model', 'pathway', and 'year'.

    Returns:
    list of str: A list of formatted strings.
    """

    return [
        f"{scenario['pathway']}_{scenario['model']}_{scenario['year']}"
        for scenario in scenarios
    ]
def _get_scenario_names(scenarios: list[dict] | str) -> list[str]:
        """
        Takes a list of dictionaries or one dict and returns a list of formatted strings or one string if not a lst.
        Each dictionary in the list should have the keys 'model', 'pathway', and 'year'.
        The function returns a list of strings in the format "pathway-model-year".
        Args:
        scenarios (list of dict): A list of dictionaries with keys 'model', 'pathway', and 'year'.
        Returns:
        list of str: A list of formatted strings.
        """
        if isinstance(scenarios, list):
            return [
                f"{scenario['pathway']}_{scenario['model']}_{scenario['year']}"
                for scenario in scenarios
            ]
        elif isinstance(scenarios, str):
            return f"{scenarios['pathway']}_{scenarios['model']}_{scenarios['year']}"


In [None]:
import yaml
def read_yaml_file(
    file_path: str = "./data/iam_scenarios.yaml",
) -> dict:
    """
    Read a YAML file and return its parsed contents.

    Errors are not propagated: a missing file, a YAML parsing error or any
    other exception is reported with ``print`` and ``None`` is returned
    instead of the parsed contents.
    """
    parsed = None
    try:
        with open(file_path, "r", encoding="utf-8") as stream:
            parsed = yaml.safe_load(stream)
    except FileNotFoundError:
        print(f"The file {file_path} was not found.")
    except yaml.YAMLError as exc:
        print(f"An error occurred while parsing the YAML file: {exc}")
    except Exception as exc:
        print(f"An unexpected error occurred: {exc}")
    return parsed

In [None]:
import re
from fuzzywuzzy import process

def fuzzy_find_scenario(
    input_str: str,
    scenarios_dict: dict = read_yaml_file("./data/iam_scenarios.yaml"),
    year: int = None,
    auto_generate_years: bool = False,
    start_year: int = 2020,
    end_year: int = 2100,
    increment: int = 5,
) -> list[dict]:
    '''
    This function takes a string as input and returns a list of scenarios that best match the input string.
    The function uses fuzzy matching to find the best match.
    
    Parameters
    ----------
    input_str : str
        The string to match.
    scenarios_dict : dict, optional
        A dictionary of scenarios, by default read from the YAML file "iam_scenarios.yaml".
    year : int, optional
        The year to use for the matched scenarios, by default None.
        if you only want a scenario for one year, set this parameter. and keep auto_generate_years as False.
    auto_generate_years : bool, optional
        Whether or not to generate scenarios for a range of years, by default False.
    start_year : int, optional
        The start year for the range of years, by default 2020.
    end_year : int, optional
        The end year for the range of years, by default 2100.
    increment : int, optional
        The increment for the range of years, by default 5.
        Any smaller value will not help because IAM scenarios are usually available for every 5 years.
        Anything less is just interpolation.
        
    Returns
    -------
    list
        A list of matched scenarios. Setup in the right format to be used in the premise generator.
    '''
    
    input_str = input_str.lower().replace(" ", "")

    # Prepare a list of all model pathways for fuzzy matching
    model_paths = []
    for group, scenarios in scenarios_dict.items():
        for scenario in scenarios:
            for model_key in ["REMIND", "IMAGE"]:
                model_name = scenario.get(model_key)
                if model_name:
                    model_paths.append(model_name)

    # Find the best match using fuzzywuzzy
    best_match, score = process.extractOne(input_str, model_paths)

    # Filter and return the matched scenarios
    def filter_matched_scenarios(best_match):
        matches = []
        for group, scenarios in scenarios_dict.items():
            for scenario in scenarios:
                for model_key in ["REMIND", "IMAGE"]:
                    model_name = scenario.get(model_key)
                    if model_name == best_match:
                        matches.append(
                            {
                                "model": model_key.lower(),
                                "pathway": model_name,
                                "year": year,
                            }
                        )
        return matches

    matched_scenarios = filter_matched_scenarios(best_match)

    if auto_generate_years:
        return [
            {"model": sc["model"], "pathway": sc["pathway"], "year": yr}
            for sc in matched_scenarios
            for yr in range(start_year, end_year + 1, increment)
        ]

    return matched_scenarios

In [None]:
def dual_scenarios(
    yaml_structure: dict, start_year: int, end_year: int, increment: int
) -> list[dict]:
    """
    Build IMAGE and REMIND scenarios for every year step.

    Only entries that define both an "IMAGE" and a "REMIND" pathway
    contribute; entries where either one is missing or empty are skipped.
    For each year and each qualifying entry, the IMAGE scenario is emitted
    first, followed by the REMIND scenario.

    yaml_structure: dict
        A dictionary mapping group names to lists of scenario entries,
        e.g. the result of read_yaml_file().
    start_year: int
        The first year of the range.
    end_year: int
        The last year of the range (inclusive).
    increment: int
        The step between years.
        Any smaller value than 5 will not help because IAM scenarios are
        usually available for every 5 years. Anything less is just interpolation.
    """
    collected = []
    for current_year in range(start_year, end_year + 1, increment):
        for group_entries in yaml_structure.values():
            for item in group_entries:
                image_pathway = item.get("IMAGE")
                remind_pathway = item.get("REMIND")
                # Keep only pathways available for both models.
                if not image_pathway or not remind_pathway:
                    continue
                collected.extend(
                    (
                        {
                            "model": "image",
                            "pathway": image_pathway,
                            "year": current_year,
                        },
                        {
                            "model": "remind",
                            "pathway": remind_pathway,
                            "year": current_year,
                        },
                    )
                )
    return collected
