## Import necessary libraries

In [100]:
from __future__ import annotations

import json
from pathlib import Path

import numpy as np
import pandas as pd

from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import PatternFill, Font, Alignment, NamedStyle
from openpyxl.utils import get_column_letter
from openpyxl.formatting.rule import CellIsRule

## Convert input and output JSON files to dataframes

In [112]:
def check_is_file(*filepaths):
    files_not_found = []
    for filepath in filepaths:
        _filepath = Path(filepath)
        if not _filepath.is_file():
            files_not_found.append(filepath)
    if len(files_not_found) > 0:
        plural = "" if len(files_not_found) == 1 else "s"
        past_tense_form = "was" if len(files_not_found) == 1 else "were"
        exception_message = (
            f"The following file{plural} {past_tense_form}n't found: "
            + ", ".join(files_not_found)
        )
        raise FileNotFoundError(exception_message)


# Helper function to autofit column widths with a minimum width
def autofit_columns(ws):
    for col in ws.columns:
        max_length = 0
        column = col[0].column_letter  # Get the column name
        for cell in col:
            if cell.value:
                max_length = max(max_length, len(str(cell.value)))
        adjusted_width = max(max_length + 2, 10)  # Minimum width set to 10
        ws.column_dimensions[column].width = adjusted_width


# Convert input and output JSON files to dataframes
def explode_quality_rows(
    df: pd.DataFrame,
    quality_col_prefix: str = "quality_",
) -> pd.DataFrame:
    """
    Explode the `'quality_*'` columns into rows.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe with the output specifications with the quality columns
        that contain dictionaries of quality parameters to be extracted.
    quality_col_prefix : str, default='quality_'
        The prefix of the quality columns that contain the output pile specifications
        that need to be extracted into different columns.

    Returns
    -------
    pd.DataFrame
        The `pandas.DataFrame` with the quality dictionaries extracted into
        new columns.
    """
    # Create a new DataFrame to store exploded rows
    exploded_df = pd.DataFrame()

    for index, row in df.iterrows():
        quality_dict_list = [
            value for key, value in row.items() if key.startswith(quality_col_prefix)
        ]

        # Convert the list of quality dictionaries into a DataFrame
        quality_df = pd.DataFrame(quality_dict_list)

        # Repeat the original columns for each exploded row
        repeated_columns = pd.DataFrame(
            [
                row.drop(
                    labels=[
                        col for col in df.columns if col.startswith(quality_col_prefix)
                    ]
                )
            ]
            * len(quality_df)
        )

        # Concatenate the repeated columns with the quality columns
        exploded_row_df = pd.concat(
            [
                repeated_columns.reset_index(drop=True),
                quality_df.reset_index(drop=True),
            ],
            axis=1,
        )

        # Append to the exploded DataFrame
        exploded_df = pd.concat([exploded_df, exploded_row_df], ignore_index=True)

    return exploded_df


