In [33]:
import json
import os
import ast
from graphviz import Digraph
from copy import deepcopy
import random

In [34]:
class DataMode:
    def __init__(self, name: str, stub: str, value: float):
        self.name = name
        self.stub = stub
        self.value = value

    def __repr__(self):
        return f"DataMode(Name={self.name}, Stub={self.stub}, Value={self.value})"

class FormulaMode:
    def __init__(self, name: str, stub: str, formula: str):
        self.name = name
        self.stub = stub
        self.formula = formula
        self.result = None

    def evaluate(self, variables: dict):
        try:
            self.result = eval(self.formula, {}, variables)
        except Exception as e:
            print(f"Error evaluating {self.name}: {e}")
        return self.result

    def __repr__(self):
        return f"FormulaMode(Name={self.name}, Stub={self.stub}, Formula={self.formula}, Result={self.result})"

def extract_variables(expr: str) -> set:
    return {node.id for node in ast.walk(ast.parse(expr)) if isinstance(node, ast.Name)}

In [35]:
class EmissionModel:
    def __init__(self, model_name):
        self.model_name = model_name
        self.data_modes = {}
        self.formula_modes = {}

    def add_data_mode(self, name, stub, value):
        mode = DataMode(name, stub, value)
        self.data_modes[stub] = mode

    def add_formula_mode(self, name, stub, formula):
        mode = FormulaMode(name, stub, formula)
        self.formula_modes[stub] = mode

    def calculate(self, external_vars=None):
        variables = {stub: mode.value for stub, mode in self.data_modes.items()}
        if external_vars:
            variables.update(external_vars)
        evaluated = set()
        for _ in range(len(self.formula_modes)):
            updated = False
            for stub, mode in self.formula_modes.items():
                if stub in evaluated:
                    continue
                result = mode.evaluate(variables)
                if result is not None:
                    variables[stub] = result
                    evaluated.add(stub)
                    updated = True
            if not updated:
                break
        return variables

    def __repr__(self):
        lines = [f"Model: {self.model_name}"]
        for mode in self.formula_modes.values():
            if mode.result is not None:
                lines.append(f"  {mode.stub:25s} = {mode.result:.8f}")
            else:
                lines.append(f"  {mode.stub:25s} = None")
        return "\n".join(lines)


In [36]:
class ModelManager:
    def __init__(self, formula_file="formula.json"):
        self.models = {}
        self.formula_file = formula_file
        self.load_formulas()

    def load_formulas(self):
        try:
            with open(self.formula_file, "r", encoding="utf-8") as f:
                formulas = json.load(f)
                for model_name, formula_dict in formulas.items():
                    model = EmissionModel(model_name)
                    for formula_name, data in formula_dict.items():
                        model.add_formula_mode(formula_name, data["stub"], data["formula"])
                    self.models[model_name] = model
        except Exception as e:
            print(f"Error loading formulas: {e}")

    def get_model(self, model_name):
        return self.models.get(model_name, None)

    def calculate_all(self):
        global_vars = {}
        for model in self.models.values():
            result = model.calculate(global_vars)
            global_vars.update(result)

    def summary_report(self, target_stubs=None):
        lines = ["Final Summary Report:"]
        for model in self.models.values():
            for stub, mode in model.formula_modes.items():
                if (target_stubs is None or stub in target_stubs) and mode.result is not None:
                    lines.append(f"{stub:25s} = {mode.result:.6f}")
        return "\n".join(lines)

    def visualize_model(self, output_path="graphs/model_structure"):
        output_path = os.path.abspath(output_path)
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        dot = Digraph(name="ModelStructure", format="png")
        dot.attr(rankdir="LR")

        all_vars = {}
        for model in self.models.values():
            for stub, mode in model.data_modes.items():
                all_vars[stub] = mode.value
        for model in self.models.values():
            for stub, mode in model.formula_modes.items():
                mode.evaluate(all_vars)
                all_vars[stub] = mode.result

        for model in self.models.values():
            with dot.subgraph(name=f"cluster_{model.model_name}") as sub:
                sub.attr(label=f"Model: {model.model_name}", style="dashed")
                for stub, mode in model.data_modes.items():
                    label = f"{stub}\n[input]\n= {mode.value}"
                    sub.node(stub, label=label, shape="box", style="filled", fillcolor="#f0f8ff")
                for stub, mode in model.formula_modes.items():
                    label = f"{stub}\n[computed]\n= {mode.result}"
                    sub.node(stub, label=label, shape="ellipse", style="filled", fillcolor="#d0f0c0")
                    for var in extract_variables(mode.formula):
                        dot.edge(var, stub)

        dot.render(output_path, cleanup=True)
        print(f"Model structure graph saved to: {output_path}.png")

    def __repr__(self):
        divider = "\n" + "="*50 + "\n"
        return divider.join([str(model) for model in self.models.values()])


