In [None]:
import pandas as pd

from tgp_research_quantitative_tools.businessObjects.ForwardCurves import ForwardCurve, ForwardCurveGroup
from tgp_research_quantitative_tools.businessObjects.PricesGeneral import CO2_Indexes, Maturity_Types, POWER_Indexes, \
    GAS_Indexes
from tgp_research_quantitative_tools.optimisation.ccgt.backwards_optim import CCGTOptimalDispatch_Constraints
from tgp_research_quantitative_tools.optionpricing.ClosedForm.Black import SpreadOptionThreeAssets
import datetime as dt

import os
# Connection settings read from the environment by the data-access layer.
# NOTE(review): the literals below are placeholders committed to the notebook;
# real secrets should be injected from outside (shell, .env, secret manager).
# `setdefault` keeps any value that is already configured instead of
# overwriting it with the placeholder.
os.environ.setdefault('MONGO_QTA_AUCTIONS_DEV_RW', 'TGP')
os.environ.setdefault('MONGO_FUNDAMENTALS_DEV_PWD', 'TGP')
os.environ.setdefault('DM_KDB_APP_ID', 'andres.mr')
os.environ.setdefault('DM_KDB_APP_PASSWORD', 'TGP')
os.environ.setdefault('DM_KDB_APP_TENANT', 'TGP')

