In [None]:
import gymnasium as gym
import logging
import numpy as np
import pandas as pd
from gymnasium import spaces
from matplotlib import pyplot as plt
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback

h24: int = 24 * 12
min5: float = 1 / 12
week = h24 * 7
month = week * 4
full_period = month * 5

logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

In [None]:
class EnergySim:
    def __init__(self, power_series: pd.Series, max_step: int = None, max_24h: int = None,
                 energy_type: str = "production") -> None:
        self.current_energy: int = 0
        self.energy_series: list[float] = power_series * min5
        self.step_index: int = 0
        self.energy_type: str = energy_type
        self.max_step: int = int(max_step or max(self.energy_series))
        self.sliding_sum = self._get_sliding_sum()
        self.max_24h: int = int(max_24h or max(self.sliding_sum))

    def _get_sliding_sum(self):
        zeros_before = pd.Series(np.zeros(h24), index=range(-h24, 0))
        zeros_after = pd.Series(np.zeros(h24 * 3),
                                index=range(len(self.energy_series), len(self.energy_series) + h24 * 3))
        expanded_series = pd.concat([zeros_before, self.energy_series, zeros_after])
        sliding_sum = expanded_series.rolling(window=h24).sum().dropna().reset_index(drop=True)
        return sliding_sum

    def reset(self, seed=0):
        self.step_index: int = 0
        self.current_energy: int = 0

    def step(self) -> int:
        self.current_energy = self.energy_series[self.step_index]
        self.step_index += 1
        return int(self.current_energy)

    def get_energy(self) -> int:
        return int(self.current_energy)

    def get_energy_last_24h(self) -> int:
        return self.sliding_sum[self.step_index]

    def get_energy_next_24h(self) -> int:
        return self.sliding_sum[self.step_index + h24]

    def get_energy_24h_following_next_24h(self) -> int:
        return self.sliding_sum[self.step_index + h24 + h24]

In [None]:
class GridSim:
    def __init__(self, feed_in_max, feed_in_min, voltage_max, voltage_min, max_taken_from, energy_price_sell,
                 energy_price_buy, voltage_series):
        self.feed_in_max_known = feed_in_max
        self.feed_in_min = feed_in_min
        self.voltage_max = voltage_max
        self.voltage_min_known = voltage_min
        self.max_taken_from = max_taken_from
        self.energy_price_sell = energy_price_sell
        self.energy_price_buy = energy_price_buy
        self.current_feed_to_grid = 0.
        self.current_taken_from_grid = 0.
        self.voltage_series = voltage_series
        self.step_index = 0
        self.power_for_voltage = (
                (self.feed_in_max_known - self.feed_in_min)
                / (self.voltage_max - self.voltage_min_known)
        )

    def reset(self, seed=0):
        self.step_index: int = 0
        self.current_feed_to_grid: int = 0
        self.current_taken_from_grid: int = 0

    def step(self, energy_balance: int) -> int:
        self.current_feed_to_grid = 0
        self.current_taken_from_grid = 0
        self.step_index += 1
        if energy_balance > 0:  # Excess energy: try to feed into the grid
            self.current_feed_to_grid = min(energy_balance, int(self.get_grid_acceptance()))
            energy_balance -= self.current_feed_to_grid
        elif energy_balance < 0:  # Energy deficit: try to take from the grid
            self.current_taken_from_grid = min(-energy_balance, int(self.max_taken_from * min5))
            energy_balance += self.current_taken_from_grid

        return int(energy_balance)

    def get_grid_acceptance(self):
        current_voltage = self.voltage_series[self.step_index]
        grid_acceptance_capacity = self._map_voltage_to_power(current_voltage) * min5
        return grid_acceptance_capacity

    def _map_voltage_to_power(self, voltage):
        if voltage >= self.voltage_max:
            return self.feed_in_min

        power_acceptance = self.feed_in_min + (self.voltage_max - voltage) * self.power_for_voltage
        return power_acceptance

    def get_feed_to(self):
        return self.current_feed_to_grid

    def get_taken_from(self):
        return self.current_taken_from_grid

