In [None]:
from ixmp import Platform

from message_ix import Scenario

MODEL = "test_emissions_price"

mp = Platform("test_feature_price_emission")
mp.add_unit("MtCO2")
mp.add_unit("tCO2/kWa")
mp.add_unit("USD/kW")

scen = Scenario(
    mp,
    MODEL,
    scenario="many_tecs",
    version="new",
)

In [None]:
from message_ix.tests.test_feature_price_emission import model_setup

# 450 is too much; this bound will not affect the first model year
# (EMISS is not reduced from without cumulative bound to with it)
# In a model year without binding bound, no price_emission is produced ->
# there is one value missing for price_emission in period-specific bound
cumulative_bound = 1
# cumulative_bound = 500
# cumulative_bound = 550
years = [2020, 2030, 2040, 2050]
# years = [2020, 2025, 2030, 2040, 2050]
# years = [2020, 2030, 2040, 2045, 2050]

filters = {"node": "World"}

model_setup(scen=scen, years=years, simple_tecs=False)

In [None]:
from message_ix.tests.test_feature_price_emission import solve_args

scen.commit("initialize test scenario")
scen.solve(quiet=True, **solve_args)
scen.var("EMISS", filters)

In [None]:
scenario_cumulative_bound = scen.clone(
    MODEL,
    "cumulative_emission_bound",
    "introducing a cumulative emissions bound",
    keep_solution=False,
)
scenario_cumulative_bound.check_out()

scenario_cumulative_bound.add_cat("year", "cumulative", years)
scenario_cumulative_bound.add_par(
    "bound_emission",
    ["World", "GHG", "all", "cumulative"],
    cumulative_bound,
    "MtCO2",
)
scenario_cumulative_bound.commit("initialize test scenario")
scenario_cumulative_bound.solve(quiet=True, **solve_args)
scenario_cumulative_bound.var("EMISS", filters)

In [None]:
emiss = scenario_cumulative_bound.var("EMISS", filters).set_index("year").lvl
price_emission = (
    scenario_cumulative_bound.var("PRICE_EMISSION", filters).set_index("year").lvl
)

In [None]:
# --------------------------------------------------------
# Run scenario with annual-emission bound based on `EMISS`
# from cumulative constraint scenario.
# --------------------------------------------------------

scenario_period_bound = scen.clone(
    MODEL,
    "period_bound_many_tecs",
    "introducing a period-specific emission_bound",
    keep_solution=False,
)
scenario_period_bound.check_out()
for year in years:
    scenario_period_bound.add_cat("year", year, year)

# use emissions from cumulative-constraint scenario as period-emission bounds
emiss_period_bound = (
    scenario_cumulative_bound.var("EMISS", {"node": "World"})
    .rename(columns={"year": "type_year", "lvl": "value"})
    .drop("emission", axis=1)
)
emiss_period_bound["type_emission"] = "GHG"
emiss_period_bound["unit"] = "MtCO2"
scenario_period_bound.add_par("bound_emission", emiss_period_bound)
scenario_period_bound.commit("initialize test scenario for periodic emission bound")
scenario_period_bound.solve(quiet=True, **solve_args)
scenario_period_bound.var("EMISS", filters)

In [None]:
import numpy.testing as npt

# check -emissions are close between cumulative and yearly-bound scenarios
emiss_period_bound = scenario_period_bound.var("EMISS", filters).set_index("year").lvl
npt.assert_allclose(emiss, emiss_period_bound)

In [None]:
scenario_cumulative_bound.par("emission_factor")

In [None]:
print(price_emission)
scenario_period_bound.var("PRICE_EMISSION")

In [None]:
# check "PRICE_EMISSION" is close between cumulative- and yearly-bound scenarios
price_emission_period_bound = (
    scenario_period_bound.var("PRICE_EMISSION", filters).set_index("year").lvl
)
npt.assert_allclose(price_emission, price_emission_period_bound)

In [None]:
scenario_cumulative_bound.par("bound_emission")

In [None]:
scen_tax = Scenario(
    mp,
    MODEL,
    scenario="tax_many_tecs",
    version="new",
)
model_setup(scen_tax, years, simple_tecs=False)
for year in years:
    scen_tax.add_cat("year", year, year)
# use emission prices from cumulative-constraint scenario as taxes
taxes = scenario_cumulative_bound.var("PRICE_EMISSION").rename(
    columns={"year": "type_year", "lvl": "value"}
)
taxes["unit"] = "USD/tCO2"
taxes["node"] = "node"
scen_tax.add_par("tax_emission", taxes)

In [None]:
scen_tax.commit("initialize test scenario for taxes")
scen_tax.solve(quiet=True)

In [None]:
print(scenario_cumulative_bound.var("PRICE_EMISSION"))
print(scen_tax.var("PRICE_EMISSION"))
print(scen_tax.par("tax_emission"))
print(scenario_cumulative_bound.var("EMISS"))
print(scen_tax.var("EMISS"))

In [None]:
print(scenario_cumulative_bound.var("ACT"))

In [None]:
print(scen_tax.var("ACT"))

In [None]:
scenario_cumulative_bound.par("demand")

In [None]:
scenario_cumulative_bound.equ("EMISSION_EQUIVALENCE")

In [None]:
price_emission_tax = scen_tax.var("PRICE_EMISSION").set_index("year").lvl
npt.assert_allclose(price_emission, price_emission_tax)

In [None]:
# check emissions are close between cumulative and tax scenarios
emiss_tax = scen_tax.var("EMISS", filters).set_index("year").lvl
npt.assert_allclose(emiss, emiss_tax, rtol=0.05)

In [None]:
mp.close_db()