In [2]:
class CCGT_Pricer:
    """Value a gas-fired plant as a strip of three-asset spread options.

    The pricer loads power, gas and CO2 forward curves, derives the spread
    ("CSS") per delivery period, and exposes three steps:

    * ``get_list_options`` / ``get_value``: one ``SpreadOptionThreeAssets``
      per row of ``udl_prices``, giving premium, intrinsic and extrinsic value.
    * ``get_dispatch``: constrained dispatch through
      ``CCGTOptimalDispatch_Constraints`` and a status table.
    * ``get_result_table``: aggregation of either result by maturity.

    Columns 0, 1 and 2 of ``udl_prices`` are used positionally as power, gas
    and CO2 (assumes ``get_fwd_curves()`` keeps the order in which the curves
    were supplied -- TODO confirm).
    """

    # Constructor
    def __init__(self, as_of, start_date, end_date, granularity, index_power, index_gas, index_co2, efficiency=0.5, co2_factor=0.185, variable_operating_costs=0.0, volume_mw=1):
        """Load the forward curves and compute the spread per delivery period.

        Args:
            as_of: Curve date; also the time origin for option expiries.
            start_date: First delivery date kept in ``udl_prices``.
            end_date: Passed to each ``ForwardCurve`` as ``end_date``.
            granularity: ``Maturity_Types`` member for the curves; its
                ``.value`` is later used as a pandas resample alias.
            index_power, index_gas, index_co2: Index identifiers of the
                three underlyings.
            efficiency: Plant efficiency used in the spread formula.
            co2_factor: CO2 multiplier applied to the CO2 price.
            variable_operating_costs: Cost subtracted from the spread; also
                the default option strike.
            volume_mw: Capacity used to scale per-hour values into totals.
        """
        self.as_of = as_of
        self.efficiency = efficiency
        self.co2_factor = co2_factor
        self.granularity = granularity
        fwd_c_power = ForwardCurve(as_of, index_power, granularity, end_date=end_date)
        fwd_c_gas = ForwardCurve(as_of, index_gas, granularity, end_date=end_date)
        fwd_c_co2 = ForwardCurve(as_of, index_co2, granularity, end_date=end_date)
        self.css_fwd_group = ForwardCurveGroup(as_of=as_of, list_fwd_curves=[fwd_c_power, fwd_c_gas, fwd_c_co2])
        self.udl_prices = self.css_fwd_group.get_fwd_curves().ffill() # get prices
        if index_co2 == CO2_Indexes.UKA:
            # NOTE(review): flat +18 uplift on the UKA price; its origin is not
            # documented here -- confirm and consider naming the constant.
            self.udl_prices.iloc[:, 2] += 18
        # Compute the Combined Spread Value (intrinsic value): power price - fuel cost / plant efficiency - variable operating costs
        # CSS = Power Price − (1/efficiency) × (Gas Price + CO₂ Factor × CO₂ Price) − Variable Operating Costs
        self.udl_prices["CSS"] = self.udl_prices.iloc[:, 0] - 1 / efficiency * (self.udl_prices.iloc[:, 1] + co2_factor * self.udl_prices.iloc[:, 2]) - variable_operating_costs
        # The spread is computed on the full curve first, then trimmed to the delivery window.
        self.udl_prices = self.udl_prices.loc[start_date:]
        self.variable_operating_costs = variable_operating_costs
        self.volume = volume_mw

    @property
    def correls(self):
        """Correlations aligned to ``udl_prices``; computed with defaults on first access."""
        if not hasattr(self, "_correls"):
            self.get_correls()
        return self._correls

    @correls.setter
    def correls(self, correls_df):
        self._correls = correls_df

    @property
    def volas(self):
        """Volatilities aligned to ``udl_prices``; computed with defaults on first access."""
        if not hasattr(self, "_volas"):
            self.get_volas()
        return self._volas

    @volas.setter
    def volas(self, volas_df):
        self._volas = volas_df

    def get_correls(self, granularity=Maturity_Types.Q):
        """Compute correlations from the curve group and store them in ``_correls``.

        The result is reindexed on ``udl_prices.index`` and gaps are filled
        forward, then backward.
        """
        correls = self.css_fwd_group.calculate_correlation_bw_maturities(granularity=granularity, date_index=True)
        correls = correls.reindex(self.udl_prices.index).ffill().bfill()
        self._correls = correls

    def get_volas(self, type="historical", relative=True, granularity=Maturity_Types.Q):
        """Compute volatilities from the curve group and store them in ``_volas``.

        The result is reindexed on ``udl_prices.index`` and gaps are filled
        forward, then backward.
        """
        volas = self.css_fwd_group.calculate_all_volas(type=type, relative=relative, granularity=granularity,
                                                  date_index=True)
        volas = volas.reindex(self.udl_prices.index).ffill().bfill()
        self._volas = volas

    # Compute the list of options, as we are going to treat each maturity as an independent spread-call option
    def get_list_options(self, type="Call", strike=None, rate=0.0, limit_time_to_expiry=None):
        """Build one ``SpreadOptionThreeAssets`` per row and store them in ``options_list``.

        Args:
            type: Option type forwarded to ``SpreadOptionThreeAssets``.
            strike: Option strike; defaults to ``variable_operating_costs``.
            rate: Rate forwarded to the option.
            limit_time_to_expiry: Optional cap (in years) on time to expiry.
        """
        if strike is None:  # If no strike provided, use the variable operating costs
            strike = self.variable_operating_costs

        # Floor non-positive underlyings on a *copy* of the three price columns.
        # Flooring self.udl_prices in place would also overwrite negative CSS
        # values and change what get_dispatch / get_value see afterwards.
        underlyings = self.udl_prices.iloc[:, :3]
        underlyings = underlyings.mask(underlyings <= 0, 0.000001)

        # Resolve the lazy properties once instead of on every row.
        volas = self.volas
        correls = self.correls

        # Expiry is measured from midnight of the as-of date, in whole days / 365.
        as_of_midnight = pd.Timestamp(dt.datetime.combine(self.as_of, dt.datetime.min.time()))

        options_list = []
        # For each row of the prices df, treat it as an independent call option
        for i, row in underlyings.iterrows():
            # Use the index timestamp directly: going through
            # datetime.fromtimestamp would shift it by the machine's local
            # UTC offset and make the day count environment-dependent.
            delivery = pd.Timestamp(i)
            if delivery.tzinfo is not None:
                # NOTE(review): tz-aware labels are compared on wall-clock time -- confirm.
                delivery = delivery.tz_localize(None)
            time_to_expiry = (delivery - as_of_midnight).days / 365
            if limit_time_to_expiry is not None:
                time_to_expiry = min(limit_time_to_expiry, time_to_expiry)

            vola_row = volas.loc[i]
            correl_row = correls.loc[i]
            # Compute each option using the 3-asset Spread option.
            # Explicit .iloc: positional `series[0]` on a labelled Series is deprecated.
            opt = SpreadOptionThreeAssets(
                type = type,
                underlying1 = row.iloc[0],
                underlying2 = row.iloc[1],
                underlying3 = row.iloc[2],
                coef1 = 1,
                coef2 = 1 / self.efficiency,
                coef3 = 1 / self.efficiency * self.co2_factor,
                strike = strike,
                time_to_expiry = time_to_expiry,
                volatility1 = vola_row.iloc[0],
                volatility2 = vola_row.iloc[1],
                volatility3 = vola_row.iloc[2],
                correlation12 = correl_row.iloc[0],
                correlation13 = correl_row.iloc[1],
                correlation23 = correl_row.iloc[2],
                rate = rate
            )
            options_list.append(opt)
        self.options_list = options_list

    def get_value(self):
        """
        Calculates the valuation of the options and updates the dispatch decision table if available.

        Stores in ``self.valuation`` a frame indexed like ``udl_prices`` with
        ``Premium`` (option price), ``IV`` (CSS floored at 0) and ``EV``
        (premium minus IV, floored at 0). Requires ``get_list_options`` first.
        Calling it repeatedly is safe: the dispatch intrinsic value is
        captured once and never re-derived from an already updated payoff.
        """
        # Premium of every option, in the same order as udl_prices rows
        premiums = [opt.price() for opt in self.options_list]
        df_output = pd.DataFrame(premiums, index=self.udl_prices.index, columns=["Premium"])

        # Intrinsic value: IV = max(CSS, 0)
        df_output["IV"] = self.udl_prices["CSS"].clip(lower=0)

        # Extrinsic value: premium in excess of intrinsic value, never negative
        df_output["EV"] = (df_output["Premium"] - df_output["IV"]).clip(lower=0)

        # Store the valuation DataFrame in the instance
        self.valuation = df_output

        # If optimal dispatch decisions have already been computed, update them with the new valuation data
        if hasattr(self, "optimal_decisions"):
            decisions = self.optimal_decisions
            # Update the dispatch table with the extrinsic option value
            decisions["EV_Option"] = self.valuation["EV"]
            # Capture the dispatch payoff as intrinsic value only once. On a
            # second call Payoff already contains EV, so copying it again
            # would double-count the extrinsic value.
            if "IV" not in decisions.columns:
                decisions["IV"] = decisions["Payoff"]
            # Extrinsic value of the dispatch: option EV scaled by the load
            decisions["EV"] = decisions["EV_Option"] * decisions["Load"]
            # Total payoff as the sum of intrinsic and extrinsic values
            decisions["Payoff"] = decisions["IV"] + decisions["EV"]
            # Average payoff per load unit; zero-load rows give 0 (not NaN or inf)
            load = decisions["Load"]
            decisions["AvgPayoff"] = (decisions["Payoff"] / load.where(load != 0)).fillna(0)

    def get_dispatch(self, status_table, initial_status="Off_cold_5", eoh_cost=0, output_pnl=False):
        """Run the constrained dispatch and store the result in ``optimal_decisions``.

        Args:
            status_table: DataFrame indexed by status name with at least the
                columns ``Load``, ``Efficiency`` and ``StartupCost``, plus one
                column per decision giving the next status.
            initial_status: Status assigned to the first row.
            eoh_cost: Forwarded to ``CCGTOptimalDispatch_Constraints``.
            output_pnl: Forwarded to ``CCGTOptimalDispatch_Constraints``.
        """
        status_table_values = status_table.reset_index().values

        optimal_decisions = CCGTOptimalDispatch_Constraints(
            self.udl_prices.iloc[:, 0].to_list(),
            self.udl_prices.iloc[:, 1].to_list(),
            self.udl_prices.iloc[:, 2].to_list(),
            status_table_values,
            self.variable_operating_costs,
            self.co2_factor,
            initial_status,
            eoh_cost,
            output_pnl
        )
        df_output = pd.DataFrame(optimal_decisions, index=self.udl_prices.index)

        # Walk the status table: each status follows from the previous one
        # and the decision taken at the current step.
        sub_status = []
        for step in range(len(optimal_decisions)):
            if step == 0:
                sub_status.append(initial_status)
            else:
                sub_status.append(status_table.loc[sub_status[step - 1], optimal_decisions[step]])
        df_output["SubStatus"] = sub_status
        df_output = pd.merge(df_output, status_table, left_on="SubStatus", right_index=True).sort_index()
        df_output = pd.concat([self.udl_prices, df_output], axis=1)

        load = df_output.loc[:, "Load"]
        # Margin per unit of load at the efficiency of the current status
        unit_margin = (
            df_output.iloc[:, 0]
            - (df_output.iloc[:, 1] + df_output.iloc[:, 2] * self.co2_factor) / df_output.loc[:, "Efficiency"]
            - self.variable_operating_costs
        )
        # Statuses with zero load typically also have zero efficiency, which
        # makes the margin -inf and the product 0 * inf = NaN. A unit that
        # produces nothing earns nothing, so force those rows to 0.
        df_output["Payoff"] = (load * unit_margin).where(load != 0, 0.0) - df_output.loc[:, "StartupCost"]
        self.optimal_decisions = df_output

    @staticmethod
    def _season_label(date):
        """Map a timestamp to its season label: Oct-Mar is winter, Apr-Sep is summer."""
        month = date.month
        if month >= 10:
            return f"{date.year}WIN"
        if month <= 3:
            # Jan-Mar belongs to the winter that started the previous October
            return f"{date.year - 1}WIN"
        return f"{date.year}SUM"

    @staticmethod
    def _dispatch_aggregations():
        """Aggregation spec shared by the seasonal and calendar dispatch summaries."""
        return {
            "Load": [
                ("Running Hours", lambda x: (x > 0).sum()),  # Count of rows where Load > 0
                ("Production", "sum"),  # Sum of Load
                ("Running Hours %", lambda x: (x > 0).sum() / len(x) * 100),  # Share of rows with Load > 0
            ],
            "StartupCost": [
                ("Number Starts", lambda x: (x > 0).sum()),  # Count of rows where StartupCost > 0
                ("Startup Cost", "sum"),  # Sum of StartupCost
            ],
            "CSS": "mean",
            "IV": "sum",
            "EV": "sum",
            "Payoff": "sum",
            "AvgPayoff": "mean",
        }

    def get_result_table(self, maturity_type=Maturity_Types.M):
        """Aggregate results by maturity.

        If a dispatch exists, returns the dispatch summary (grouped by season
        for ``Maturity_Types.S``, otherwise resampled on
        ``f"{maturity_type.value}S"``); this needs the ``IV``, ``EV`` and
        ``AvgPayoff`` columns that ``get_value`` adds. Otherwise, if only a
        valuation exists, returns day-weighted averages of IV/EV/Premium and
        their totals scaled by hours and ``volume``. Returns ``None`` when
        neither has been computed.
        """
        if hasattr(self, "optimal_decisions"):
            if maturity_type==Maturity_Types.S:
                optimal_decisions = self.optimal_decisions.copy(deep=True)

                # Add a "Season" column based on the mapping function
                optimal_decisions['Season'] = optimal_decisions.index.map(self._season_label)

                # Group by "Season" and aggregate
                return optimal_decisions.groupby("Season").agg(self._dispatch_aggregations())
            else:
                # Resample by standard maturity periods
                return self.optimal_decisions.resample(f"{maturity_type.value}S").agg(self._dispatch_aggregations())
        elif hasattr(self, "valuation"):
            valuation = self.valuation.copy(deep=True)

            # Length of each source period in days: period-end label minus period-start label, inclusive
            valuation["Days"] = ((valuation.resample(self.granularity.value).count().index - valuation.index).days + 1)

            def weighted_mean(group):
                # Average every column of the group weighted by the period length in days
                weights = group["Days"]
                weighted_avg = (group.multiply(weights, axis=0).sum()) / weights.sum()
                return weighted_avg

            # Resample valuation using weighted mean
            valuation = valuation.resample(f"{maturity_type.value}S").apply(weighted_mean)

            if maturity_type in (Maturity_Types.M, Maturity_Types.Q, Maturity_Types.Y):
                valuation["Hours"] = ((valuation.resample(maturity_type.value).count().index - valuation.index).days + 1) * 24
            elif maturity_type == Maturity_Types.D:
                valuation["Hours"] = 24
            elif maturity_type == Maturity_Types.H:
                valuation["Hours"] = 1
            valuation["Volume"] = valuation.Hours * self.volume
            valuation["EV_Total"] = valuation.Volume * valuation.EV
            valuation["IV_Total"] = valuation.Volume * valuation.IV
            valuation["Premium_Total"] = valuation.Volume * valuation.Premium
            return valuation[["IV", "EV", "Premium", "EV_Total", "IV_Total", "Premium_Total"]]