def assign_engines_to_stockpiles(
    stockpiles_df: pd.DataFrame, engines_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Assign engines to stockpiles based on matching yards and rails.

    Parameters
    ----------
    stockpiles_df : pd.DataFrame
        A `pandas.DataFrame` containing stockpile information including
        'rails' and 'yard'.
    engines_df : pd.DataFrame
        A `pandas.DataFrame` containing engine information including 'yards' and 'rail'.

    Returns
    -------
    pd.DataFrame
        Updated `stockpiles_df` with an 'engines' column listing the assigned engine IDs.
    """
    stockpiles_df["engines"] = [[] for _ in range(stockpiles_df.shape[0])]

    for idx, stockpile in stockpiles_df.iterrows():
        assigned_engines = [
            eng_row["id"]
            for _, eng_row in engines_df.iterrows()
            if stockpile["yard"] in eng_row["yards"]
            and eng_row["rail"] in stockpile["rails"]
        ]
        stockpiles_df.at[idx, "engines"] = assigned_engines

    return stockpiles_df


def extract_quality_ini_values(
    stockpiles_df: pd.DataFrame, quality_prefix: str = "qualityIni"
) -> pd.DataFrame:
    """
    Extract initial quality values from nested dictionaries in the stockpiles DataFrame
    and add them as new columns.

    Parameters
    ----------
    stockpiles_df : pd.DataFrame
        A `pandas.DataFrame` containing stockpile information, including quality parameters.
    quality_prefix : str, default='qualityIni'
        Prefix used in column names for quality-related information.

    Returns
    -------
    pd.DataFrame
        Updated `stockpiles_df` with quality parameters extracted as individual columns.
    """
    quality_cols = stockpiles_df.columns[
        stockpiles_df.columns.str.startswith(quality_prefix)
    ]
    quality_ini_dict = {}

    for column in quality_cols:
        for idx, row in stockpiles_df.iterrows():
            parameter = row[column]["parameter"]
            value = row[column]["value"]
            quality_ini_dict.setdefault(parameter, []).append(value)

    # Add the extracted quality values as new columns and drop the original quality columns
    stockpiles_df = pd.concat(
        [stockpiles_df, pd.DataFrame(quality_ini_dict, index=stockpiles_df.index)],
        axis=1,
    ).drop(columns=quality_cols, errors="ignore")

    return stockpiles_df


def process_stockpiles_and_engines(
    stockpiles_df: pd.DataFrame, engines_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Process stockpiles and engines by assigning engines to stockpiles and extracting initial quality values.

    Parameters
    ----------
    stockpiles_df : pd.DataFrame
        A `pandas.DataFrame` containing stockpile information.
    engines_df : pd.DataFrame
        A `pandas.DataFrame` containing engine information.

    Returns
    -------
    pd.DataFrame
        Processed `stockpiles_df` with engines assigned and quality values extracted.
    """
    stockpiles_df = assign_engines_to_stockpiles(stockpiles_df, engines_df)
    stockpiles_df = extract_quality_ini_values(stockpiles_df)
    return stockpiles_df


def travel_time(grp: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate the travel time between consecutive events within a group.

    This function calculates the time between the end of one event and the start of the next event
    within a grouped DataFrame. If the group contains only one event, the travel time is set to 0.

    Parameters
    ----------
    grp : pd.DataFrame
        A `pandas.DataFrame` containing at least 'start_time' and 'end_time'
        columns. The DataFrame is expected to be pre-grouped by a relevant key
        before being passed to this function.

    Returns
    -------
    pd.DataFrame
        A `pandas.DataFrame` with a single column 'travel_time',
        containing the calculated travel times between consecutive events.
        The index of the returned DataFrame matches the input DataFrame.
    """
    if len(grp) == 1:
        return pd.DataFrame(
            {"travel_time": [grp["start_time"].values[0]]}, index=grp.index
        )
    grp = grp.sort_values(["end_time"])
    end_time = None
    res = []
    for _, row in grp.iterrows():
        _end_time = row["end_time"]
        if end_time is None:
            res.append(row["start_time"])
        else:
            res.append(row["start_time"] - end_time)
        end_time = _end_time
    return pd.DataFrame({"travel_time": res}, index=grp.index)


def json_input_output_to_excel(
    json_input_path: str | Path,
    json_output_path: str | Path,
    excel_path: str | Path | None = None,
):
    
    check_is_file(json_input_path, json_output_path)
    
    if excel_path is None:
        excel_filename = Path(json_output_path).with_suffix(".xlsx").name
        excel_folder_path = Path(json_output_path).parent.parent.joinpath("excel")
        excel_path = str(excel_folder_path.joinpath(excel_filename))
    else:
        excel_folder_path = Path(excel_path).parent

    excel_folder_path.mkdir(exist_ok=True, parents=True)

    # Load JSON files
    # Input file
    with open(json_input_path) as fh:
        instance_data = json.load(fh)
    
    # Output file
    with open(json_output_path) as fh:
        output_data = json.load(fh)
    
    info_df = pd.DataFrame([instance_data["info"]], columns=["Instance_Name", "Capacity", "Yard"])
    engines_df = pd.DataFrame(instance_data["engines"])
    stockpiles_df = pd.DataFrame(instance_data["stockpiles"])
    stockpiles_quality_df = pd.json_normalize(stockpiles_df.pop("qualityIni"), sep="_").add_prefix("qualityIni_")
    stockpiles_df = pd.concat([stockpiles_df, stockpiles_quality_df], axis=1)
    stockpiles_df = process_stockpiles_and_engines(stockpiles_df, engines_df)
    
    inputs_df = pd.DataFrame(instance_data["inputs"])
    inputs_quality_df = pd.json_normalize(inputs_df.pop("quality"), sep="_").add_prefix("quality_")
    inputs_df = pd.concat([inputs_df, inputs_quality_df], axis=1)
    
    # Convert instance_1.json to DataFrames
    outputs_df = pd.DataFrame(instance_data["outputs"])
    outputs_quality_df = pd.json_normalize(outputs_df.pop("quality"), sep="_").add_prefix("quality_")
    outputs_df = pd.concat([outputs_df, outputs_quality_df], axis=1)
    
    # Explode the outputs_df
    outputs_df = explode_quality_rows(outputs_df, quality_col_prefix="quality_").drop(
        columns=["time"], errors="ignore"
    )
    distances_travel_df = pd.DataFrame(instance_data["distancesTravel"])
    time_travel_df = pd.DataFrame(instance_data["timeTravel"])
    
    time_travel_df.columns += 1
    time_travel_df.index += 1
    
    distances_travel_df.columns += 1
    distances_travel_df.index += 1
    
    from_to_list = []
    for col in time_travel_df.columns:
        for idx in time_travel_df.index:
            from_to_list.append([f"{col} -> {idx}", time_travel_df.loc[idx, col]])
    from_to_df = pd.DataFrame(from_to_list, columns=["from_to", "duration"])
    
    engines_df[from_to_df["from_to"].to_list()] = -1
    engines_df[from_to_df["from_to"].to_list()] = engines_df[from_to_df["from_to"].to_list()].astype(float)
    
    for idx, row in engines_df.iterrows():
        yards = row["yards"]
        rail = row["rail"]
        stockpiles = []
        for _, stockpile_row in stockpiles_df.iterrows():
            if stockpile_row["yard"] in yards and rail in stockpile_row["rails"]:
                stockpiles.append(stockpile_row["id"])
        for start_stockpile in stockpiles:
            for end_stockpile in stockpiles:
                column_name = f"{start_stockpile} -> {end_stockpile}"
                duration = from_to_df.loc[
                    from_to_df["from_to"] == column_name, "duration"
                ].values[0]
                engines_df.loc[engines_df.index == idx, column_name] = duration
    
    engines_df[from_to_df["from_to"].to_list()] = engines_df[
        from_to_df["from_to"].to_list()
    ].replace(-1, "")
    
    # Convert out_1.json to DataFrames
    objective_df = pd.DataFrame(
        [{"Objective": output_data["objective"], "Gap": output_data["gap"][0]}]
    )
    stacks_df = pd.DataFrame(output_data["stacks"])
    reclaims_df = pd.DataFrame(output_data["reclaims"])
    
    outputs_df_out = pd.DataFrame(output_data["outputs"])
    outputs_quality_df_out = pd.json_normalize(outputs_df_out.pop("quality"), sep="_").add_prefix("quality_")
    
    outputs_df_out = pd.concat([outputs_df_out, outputs_quality_df_out], axis=1)
    outputs_df_out = explode_quality_rows(outputs_df_out, quality_col_prefix="quality_")
    
    if not stacks_df.empty:
        stacks_df["end_time"] = stacks_df["start_time"] + stacks_df["duration"]
        stacks_df["operation"] = "stack"
    
    if not reclaims_df.empty:
        reclaims_df["end_time"] = reclaims_df["start_time"] + reclaims_df["duration"]
        reclaims_df["operation"] = "reclaim"
    
    operations_df = (
        pd.concat([xdf for xdf in [stacks_df, reclaims_df] if not xdf.empty])
        .astype({"weight": int})
        .sort_values(["engine", "start_time"])
        .assign(
            travel_time=lambda xdf: (
                xdf.groupby("engine", as_index=False).apply(travel_time)
            )["travel_time"].reset_index(level=0, drop=True)
        )
    )
    
    stockpiles_final_df = (
        stockpiles_df.merge(
            operations_df.rename(columns={"weight": "weightFinal"})
            .groupby("stockpile")["weightFinal"]
            .sum(),
            left_on="id",
            right_index=True,
            how="left",
        )
        .fillna({"weightFinal": 0})
        .astype({"weightFinal": int})
        .assign(weightFinal=lambda xdf: xdf["weightIni"] - xdf["weightFinal"])
    )
    
    quality_cols = stockpiles_df.columns.intersection(["Fe", "SiO2", "Al2O3", "P", "+31.5", "-6.3"]).to_list()
    
    operations_df = operations_df.merge(
        stockpiles_df.rename(columns={"id": "stockpile"})[["stockpile", "weightIni", *quality_cols]],
        on="stockpile",
        how="left",
    ).assign(weightFinal=lambda xdf: xdf["weightIni"] - xdf["weight"])
    
    final_output_row = [
        operations_df["weight"].sum(),
        "output",
        operations_df["engine"].unique().tolist(),
        operations_df["start_time"].min(),
        operations_df["end_time"].max(),
        1,
        operations_df["end_time"].max(),
        "output_stack",
        operations_df["travel_time"].sum(),
        operations_df["weightFinal"].sum(),
    ]
    
    for quality_col in quality_cols:
        final_output_row.append(
            (
                (
                    operations_df["weight"]
                    * operations_df[quality_col]
                    / operations_df["weight"].sum()
                ).sum()
            )
        )
    operations_df = pd.concat(
        [
            operations_df,
            pd.DataFrame(
                {
                    col: [value]
                    for col, value in zip(operations_df.columns, final_output_row)
                }
            ),
        ],
        axis=0,
    )
    
    required_weight = operations_df.loc[operations_df["stockpile"] == "output", "weight"].values[0]
    infos_gerais = pd.DataFrame(
        {
            "Variável": ["Peso Carregamento"],
            "Valor": [required_weight]
        }
    )
    
    engines_yards = (
        stockpiles_final_df[["yard", "engines"]]
        .astype({"engines": str})
        .drop_duplicates()
        .assign(
            engines=lambda xdf: xdf["engines"]
            .str.replace("[", "")
            .str.replace("]", "")
            .str.replace(" ", "")
            .str.replace(",", "")
            .apply(list)
        )
    )
    all_engines = list(
        sorted(set([engine for engines in engines_yards["engines"] for engine in engines]))
    )
    for engine in all_engines:
        engines_yards[f"Veículo {engine}"] = engines_yards["engines"].apply(lambda value: "x" if engine in value else "")
    
    engines_yards = engines_yards.drop(columns=["engines"]).rename({"yard": "Área"})
    
    rename_dict = {
        "id": "ID",
        "yard": "Área",
        "weightIni": "Quantidade (ton)",
    }
    final_cols = [*list(rename_dict.values()), *quality_cols]
    stockpiles_final_df = stockpiles_final_df.rename(columns=rename_dict)[final_cols]
    
    load_rates = (
        engines_df[["id", "speedReclaim"]]
        .astype({"speedReclaim": int})
        .rename(columns={"id": "Veículo", "speedReclaim": "Taxa (ton/min)"})
    )
    
    from_to_cols = [col for col in engines_df.columns if "->" in col]
    travel_times_dict = {
        "De": [],
        "Para": [],
    }
    for from_to in from_to_cols:
        from_stockpile, to_stockpile = from_to.split(" -> ")
        travel_times_dict["De"].append(from_stockpile)
        travel_times_dict["Para"].append(to_stockpile)
        for engine in all_engines:
            vehicle_travel_times = travel_times_dict.get(f"Veículo {engine}", [])
            engine_row = engines_df.loc[engines_df["id"] == int(engine)]
            time_travel = engine_row[from_to].values[0]
            vehicle_travel_times.append(time_travel)
            travel_times_dict[f"Veículo {engine}"] = vehicle_travel_times
    
    travel_times_df = pd.DataFrame(travel_times_dict).astype({"De": int, "Para": int})
    
    rename_dict = {
        "engine": "Veículo",
        "stockpile": "Pilha",
        "weightIni": "Peso Inicial Pilha",
        "weightFinal": "Peso Final Pilha",
        "weight": "Carregamento (ton)",
        "start_time": "Início",
        "end_time": "Fim",
        "duration": "Tempo Carregamento",
        "travel_time": "Tempo Deslocamento",
    }
    operations_df = operations_df.rename(columns=rename_dict)[[*list(rename_dict.values()), *quality_cols]]
    operations_df[quality_cols] = operations_df[quality_cols].round(2)
    operations_df = operations_df.fillna("")
    
    outputs_df_out["check"] = np.where(
        (outputs_df_out["value"] >= outputs_df_out["minimum"]) & 
        (outputs_df_out["value"] <= outputs_df_out["maximum"]),
        True,
        False,
    )
    rename_dict = {
        "parameter": "Elemento",
        "value": "Valor",
        "minimum": "Mínimo",
        "maximum": "Máximo",
        "goal": "Meta",
        "check": "Check",
    }
    outputs_df_out = outputs_df_out.rename(columns=rename_dict)[list(rename_dict.values())]
    
    # Create a new Excel workbook
    wb = Workbook()
    
    # Define styles
    header_fill = PatternFill(start_color="5B80B8", end_color="5B80B8", fill_type="solid")
    header_font = Font(bold=True, color="FFFFFF")
    load_rates_header_fill = PatternFill(start_color="E26B0A", end_color="E26B0A", fill_type="solid")
    alt_fill_1 = PatternFill(start_color="B9C8DE", end_color="B9C8DE", fill_type="solid")
    alt_fill_2 = PatternFill(start_color="DEE6F0", end_color="DEE6F0", fill_type="solid")
    alignment = Alignment(horizontal="center", vertical="center")
    alignment_merged = Alignment(horizontal="center", vertical="center", wrap_text=True)
    
    # Define a number format style with 0 decimal places and thousands' separator
    number_format = NamedStyle(name="number_format", number_format="#,##0")
    
    # Create the 'Inputs' sheet
    ws1 = wb.active
    ws1.title = "Inputs"
    
    dataframes = [
        (infos_gerais, "T1: Informações Gerais"),
        (stockpiles_final_df, "T2: Composição e Posição de Pilhas"),
        (engines_yards, "T3: Acessos de caminhões às áreas das pilhas"),
        (load_rates, "T4: Taxa de Carregamento por Caminhão"),
        (travel_times_df, "T5: Tempos de deslocamento entre pilhas (min)"),
    ]
    
    col_offset = 1  # Start at column 1
    for df, header in dataframes:
        start_col = col_offset
        end_col = start_col + len(df.columns) - 1
    
        # Write dataframe columns
        for col_num, column_title in enumerate(df.columns, start=start_col):
            cell = ws1.cell(row=2, column=col_num)
            cell.value = column_title
            if header == "T4: Taxa de Carregamento por Caminhão":
                cell.fill = load_rates_header_fill
            else:
                cell.fill = header_fill
            cell.font = header_font
            cell.alignment = alignment
    
        # Write dataframe contents
        for row_num, row in enumerate(dataframe_to_rows(df, index=False, header=False), start=3):
            fill = alt_fill_1 if row_num % 2 else alt_fill_2
            for col_num, value in enumerate(row, start=start_col):
                cell = ws1.cell(row=row_num, column=col_num)
                cell.value = value
    
                # Apply number formatting
                if header == 'T1: Informações Gerais' and cell.column_letter in ['Valor', 'Quantidade (ton)', 'Taxa (ton/min)']:
                    cell.style = number_format
                elif header == 'T4: Taxa de Carregamento por Caminhão' and column_title == 'Taxa (ton/min)':
                    cell.style = number_format
    
                cell.fill = fill
                cell.alignment = alignment
    
        col_offset = end_col + 2  # Move to the next section with a gap of 1 column
    
    # Freeze panes in 'Inputs' sheet
    ws1.freeze_panes = ws1["A3"]
    
    # Autofit column widths
    autofit_columns(ws1)
    
    col_offset = 1  # Start at column 1
    for df, header in dataframes:
        start_col = col_offset
        end_col = start_col + len(df.columns) - 1
    
        # Set merged header
        ws1.merge_cells(start_row=1, start_column=start_col, end_row=1, end_column=end_col)
        header_cell = ws1.cell(row=1, column=start_col)
        header_cell.value = header
        header_cell.alignment = alignment_merged
    
        # Set row height for the merged cell row
        ws1.row_dimensions[1].height = 36
    
        col_offset = end_col + 2  # Move to the next section with a gap of 1 column
    
    # Hide gridlines in 'Inputs' sheet
    ws1.sheet_view.showGridLines = False
    
    # Create 'Resultados' sheet
    ws2 = wb.create_sheet(title="Resultados")
    for r_idx, row in enumerate(
        dataframe_to_rows(operations_df, index=False, header=True), start=1
    ):
        for c_idx, value in enumerate(row, start=1):
            try:
                cell = ws2.cell(row=r_idx, column=c_idx, value=value)
            except ValueError:
                cell = ws2.cell(row=r_idx, column=c_idx, value=str(value))
    
            # Apply number formatting
            if cell.column_letter in ['Peso Inicial Pilha', 'Peso Final Pilha', 'Carregamento (ton)']:
                cell.style = number_format
    
            if r_idx == 1:
                cell.fill = header_fill
                cell.font = header_font
            else:
                cell.fill = alt_fill_1 if r_idx % 2 == 0 else alt_fill_2
            cell.alignment = alignment
    
    # Autofit column widths
    autofit_columns(ws2)
    
    ws2.sheet_view.showGridLines = False

    # Create 'Check Restrições' sheet
    ws3 = wb.create_sheet(title="Check Restrições")
    for r_idx, row in enumerate(
        dataframe_to_rows(outputs_df_out, index=False, header=True), start=1
    ):
        for c_idx, value in enumerate(row, start=1):
            cell = ws3.cell(row=r_idx, column=c_idx, value=value)
            if r_idx == 1:
                cell.fill = header_fill
                cell.font = header_font
            else:
                cell.fill = alt_fill_1 if r_idx % 2 == 0 else alt_fill_2
            cell.alignment = alignment
    
    # Apply conditional formatting for the 'Check' column
    check_col_idx = outputs_df_out.columns.get_loc("Check") + 1
    ws3.conditional_formatting.add(
        f"{get_column_letter(check_col_idx)}2:{get_column_letter(check_col_idx)}{ws3.max_row}",
        CellIsRule(
            operator="equal",
            formula=["TRUE"],
            stopIfTrue=True,
            fill=PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid"),
            font=Font(color="006100"),
        ),
    )
    
    ws3.conditional_formatting.add(
        f"{get_column_letter(check_col_idx)}2:{get_column_letter(check_col_idx)}{ws3.max_row}",
        CellIsRule(
            operator="equal",
            formula=["FALSE"],
            stopIfTrue=True,
            fill=PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid"),
            font=Font(color="9C0006"),
        ),
    )
    
    # Autofit column widths
    autofit_columns(ws3)
    
    ws3.sheet_view.showGridLines = False
    
    # Save the workbook
    wb.save(excel_path)

In [113]:
json_input_path = "../tests/instance_8.json"
json_output_path = "../out/json/out_8.json"
json_input_output_to_excel(json_input_path, json_output_path)

  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_to"].to_list()] = -1
  engines_df[from_to_df["from_t

In [116]:
from __future__ import annotations

import json
from pathlib import Path
from typing import Union, Dict, List, Any

import numpy as np
import pandas as pd
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import PatternFill, Font, Alignment, NamedStyle
from openpyxl.utils import get_column_letter
from openpyxl.formatting.rule import CellIsRule


def check_is_file(*filepaths: Union[str, Path]) -> None:
    """Check if all provided filepaths exist, raise an error if any do not."""
    files_not_found = [str(filepath) for filepath in filepaths if not Path(filepath).is_file()]
    if files_not_found:
        plural = "s" if len(files_not_found) > 1 else ""
        verb = "were" if len(files_not_found) > 1 else "was"
        raise FileNotFoundError(f"The following file{plural} {verb} not found: {', '.join(files_not_found)}")


def load_json(file_path: Union[str, Path]) -> Dict[str, Any]:
    """Load a JSON file and return the data as a dictionary."""
    with open(file_path) as fh:
        return json.load(fh)


def prepare_dataframe(data: dict, key: str, prefix: str = "", quality_key: str = None) -> pd.DataFrame:
    """Prepare a DataFrame by extracting nested quality information."""
    df = pd.DataFrame(data.get(key, []))
    if quality_key and not df.empty:
        quality_df = pd.json_normalize(df.pop(quality_key), sep="_").add_prefix(prefix)
        df = pd.concat([df, quality_df], axis=1)
    return df


class ExcelFormatter:
    def __init__(self, config: Dict[str, Any]):
        """Initialize the ExcelFormatter with a configuration dictionary.

        Parameters
        ----------
        config : dict
            Configuration dictionary to customize Excel formatting.
        """
        self.config = config
        self.wb = Workbook()
        self.header_style = self._create_header_style(config.get("header_style", {}))
        self.alt_fills = self._create_alternate_fills(config.get("alternate_fills", {}))
        self.number_format = NamedStyle(name="number_format", number_format=config.get("number_format", "#,##0"))

    @staticmethod
    def _create_header_style(style_config: Dict[str, str]) -> Dict[str, PatternFill | Font | Alignment]:
        """Create header styles based on the provided configuration."""
        return {
            "fill": PatternFill(start_color=style_config.get("start_color", "5B80B8"), 
                                end_color=style_config.get("end_color", "5B80B8"), 
                                fill_type="solid"),
            "font": Font(bold=True, color=style_config.get("font_color", "FFFFFF")),
            "alignment": Alignment(horizontal="center", vertical="center")
        }

    @staticmethod
    def _create_alternate_fills(fill_config: Dict[str, str]) -> List[PatternFill]:
        """Create alternate row fill styles."""
        return [
            PatternFill(start_color=fill_config.get("alt_fill_1", "B9C8DE"), 
                        end_color=fill_config.get("alt_fill_1", "B9C8DE"), 
                        fill_type="solid"),
            PatternFill(start_color=fill_config.get("alt_fill_2", "DEE6F0"), 
                        end_color=fill_config.get("alt_fill_2", "DEE6F0"), 
                        fill_type="solid")
        ]

    def add_dataframe_to_sheet(self, ws, df: pd.DataFrame, title: str, start_row: int = 1, start_col: int = 1) -> None:
        """Add a DataFrame to the worksheet with the specified formatting."""
        ws.merge_cells(start_row=start_row, start_column=start_col, 
                       end_row=start_row, end_column=start_col + len(df.columns) - 1)
        header_cell = ws.cell(row=start_row, column=start_col, value=title)
        header_cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
        ws.row_dimensions[start_row].height = 36

        for col_num, column_title in enumerate(df.columns, start=start_col):
            cell = ws.cell(row=start_row + 1, column=col_num, value=column_title)
            cell.fill = self.header_style["fill"]
            cell.font = self.header_style["font"]
            cell.alignment = self.header_style["alignment"]

        for row_num, row in enumerate(dataframe_to_rows(df, index=False, header=False), start=start_row + 2):
            fill = self.alt_fills[row_num % 2]
            for col_num, value in enumerate(row, start=start_col):
                cell = ws.cell(row=row_num, column=col_num, value=value)
                if isinstance(value, (int, float)):
                    cell.style = self.number_format
                cell.fill = fill
                cell.alignment = Alignment(horizontal="center", vertical="center")

        ws.freeze_panes = f"A{start_row + 2}"
        self._autofit_columns(ws)

    def _autofit_columns(self, ws) -> None:
        """Auto-adjust the width of columns based on the length of their content."""
        for col in ws.columns:
            max_length = max(len(str(cell.value)) for cell in col if cell.value) + 2
            adjusted_width = max(max_length, 10)
            ws.column_dimensions[col[0].column_letter].width = adjusted_width

    def save(self, excel_path: Union[str, Path]) -> None:
        """Save the workbook to the specified path."""
        self.wb.save(excel_path)


class DataProcessor:
    def __init__(self, config: Dict[str, Any]):
        """Initialize the DataProcessor with configuration settings.

        Parameters
        ----------
        config : dict
            Configuration dictionary to customize data processing.
        """
        self.config = config

    def process_stockpiles_and_engines(self, stockpiles_df: pd.DataFrame, engines_df: pd.DataFrame) -> pd.DataFrame:
        """Process stockpiles and engines by assigning engines to stockpiles and extracting initial quality values."""
        stockpiles_df = self.assign_engines_to_stockpiles(stockpiles_df, engines_df)
        return self.extract_quality_ini_values(stockpiles_df)

    def assign_engines_to_stockpiles(self, stockpiles_df: pd.DataFrame, engines_df: pd.DataFrame) -> pd.DataFrame:
        """Assign engines to stockpiles based on matching yards and rails."""
        def find_assigned_engines(stockpile):
            return [
                eng_row["id"]
                for _, eng_row in engines_df.iterrows()
                if stockpile["yard"] in eng_row["yards"] and eng_row["rail"] in stockpile["rails"]
            ]

        stockpiles_df["engines"] = stockpiles_df.apply(find_assigned_engines, axis=1)
        return stockpiles_df

    def extract_quality_ini_values(self, stockpiles_df: pd.DataFrame) -> pd.DataFrame:
        """Extract initial quality values from nested dictionaries in the stockpiles DataFrame."""
        quality_cols = stockpiles_df.filter(regex=f'^{self.config["quality_prefix"]}').columns
        quality_ini_dict = {param: [] for param in stockpiles_df[quality_cols[0]].iloc[0].keys()}

        for column in quality_cols:
            for _, row in stockpiles_df.iterrows():
                for param, value in row[column].items():
                    quality_ini_dict[param].append(value)

        stockpiles_df = pd.concat([stockpiles_df, pd.DataFrame(quality_ini_dict, index=stockpiles_df.index)], axis=1)
        stockpiles_df.drop(columns=quality_cols, inplace=True)
        return stockpiles_df

    def calculate_travel_time(self, grp: pd.DataFrame) -> pd.DataFrame:
        """Calculate the travel time between consecutive events within a group."""
        if len(grp) == 1:
            return pd.DataFrame({"travel_time": [grp["start_time"].iloc[0]]}, index=grp.index)
        
        grp_sorted = grp.sort_values("end_time")
        res = [grp_sorted["start_time"].iloc[0]] + list(grp_sorted["start_time"].iloc[1:].values - grp_sorted["end_time"].iloc[:-1].values)
        return pd.DataFrame({"travel_time": res}, index=grp_sorted.index)


def json_input_output_to_excel(
    json_input_path: Union[str, Path],
    json_output_path: Union[str, Path],
    excel_path: Union[str, Path] = None,
    config: Dict[str, Any] = None
) -> None:
    """Convert input and output JSON files to an Excel file with formatted sheets.

    Parameters
    ----------
    json_input_path : Union[str, Path]
        Path to the input JSON file.
    json_output_path : Union[str, Path]
        Path to the output JSON file.
    excel_path : Union[str, Path], optional
        Path to save the Excel file. If None, the file is saved in the same directory as the output JSON.
    config : Dict[str, Any], optional
        Configuration dictionary to customize the processing and formatting. Defaults to None.
    """
    config = config or {
        "quality_prefix": "qualityIni_",
        "header_style": {"start_color": "5B80B8", "end_color": "5B80B8", "font_color": "FFFFFF"},
        "alternate_fills": {"alt_fill_1": "B9C8DE", "alt_fill_2": "DEE6F0"},
        "number_format": "#,##0"
    }

    check_is_file(json_input_path, json_output_path)
    excel_path = Path(excel_path or Path(json_output_path).with_suffix(".xlsx").parent.parent / "excel" / Path(json_output_path).with_suffix(".xlsx").name)
    excel_path.parent.mkdir(parents=True, exist_ok=True)

    instance_data = load_json(json_input_path)
    output_data = load_json(json_output_path)

    # Initialize processor and formatter
    processor = DataProcessor(config)
    formatter = ExcelFormatter(config)

    # Process input data
    info_df = pd.DataFrame([instance_data["info"]], columns=["Instance_Name", "Capacity", "Yard"])
    engines_df = prepare_dataframe(instance_data, "engines")
    stockpiles_df = prepare_dataframe(instance_data, "stockpiles", config["quality_prefix"], "qualityIni")
    stockpiles_df = processor.process_stockpiles_and_engines(stockpiles_df, engines_df)
    inputs_df = prepare_dataframe(instance_data, "inputs", "quality_", "quality")

    # Process output data
    outputs_df = prepare_dataframe(output_data, "outputs", "quality_", "quality")
    operations_df = pd.concat(
        [prepare_dataframe(output_data, op_type) for op_type in ["stacks", "reclaims"]], ignore_index=True
    ).astype({"weight": int}).sort_values(["engine", "start_time"])

    operations_df["end_time"] = operations_df["start_time"] + operations_df["duration"]
    operations_df["travel_time"] = operations_df.groupby("engine", as_index=False).apply(processor.calculate_travel_time)["travel_time"].reset_index(drop=True)

    stockpiles_final_df = stockpiles_df.merge(
        operations_df.groupby("stockpile")["weight"].sum().reset_index().rename(columns={"weight": "weightFinal"}),
        left_on="id", right_on="stockpile", how="left"
    ).fillna({"weightFinal": 0}).astype({"weightFinal": int})
    stockpiles_final_df["weightFinal"] = stockpiles_final_df["weightIni"] - stockpiles_final_df["weightFinal"]

    operations_df = operations_df.merge(
        stockpiles_final_df[["id", "weightIni"] + stockpiles_final_df.columns.intersection(["Fe", "SiO2", "Al2O3", "P", "+31.5", "-6.3"]).to_list()].rename(columns={"id": "stockpile"}),
        on="stockpile", how="left"
    )
    operations_df["weightFinal"] = operations_df["weightIni"] - operations_df["weight"]

    final_output_row = pd.Series({
        "weight": operations_df["weight"].sum(),
        "operation": "output",
        "engine": operations_df["engine"].unique().tolist(),
        "start_time": operations_df["start_time"].min(),
        "end_time": operations_df["end_time"].max(),
        "duration": 1,
        "output": operations_df["end_time"].max(),
        "weightFinal": operations_df["weightFinal"].sum(),
        "travel_time": operations_df["travel_time"].sum(),
    })

    for quality_col in stockpiles_final_df.columns.intersection(["Fe", "SiO2", "Al2O3", "P", "+31.5", "-6.3"]).to_list():
        final_output_row[quality_col] = (operations_df["weight"] * operations_df[quality_col] / operations_df["weight"].sum()).sum()

    operations_df = pd.concat([operations_df, final_output_row.to_frame().T], ignore_index=True)

    # Prepare DataFrames for Excel
    dataframes = [
        (pd.DataFrame({"Variável": ["Peso Carregamento"], "Valor": [operations_df["weight"].sum()]}), "T1: Informações Gerais"),
        (stockpiles_final_df.rename(columns={"id": "ID", "yard": "Área", "weightIni": "Quantidade (ton)"}), "T2: Composição e Posição de Pilhas"),
        (self._prepare_engines_yards(stockpiles_final_df), "T3: Acessos de caminhões às áreas das pilhas"),
        (self._prepare_load_rates(engines_df), "T4: Taxa de Carregamento por Caminhão"),
        (self._prepare_travel_times_df(engines_df), "T5: Tempos de deslocamento entre pilhas (min)"),
    ]

    # Add dataframes to the Excel workbook
    ws1 = formatter.wb.active
    ws1.title = "Inputs"
    row_offset = 1
    for df, title in dataframes:
        formatter.add_dataframe_to_sheet(ws1, df, title, start_row=row_offset)
        row_offset += len(df) + 4  # Adjust row offset for the next table

    # Add 'Resultados' sheet
    ws2 = formatter.wb.create_sheet(title="Resultados")
    formatter.add_dataframe_to_sheet(ws2, operations_df, "Resultados")

    # Add 'Check Restrições' sheet
    outputs_df_out = self._prepare_check_restrictions(outputs_df)
    ws3 = formatter.wb.create_sheet(title="Check Restrições")
    formatter.add_dataframe_to_sheet(ws3, outputs_df_out, "Check Restrições")
    self._add_conditional_formatting(ws3, "Check", outputs_df_out)

    # Save the workbook
    formatter.save(excel_path)

def _prepare_engines_yards(stockpiles_final_df: pd.DataFrame) -> pd.DataFrame:
    """Prepare the DataFrame for engines and yards information."""
    engines_yards = stockpiles_final_df[["yard", "engines"]].astype(str).drop_duplicates()
    all_engines = sorted({engine for engine_list in engines_yards["engines"].apply(lambda x: x.strip("[]").replace(" ", "").split(",")) for engine in engine_list})
    for engine in all_engines:
        engines_yards[f"Veículo {engine}"] = engines_yards["engines"].apply(lambda e: "x" if engine in e else "")
    return engines_yards.drop(columns=["engines"]).rename(columns={"yard": "Área"})

def _prepare_load_rates(engines_df: pd.DataFrame) -> pd.DataFrame:
    """Prepare the DataFrame for load rates."""
    return engines_df[["id", "speedReclaim"]].rename(columns={"id": "Veículo", "speedReclaim": "Taxa (ton/min)"}).astype({"Taxa (ton/min)": int})

def _prepare_travel_times_df(engines_df: pd.DataFrame) -> pd.DataFrame:
    """Prepare the DataFrame for travel times."""
    travel_times_dict = {"De": [], "Para": []}
    for from_to in engines_df.filter(regex="->").columns:
        travel_times_dict["De"].append(from_to.split(" -> ")[0])
        travel_times_dict["Para"].append(from_to.split(" -> ")[1])
        for engine in all_engines:
            travel_times_dict.setdefault(f"Veículo {engine}", []).append(engines_df.at[engines_df["id"] == int(engine), from_to].values[0])
    return pd.DataFrame(travel_times_dict)

def _prepare_check_restrictions(outputs_df_out: pd.DataFrame) -> pd.DataFrame:
    """Prepare the DataFrame for the 'Check Restrições' sheet."""
    return outputs_df_out.assign(
        check=lambda df: (df["value"] >= df["minimum"]) & (df["value"] <= df["maximum"])
    ).rename(columns={
        "parameter": "Elemento",
        "value": "Valor",
        "minimum": "Mínimo",
        "maximum": "Máximo",
        "goal": "Meta",
        "check": "Check",
    })

def _add_conditional_formatting(ws, check_column: str, df: pd.DataFrame) -> None:
    """Add conditional formatting to the 'Check' column."""
    check_col_idx = df.columns.get_loc(check_column) + 1
    ws.conditional_formatting.add(
        f"{get_column_letter(check_col_idx)}2:{get_column_letter(check_col_idx)}{ws.max_row}",
        CellIsRule(operator="equal", formula=["TRUE"], stopIfTrue=True, fill=PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid"), font=Font(color="006100")),
    )
    ws.conditional_formatting.add(
        f"{get_column_letter(check_col_idx)}2:{get_column_letter(check_col_idx)}{ws.max_row}",
        CellIsRule(operator="equal", formula=["FALSE"], stopIfTrue=True, fill=PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid"), font=Font(color="9C0006")),
    )


In [117]:
json_input_output_to_excel(
    json_input_path="../tests/instance_8.json",
    json_output_path="../out/json/out_8.json",
    excel_path="../out/excel/out_8-1.xlsx"
)

ValueError: Length of values (80) does not match length of index (20)