In [37]:
manager = ModelManager()
# This is only for page Company A_ISO14064-1

fixed_emission = manager.get_model("FixedEmission")
if fixed_emission:
    fixed_emission.add_data_mode("Natural Gas Activity Data", "m3", 151091)
    fixed_emission.add_data_mode("N2O Emission Factor", "N2O_XGWP", 0.00091)
    fixed_emission.add_data_mode("CO2 Emission Factor", "CO2_XGWP", 1.87904)
    fixed_emission.add_data_mode("CH4 Emission Factor", "CH4_XGWP", 0.00093)

escaped_emission = manager.get_model("EscapedEmission")
if escaped_emission:
    escaped_emission.add_data_mode("R134a Chiller Emission", "R134a", 231)
    escaped_emission.add_data_mode("R134a Emission Factor", "R134a_XGWP", 130.05)
    escaped_emission.add_data_mode("R407c Chiller Emission", "R407c", 280.5)
    escaped_emission.add_data_mode("R407c Chiller Emission Factor", "R407c_XGWP", 162.18)
    escaped_emission.add_data_mode("R410A Chiller Emission", "R410A_C", 96)
    escaped_emission.add_data_mode("R410A Chiller Emission Factor", "R410A_C_XGWP", 191.718)
    escaped_emission.add_data_mode("R410A AC Emission", "R410A_AC", 88)
    escaped_emission.add_data_mode("R410A AC Emission Factor", "R410A_AC_XGWP", 124.053)
    escaped_emission.add_data_mode("Extinguisher Emission", "Extinguisher", 20.5)
    escaped_emission.add_data_mode("Extinguisher Emission Factor", "Extinguisher_XGWP", 1)
    escaped_emission.add_data_mode("Septic Tank CH4 Emission", "SepticTankCH4_kg", 764894.2933)
    escaped_emission.add_data_mode("CH4 Emission Factor", "SepticTankCH4_XGWP", 0.10672)

mobile_emission = manager.get_model("MobileEmission")
if mobile_emission:
    mobile_emission.add_data_mode("ULP92 Consumption", "ULP92", 159.22)
    mobile_emission.add_data_mode("ULP92 CH4 Factor", "ULP92_CH4_XGWP", 0.02278)
    mobile_emission.add_data_mode("ULP92 CO2 Factor", "ULP92_CO2_XGWP", 2.263)
    mobile_emission.add_data_mode("ULP92 N2O Factor", "ULP92_N2O_XGWP", 0.07132)
    mobile_emission.add_data_mode("ULP98 Consumption", "ULP98", 5582.08)
    mobile_emission.add_data_mode("ULP98 CH4 Factor", "ULP98_CH4_XGWP", 0.02278)
    mobile_emission.add_data_mode("ULP98 CO2 Factor", "ULP98_CO2_XGWP", 2.26313)
    mobile_emission.add_data_mode("ULP98 N2O Factor", "ULP98_N2O_XGWP", 0.07132)
    mobile_emission.add_data_mode("ULP95 Consumption", "ULP95", 9217.254)
    mobile_emission.add_data_mode("ULP95 CH4 Factor", "ULP95_CH4_XGWP", 0.02278)
    mobile_emission.add_data_mode("ULP95 CO2 Factor", "ULP95_CO2_XGWP", 2.263)
    mobile_emission.add_data_mode("ULP95 N2O Factor", "ULP95_N2O_XGWP", 0.07132)
    mobile_emission.add_data_mode("Diesel Data1", "DieselData1", 1114.92)
    mobile_emission.add_data_mode("Diesel Data2", "DieselData2", 2121.581)
    mobile_emission.add_data_mode("Diesel Data3", "DieselData3", 5595.49)
    mobile_emission.add_data_mode("Diesel CH4 Factor", "Diesel_CH4_XGWP", 0.00383)
    mobile_emission.add_data_mode("Diesel CO2 Factor", "Diesel_CO2_XGWP", 2.60603)
    mobile_emission.add_data_mode("Diesel N2O Factor", "Diesel_N2O_XGWP", 0.03744)
    mobile_emission.add_data_mode("LPG Consumption", "LPGData", 3955.968)
    mobile_emission.add_data_mode("LPG CH4 Factor", "LPG_CH4_XGWP", 0.040805)
    mobile_emission.add_data_mode("LPG CO2 Factor", "LPG_CO2_XGWP", 1.75288)
    mobile_emission.add_data_mode("LPG N2O Factor", "LPG_N2O_XGWP", 0.00152)