In [3]:
# Curve (as-of) date and the delivery window to be priced.
date = dt.date(year=2025, month=2, day=28)
start_date = dt.date(year=2025, month=10, day=1)
end_date = dt.date(year=2030, month=9, day=30)

# Plant state machine. Each row is
#   [Status, Load, Efficiency, StartupCost, Off, Min, Full]
# where the last three entries name the status reached from this one under
# the corresponding decision.
status_table = (
    [
        # Shutdown and running states
        ['CoolDown', 76.606, 0.266, 200.0, 'Off_cold_1', 'Off_cold_1', 'Off_cold_1'],
        ['FullLoad', 432.0, 0.511, 0.0, 'CoolDown', 'FullLoad', 'FullLoad'],
        ['On_1', 432.0, 0.511, 200.0, 'On_2', 'On_2', 'On_2'],
        ['On_2', 432.0, 0.511, 200.0, 'On_3', 'On_3', 'On_3'],
        ['On_3', 432.0, 0.511, 200.0, 'FullLoad', 'FullLoad', 'FullLoad'],
    ]
    + [
        # Offline counter Off_cold_1 .. Off_cold_25: staying off advances the
        # counter (the last state loops onto itself); restarting enters the
        # ramp that matches how long the plant has been off.
        [f'Off_cold_{k}', 0.0, 0.0, 0.0, f'Off_cold_{min(k + 1, 25)}', ramp, ramp]
        for k, ramp in enumerate(
            ['Ramp_Fast_Hot'] * 5 + ['Ramp_Hot_1'] * 5 + ['Ramp_Warm_1'] * 14 + ['Ramp_Cold_1'],
            start=1,
        )
    ]
    + [
        # Ramp-up sequences
        ['Ramp_Fast_Hot', 123.108, 0.26778, 2600.0, 'On_1', 'On_1', 'On_1'],
        ['Ramp_Hot_1', 27.4485, 0.21952, 2600.0, 'Ramp_Hot_2', 'Ramp_Hot_2', 'Ramp_Hot_2'],
        ['Ramp_Hot_2', 115.9185, 0.27581, 200.0, 'On_2', 'On_2', 'On_2'],
        ['Ramp_Warm_1', 30.025, 0.21487, 2600.0, 'Ramp_Warm_2', 'Ramp_Warm_2', 'Ramp_Warm_2'],
        ['Ramp_Warm_2', 149.566, 0.25609, 200.0, 'On_2', 'On_2', 'On_2'],
        ['Ramp_Cold_1', 41.039, 0.21214, 2600.0, 'Ramp_Cold_2', 'Ramp_Cold_2', 'Ramp_Cold_2'],
        ['Ramp_Cold_2', 83.954, 0.21215, 200.0, 'Ramp_Cold_3', 'Ramp_Cold_3', 'Ramp_Cold_3'],
        ['Ramp_Cold_3', 95.526, 0.28455, 200.0, 'On_3', 'On_3', 'On_3'],
    ]
)