In [None]:
class BatterySim:
    def __init__(self, max_charge_rate: int, max_discharge_rate: int, capacity: int, battery_wear_rate: float,
                 efficiency: float = 0.95, current_charge: float = 0):
        self.max_charge_rate = max_charge_rate  # kW, max power for charging
        self.max_discharge_rate = max_discharge_rate  # kW, max power for discharging
        self.capacity = capacity  # kWh, max stored energy
        self.battery_wear_rate = battery_wear_rate  # Wear rate (unit-less)
        self.efficiency = efficiency  # Charging/discharging efficiency
        self.starting_charge = current_charge
        self.current_charge = current_charge  # kWh, current stored energy in the battery
        self.current_charge_rate = 0.0  # kW, power charged in this step
        self.current_discharge_rate = 0.0  # kW, power discharged in this step

    def reset(self, seed=0):
        self.current_charge: int = int(self.starting_charge)
        self.current_charge_rate: int = 0
        self.current_discharge_rate: int = 0

    def step(self, energy_balance: int) -> int:
        self.current_charge_rate = 0.0
        self.current_discharge_rate = 0.0

        if energy_balance < 0:
            self.current_discharge_rate = min(float(-energy_balance), self.get_allowed_discharge())
            energy_balance += self.current_discharge_rate
            self.current_charge = max(
                0.0,
                self.current_charge - (self.current_discharge_rate / self.efficiency)
            )
        elif energy_balance > 0:
            self.current_charge_rate = min(float(energy_balance), self.get_allowed_charge())
            energy_balance -= self.current_charge_rate
            self.current_charge = min(
                float(self.capacity),
                self.current_charge + (self.current_charge_rate * self.efficiency)
            )

        return int(energy_balance)

    def get_allowed_charge(self):
        return min(
            self.max_charge_rate * min5,
            (self.capacity - self.current_charge) / self.efficiency
        )

    def get_allowed_discharge(self):
        return min(
            self.max_discharge_rate * min5,
            self.current_charge * self.efficiency
        )

    def get_charge_rate(self) -> int:
        return int(self.current_charge_rate)

    def get_discharge_rate(self) -> int:
        return int(self.current_discharge_rate)

    def get_stored(self) -> int:
        return int(self.current_charge)