category2 = manager.get_model("Category2")
if category2:
    category2.add_data_mode("Electricity Usage", "ElectricityData", 25461600)
    category2.add_data_mode("Electricity CO2 Factor", "Electricity_CO2_XGWP", 0.495)

category3 = manager.get_model("Category3")
if category3:
    category3.add_data_mode("Tap Water Usage", "TapWaterData", 0)
    category3.add_data_mode("Tap Water CO2 Factor", "TapWater_CO2_XGWP", 0.233)
manager.calculate_all()

In [38]:
print(manager.summary_report(["Category1", "Category2", "Category3"]))

Final Summary Report:
Category1                 = 536.430928
Category2                 = 12603.492000
Category3                 = 0.000000


In [39]:
print(manager)

Model: FixedEmission
  NaturalGasCO2e            = 284.18404008
  FixedCO2e                 = 284.18404008
Model: EscapedEmission
  ChillerCO2e               = 93.93796800
  BuildingCO2e              = 10.91666400
  ExtinguisherCO2e          = 0.02050000
  SepticTankCO2e            = 81.62951898
  EscapedCO2e               = 186.50465098
Model: MobileEmission
  ULP92_Mower               = 0.37529746
  ULP98_CompanyCar          = 13.15824644
  ULP95_CompanyCar          = 21.72598940
  MotorPetrolCO2e           = 35.25953330
  Diesel_CompanyCar         = 2.95152772
  Diesel_Stacker            = 5.61646138
  Diesel_Truck              = 14.81294068
  DieselCO2e                = 23.38092977
  LPGCO2e                   = 7.10177353
  MobileCO2e                = 65.74223661
Model: Category1
  Category1                 = 536.43092767
Model: Category2
  PurchasedElectricityCO2e  = 12603.49200000
  Category2                 = 12603.49200000
Model: Category3
  TapWaterCO2e              = 0.000000

In [40]:
manager.visualize_model()

Model structure graph saved to: c:\Users\dongz\OneDrive\Desktop\Thesis\DTU-HCAI-Thesis\graphs\model_structure.png


In [None]:
def run_and_get_total(manager):
    manager.calculate_all()
    total = 0.0
    for model in manager.models.values():
        for stub, mode in model.formula_modes.items():
            if mode.result is not None:
                total += mode.result
    return total

def get_value(var_name, manager):
    for model in manager.models.values():
        if var_name in model.formula_modes:
            mode = model.formula_modes[var_name]
            return mode.result
        elif var_name in model.data_modes:
            mode = model.data_modes[var_name]
            return mode.value
    raise ValueError(f"Variable '{var_name}' not found")

def run_with_override(manager, override_dict):
    test_manager = deepcopy(manager)
    for model in test_manager.models.values():
        for stub, mode in model.data_modes.items():
            if stub in override_dict:
                mode.value = override_dict[stub]
    return run_and_get_total(test_manager)

def simulate_strategy_sensitivity(manager, strategy_mapping, percent_range=(0.05, 0.15), trials=5):
    baseline = run_and_get_total(manager)
    results = {}

    for strategy_name, variables in strategy_mapping.items():
        deltas = []
        sensitivities = []
        for _ in range(trials):
            override_vars = {}
            for var in variables:
                try:
                    original_value = get_value(var, manager)
                    random_percent = random.uniform(*percent_range)
                    override_vars[var] = original_value * (1 + random_percent)
                except Exception as e:
                    print(f"[WARN] Could not get value for {var}: {e}")
            new_total = run_with_override(manager, override_vars)
            delta = new_total - baseline
            sensitivity = delta / baseline if baseline != 0 else 0.0
            deltas.append(delta)
            sensitivities.append(sensitivity)

        avg_delta = sum(deltas) / trials
        avg_sensitivity = sum(sensitivities) / trials

        results[strategy_name] = {
            "baseline_total": baseline,
            "avg_new_total": baseline + avg_delta,
            "avg_delta": avg_delta,
            "avg_sensitivity_ratio": avg_sensitivity
        }
    return results