# Same table as a DataFrame keyed by status name.
status_df = pd.DataFrame(
    status_table,
    columns=["Status", "Load", "Efficiency", "StartupCost", "Off", "Min", "Full"],
).set_index("Status")


In [4]:
# Display the status table as a sanity check before it is passed to the pricer.
status_df

Unnamed: 0_level_0,Load,Efficiency,StartupCost,Off,Min,Full
Status,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
CoolDown,76.606,0.266,200.0,Off_cold_1,Off_cold_1,Off_cold_1
FullLoad,432.0,0.511,0.0,CoolDown,FullLoad,FullLoad
On_1,432.0,0.511,200.0,On_2,On_2,On_2
On_2,432.0,0.511,200.0,On_3,On_3,On_3
On_3,432.0,0.511,200.0,FullLoad,FullLoad,FullLoad
Off_cold_1,0.0,0.0,0.0,Off_cold_2,Ramp_Fast_Hot,Ramp_Fast_Hot
Off_cold_2,0.0,0.0,0.0,Off_cold_3,Ramp_Fast_Hot,Ramp_Fast_Hot
Off_cold_3,0.0,0.0,0.0,Off_cold_4,Ramp_Fast_Hot,Ramp_Fast_Hot
Off_cold_4,0.0,0.0,0.0,Off_cold_5,Ramp_Fast_Hot,Ramp_Fast_Hot
Off_cold_5,0.0,0.0,0.0,Off_cold_6,Ramp_Fast_Hot,Ramp_Fast_Hot