In [None]:
class InverterEnv(gym.Env):

    def __init__(
            self,
            prod_sim: EnergySim,
            cons_sim: EnergySim,
            batt_sim: BatterySim,
            grid_sim: GridSim,
            timestamps: pd.Series,
            max_steps: int = 1000,
    ):
        super(InverterEnv, self).__init__()

        self.prod_sim = prod_sim
        self.cons_sim = cons_sim
        self.batt_sim = batt_sim
        self.grid_sim = grid_sim
        self.timestamps = timestamps
        self.max_steps = max_steps

        self.action_space = spaces.Discrete(2)

        self.observation_space = spaces.Box(low=0, high=1, shape=(15,), dtype=np.float64)

        self.state = np.zeros(15)
        self.current_step = 0

        # Precompute sine and cosine values for each timestep
        self.precomputed_timesteps = self._precompute_timesteps()

        self.inv_factors = np.array([
            1.0 / self.prod_sim.max_step,
            1.0 / self.cons_sim.max_step,
            1.0 / self.batt_sim.max_charge_rate,
            1.0 / self.batt_sim.max_discharge_rate,
            1.0 / self.batt_sim.capacity,
            1.0 / self.grid_sim.feed_in_max_known * 2,
            1.0 / self.grid_sim.max_taken_from,
            1.0 / self.prod_sim.max_24h,
            1.0 / self.cons_sim.max_24h,
            1.0 / self.prod_sim.max_24h,
            1.0 / self.cons_sim.max_24h,
            1.0 / self.prod_sim.max_24h,
            1.0 / self.cons_sim.max_24h
        ])

    def _precompute_timesteps(self):
        timesteps = []
        for timestamp in self.timestamps:
            timestep = (timestamp.hour * 12 + timestamp.minute / 5) / 288 * 2 * np.pi
            sin_cos = (np.array([np.sin(timestep), np.cos(timestep)]) / 2) + 0.5
            timesteps.append(sin_cos)
        return np.array(timesteps)

    def _get_timestep(self):
        # Retrieve the precomputed value for the current step
        return self.precomputed_timesteps[self.current_step]

    def reset(
            self,
            seed=0,
            **kwargs
    ):
        self.state = np.zeros(15)
        self.current_step = 0
        self.batt_sim.reset()
        self.grid_sim.reset()
        self.prod_sim.reset()
        self.cons_sim.reset()
        return self.state, {}

    def step(
            self,
            action
    ):
        self.current_step += 1

        # Calculate energy balance (production - consumption)
        energy_balance = self.prod_sim.step() - self.cons_sim.step()
        logging.info(f'Step {self.current_step}: Calculated energy balance = {energy_balance:.2f}')

        # Apply action logic (energy management mode)
        if action == 1:  # Max-self-consumption (Mode A)
            _ = self._manage_energy_mode_a(energy_balance)
        else:  # Full-feed-to-grid (Mode B)
            _ = self._manage_energy_mode_b(energy_balance)

        self._update_state()
        reward, _, _, _ = self.calc_reward()
        done = False if self.current_step <= self.max_steps else True
        truncated = False

        return self.state, reward, done, truncated, {}

    def _manage_energy_mode_a(
            self,
            energy_balance
    ):
        """
        Manage energy in Mode A (max-self-consumption): prioritize balance, battery, then grid.
        """

        energy_balance_after_batt = self.batt_sim.step(energy_balance)  # Battery handles excess or deficit
        energy_balance_after_grid = self.grid_sim.step(energy_balance_after_batt)  # Grid handles remaining energy
        logging.info(
            f'Mode A: Balance after battery = {energy_balance_after_batt:.2f}, '
            f'after grid = {energy_balance_after_grid:.2f}')
        return energy_balance_after_grid

    def _manage_energy_mode_b(
            self,
            energy_balance
    ):
        """
        Manage energy in Mode B (full-feed-to-grid): prioritize balance, grid, then battery.
        """
        energy_balance -= self.grid_sim.get_grid_acceptance()
        energy_balance_after_batt = self.batt_sim.step(energy_balance)  # Battery handles excess or deficit
        if energy_balance_after_batt >= 0:
            energy_balance_after_grid = self.grid_sim.step(self.grid_sim.get_grid_acceptance())
        else:
            energy_balance_after_batt = self.grid_sim.get_grid_acceptance() + energy_balance_after_batt
            energy_balance_after_grid = self.grid_sim.step(energy_balance_after_batt)
        logging.info(
            f'Mode A: Balance after battery = {energy_balance_after_batt:.2f}, '
            f'after grid = {energy_balance_after_grid:.2f}')
        return energy_balance_after_grid

    def _update_state(self):
        values = np.array([
            self.prod_sim.get_energy(),
            self.cons_sim.get_energy(),
            self.batt_sim.get_charge_rate(),
            self.batt_sim.get_discharge_rate(),
            self.batt_sim.get_stored(),
            self.grid_sim.get_feed_to(),
            self.grid_sim.get_taken_from(),
            self.prod_sim.get_energy_last_24h(),
            self.cons_sim.get_energy_last_24h(),
            self.prod_sim.get_energy_next_24h(),
            self.cons_sim.get_energy_next_24h(),
            self.prod_sim.get_energy_24h_following_next_24h(),
            self.cons_sim.get_energy_24h_following_next_24h()
        ])

        # Perform element-wise multiplication
        scaled_values = values * self.inv_factors

        # Include additional timestep values if needed
        self.state = np.concatenate([scaled_values, self._get_timestep()])

    def calc_reward(
            self
    ):
        feed_in_reward = self.grid_sim.get_feed_to() * self.grid_sim.energy_price_sell

        energy_deficit = self.cons_sim.get_energy() - (self.prod_sim.get_energy() + self.batt_sim.max_discharge_rate)

        avoidable_grid_purchase = max(0, self.grid_sim.get_taken_from() - max(0., energy_deficit))
        purchase_penalty = avoidable_grid_purchase * self.grid_sim.energy_price_buy

        battery_wear_penalty = ((
                                        self.batt_sim.get_charge_rate() + self.batt_sim.get_discharge_rate()) *
                                self.batt_sim.battery_wear_rate)

        total_reward = feed_in_reward - purchase_penalty - battery_wear_penalty

        return total_reward, feed_in_reward, purchase_penalty, battery_wear_penalty