STRATEGY_MAPPING = {
    "Strategy1_ProcessImprovement": [
        "R410A_C", "R410A_C_XGWP", "R410A_AC", "R410A_AC_XGWP",  # for ChillerCO2e
        "Extinguisher", "Extinguisher_XGWP",                     # for BuildingCO2e
        "m3", "CO2_XGWP", "CH4_XGWP", "N2O_XGWP",                # for NaturalGasCO2e
        "ElectricityData", "Electricity_CO2_XGWP"                # for PurchasedElectricityCO2e
    ],
    "Strategy2_EnergySwitching": [
        "m3", "CO2_XGWP", "CH4_XGWP", "N2O_XGWP",                 # for NaturalGasCO2e
        "ElectricityData", "Electricity_CO2_XGWP"                 # for PurchasedElectricityCO2e
    ],
    "Strategy3_CircularEconomy": [
        "m3", "CO2_XGWP", "CH4_XGWP", "N2O_XGWP",                 # for NaturalGasCO2e
    ]
}


In [42]:
results = simulate_strategy_sensitivity(manager, STRATEGY_MAPPING, percent_range=(0.05, 0.15), trials=10)
for strategy, res in results.items():
    print(f"{strategy}: Avg Impact = {res['avg_delta']:.2f}, Avg Sensitivity = {res['avg_sensitivity_ratio']*100:.2f}%")



Strategy1_ProcessImprovement: Avg Impact = 5441.56, Avg Sensitivity = 20.25%
Strategy2_EnergySwitching: Avg Impact = 5632.15, Avg Sensitivity = 20.96%
Strategy3_CircularEconomy: Avg Impact = 176.41, Avg Sensitivity = 0.66%


#### Variable

In [43]:
def simulate_variable_sensitivity(manager, variable_list, percent_range=(0.05, 0.15), trials=5):
    top_n = 3 
    baseline = run_and_get_total(manager)
    results = {}

    for var in variable_list:
        deltas = []
        sensitivities = []
        for _ in range(trials):
            try:
                original_value = get_value(var, manager)
                perturbation = random.uniform(*percent_range)
                override_value = original_value * (1 + perturbation)
                override_dict = {var: override_value}

                new_total = run_with_override(manager, override_dict)
                delta = new_total - baseline
                sensitivity = delta / baseline if baseline != 0 else 0.0
                deltas.append(delta)
                sensitivities.append(sensitivity)
            except Exception as e:
                print(f"[WARN] Could not simulate variable '{var}': {e}")
                continue

        if deltas:
            avg_delta = sum(deltas) / len(deltas)
            avg_sensitivity = sum(sensitivities) / len(sensitivities)
            results[var] = {
                "avg_delta": avg_delta,
                "avg_sensitivity_ratio": avg_sensitivity
            }

    sorted_results = sorted(results.items(), key=lambda x: abs(x[1]["avg_sensitivity_ratio"]), reverse=True)
    top_results = dict(sorted_results[:top_n] if len(sorted_results) >= top_n else sorted_results)

    return top_results

In [45]:
print("Strategy 1: Process Improvement")
s1_top_vars = simulate_variable_sensitivity(manager, STRATEGY_MAPPING["Strategy1_ProcessImprovement"])
for var, data in s1_top_vars.items():
    print(f"{var}: avg_delta={data['avg_delta']:.2f}, Sensitivity={data['avg_sensitivity_ratio']:.4%}")

print("Strategy 2: Energy Switching")
s2_top_vars = simulate_variable_sensitivity(manager, STRATEGY_MAPPING["Strategy2_EnergySwitching"])
for var, data in s2_top_vars.items():
    print(f"{var}: avg_delta={data['avg_delta']:.2f}, Sensitivity={data['avg_sensitivity_ratio']:.4%}")


Strategy 1: Process Improvement
Electricity_CO2_XGWP: avg_delta=2448.13, Sensitivity=9.1093%
ElectricityData: avg_delta=2398.05, Sensitivity=8.9230%
CO2_XGWP: avg_delta=97.41, Sensitivity=0.3625%
Strategy 2: Energy Switching
ElectricityData: avg_delta=2636.78, Sensitivity=9.8113%
Electricity_CO2_XGWP: avg_delta=2365.15, Sensitivity=8.8006%
CO2_XGWP: avg_delta=95.37, Sensitivity=0.3549%