In [None]:
# Build the pricer for UK power against NBP gas and UKA carbon on the
# Maturity_Types.H granularity. Keyword arguments make the plant parameters
# self-describing.
pricer = CCGT_Pricer(
    as_of=date,
    start_date=start_date,
    end_date=end_date,
    granularity=Maturity_Types.H,
    index_power=POWER_Indexes.UK,
    index_gas=GAS_Indexes.NBP,
    index_co2=CO2_Indexes.UKA,
    efficiency=0.511,
    co2_factor=0.1859,
    variable_operating_costs=0.58,
    volume_mw=432,
)

Error Secret 21080 - 400 : Bad Request on https://pwd.dts.corp.local/winauthwebservices/api/v1/secrets/21080
Secret not found in Secret Server. Trying in Env variables App ID, Password, Tenant ID


ValueError: Unable to get authority configuration for https://login.microsoftonline.com/TGP. Authority would typically be in a format of https://login.microsoftonline.com/your_tenant or https://tenant_name.ciamlogin.com or https://tenant_name.b2clogin.com/tenant.onmicrosoft.com/policy.  Also please double check your tenant name or GUID is correct.

: 

In [None]:
# Valuation pipeline. The call order matters because each step stores state
# on `pricer` that the next one reads.

# 1) Constrained dispatch over the status table; stores pricer.optimal_decisions.
pricer.get_dispatch(status_df)
# 2) Historical volatilities with relative=False; computed explicitly so the
#    lazy `volas` property does not fall back to its defaults.
pricer.get_volas(type="historical", relative=False)
# 3) One spread option per price row, with time to expiry capped at 1.
pricer.get_list_options(limit_time_to_expiry=1)
# 4) Premium / IV / EV per row; also adds the IV/EV columns to the dispatch table.
pricer.get_value()
# 5) Because a dispatch exists, this returns the dispatch summary at the
#    default maturity (Maturity_Types.M).
result = pricer.get_result_table()