In [None]:
class InverterEnvSimple(InverterEnv):

    def __init__(
            self,
            prod_sim: EnergySim,
            cons_sim: EnergySim,
            batt_sim: BatterySim,
            grid_sim: GridSim,
            timestamps: pd.Series,
            max_steps: int = week,

    ):
        super().__init__(
            prod_sim,
            cons_sim,
            batt_sim,
            grid_sim,
            timestamps,
            max_steps

        )

        self.observation_space = spaces.Box(low=0, high=1, shape=(7,), dtype=np.float64)

        # Initialize internal states
        self.state = np.zeros(7)

    def reset(
            self,
            seed=0,
            **kwargs
    ):
        """Resets the environment for a new episode.
        """
        super().reset()
        self.state = np.zeros(7)

        return self.state, {}

    def step(
            self,
            action
    ):
        """
        Executes one time step in the environment based on the action taken.

        Args:
            action (int): 0 (Mode B) for full-feed-to-grid or 1 (Mode A) for max-self-consumption.

        Returns:
            tuple: observation (state), reward, done (bool), and additional info (empty dict).
        """
        self.state, reward, done, truncated, _ = super().step(action)
        #         self.state= np.array([np.sin(2 * np.pi *self.state[-1]),np.cos(2 * np.pi *self.state[-1])])
        self.state = np.array([self.state[4], *self.state[-6:]])
        return self.state, reward, done, truncated, {}



In [None]:
complete_series_csv = '/kaggle/input/simulation-inputs/complete_series.csv'
df = pd.read_csv(complete_series_csv, parse_dates=['timestamp'])
prod_sim = EnergySim(max_step=7200, power_series=df.production_w)
cons_sim = EnergySim(max_step=6000, power_series=df.consumption_w, energy_type="consumption")
battery_wear_rate = 0
batt_sim = BatterySim(
    max_charge_rate=5000,
    max_discharge_rate=5000,
    capacity=10000,
    battery_wear_rate=battery_wear_rate,
    current_charge=10000
)
grid_sim = GridSim(
    feed_in_max=3500,
    feed_in_min=0,
    voltage_max=250,
    voltage_min=230,
    max_taken_from=6000,
    energy_price_sell=.1 / 1000,
    energy_price_buy=.8 / 1000,
    voltage_series=df.grid_voltage
)

env = InverterEnv(
    prod_sim=prod_sim,
    cons_sim=cons_sim,
    batt_sim=batt_sim,
    grid_sim=grid_sim,
    timestamps=df.timestamp,
    max_steps=week,
)

In [None]:
def test_plot(env, steps=300, model=None):
    state_history = []
    obs, _ = env.reset()
    for _ in range(steps):
        action = 1
        if model:
            action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, _, _ = env.step(action)

        state = list(env.state[:])
        state = state + list(env.calc_reward())
        state_history.append(state)
        if done:
            break
    state_df = pd.DataFrame(
        state_history,
        columns=[
            'production_energy',
            'consumption_energy',
            'battery_charged_energy',
            'battery_discharged_energy',
            'battery_stored_energy',
            'grid_feed_energy',
            'grid_taken_energy',
            'production_energy_24_last',
            'consumption_energy_24_last',
            'production_energy_24_next',
            'consumption_energy_24_next',
            'production_energy_24_next_skip',
            'consumption_energy_24_next_skip',
            'timestep_sin',
            'timestep_cos',
            'reward',
            'feed_in_reward',
            'purchase_penalty',
            'battery_wear_penalty'
        ]
    )

    print(
        f"{sum(state_df.reward):.1f}="
        f"{sum(state_df.feed_in_reward):.1f}"
        f"-{sum(state_df.purchase_penalty) :.1f}"
        f"-{sum(state_df.battery_wear_penalty):.1f}"
    )
    state_df.reward = state_df.reward * 40
    state_df = state_df[[
        #         'production_energy',
        #         'consumption_energy',
        #         'battery_charged_energy',
        #         'battery_discharged_energy',
        'battery_stored_energy',
        #         'grid_feed_energy',
        #         'grid_taken_energy',
        #             'production_energy_24_last',
        #             'consumption_energy_24_last',
        'production_energy_24_next',
        'consumption_energy_24_next',
        'production_energy_24_next_skip',
        'consumption_energy_24_next_skip',
        'timestep_sin',
        'timestep_cos',
        'reward'
    ]]
    plt.figure(figsize=(12, 8))
    for column in state_df.columns:
        plt.plot(state_df[column], label=column)
    plt.xlabel('Time Step')
    plt.ylabel('State Value')
    plt.legend()
    plt.title('State Evolution over Time Steps')
    plt.show()


In [None]:
class ConservativeModel:
    def predict(self, obs, *args, **kwargs):
        action = 1
        return action, {}


class GreedyModel:
    def predict(self, obs, *args, **kwargs):
        action = 0
        return action, {}


class SimpleModel:
    def __init__(self, lowerbound, upperbound):
        self.lowerbound = lowerbound
        self.upperbound = upperbound

    def predict(self, obs, *args, **kwargs):
        sin, cos = (obs[-2:] - .5) * 2
        timestep = np.arctan2(sin, cos) / np.pi / 2 * 288
        timestep = int(timestep if timestep > 0 else timestep + 288)
        action = 0 if timestep > self.lowerbound and timestep < self.upperbound else 1
        return action, {}

In [None]:
env.max_steps = full_period

test_plot(
    env,
    full_period,
#     week,
    #     GreedyModel(),
    #     ConservativeModel(),
    SimpleModel(60, 210),
)

In [None]:
model = PPO.load(
    "/kaggle/input/ppo-inverter/ppo_inverter_best_15",
#         "/kaggle/working/logs/best_model/best_model.zip", 
    env=env
)
test_plot(
    env,
    full_period,
#     week,
    model
)

In [None]:
# check_env(env)
# model = PPO(
#     "MlpPolicy", 
#     env, 
#     verbose=1,
#     ent_coef=0.01, 
#     learning_rate=0.0003, 
#     batch_size=2048,
# )

In [None]:
# model = PPO.load("ppo_inverter", env=env,verbose=1)
# env.reset()
# model.learn(total_timesteps=1000000)
# model.save("ppo_inverter")

In [None]:
# env.max_steps=10000
# model = PPO.load(
#     "ppo_inverter", 
#     env=SubprocVecEnv([lambda: env] * 20),
#     verbose=1
# )
# model.learn(total_timesteps=5000000)
# model.save("ppo_inverter")

In [None]:
env.max_steps = full_period

# multi_env=SubprocVecEnv([lambda: env] * 20)
# train_env=multi_env
train_env = env

eval_callback = EvalCallback(
    train_env,
    best_model_save_path="./logs/best_model",
    log_path="./logs/results",
    eval_freq=full_period * 5,
    deterministic=True,
    render=False,
    verbose=1
)

model = PPO.load(
    "/kaggle/input/ppo-inverter/ppo_inverter_best_15",
    env=train_env,
    verbose=0
)

model.learn(
    total_timesteps=full_period * 5*100,
    callback=eval_callback
)

In [None]:
# model.save("ppo_inverter")

In [None]:
model = PPO.load(
#     "/kaggle/input/ppo-inverter/ppo_inverter_best_15",
        "/kaggle/working/logs/best_model/best_model.zip", 
    env=env
)
test_plot(
    env,
    full_period,
#     week,
    model
)

In [None]:
# model = PPO.load(
#     "/kaggle/input/ppo-inverter/ppo_inverter_best_17",
#     #     "./logs/best_model", 
#     env=env
# )
# test_plot(
#     env,
#     full_period,
#     model
# )