In [None]:
%matplotlib inline
import numpy as np
import pandas as pd
import random
import matplotlib
import matplotlib.pyplot as plt
from IPython import display
from time import sleep
from IPython.display import clear_output

class EnergyEnvironment:
    """Hourly household-energy environment with a controllable battery.

    The environment walks through a pandas DataFrame one row per step. The
    agent supplies an action in ``[-1, 1]`` that is added to the battery's
    state of charge and scaled by the nominal battery capacity when computing
    net grid consumption. The reward is the money saved compared with having
    no battery, minus a penalty for the part of the requested action that the
    battery limits made impossible.

    Per-step histories (costs, savings, actions, rewards) are kept in
    ``t_lim``-long NumPy arrays indexed by time step.
    """

    def __init__(self, t_lim=8760, data_link="data/train_data.pkl"):
        """Load the data set and initialise all logs and the battery.

        Args:
            t_lim: Number of steps in one episode (length of every log array).
            data_link: Path of the pickled DataFrame holding the time series.
        """
        plt.ion()
        self.t = 0
        self.t_lim = t_lim
        self.is_ipython = 'inline' in matplotlib.get_backend()
        self.endogenous_state_vars = ['Month', 'Hour', 'Day Type', 'Consumption_kWh', 'Solar_Generation_W']
        self.pricing_state_vars = ['Price', 'Price_6h', 'Price_12h', 'Price_24h', 'kg_CO2/kWh']
        self.weather_state_vars = ['Temperature', 'Humidity', 'Diffuse_Radiation', 'Direct_Radiation']
        self.include_battery_charge = True
        self.state_variables = (
            self.endogenous_state_vars
            + self.pricing_state_vars
            + self.weather_state_vars
            + ['Battery_Charge']
        )
        # Independent copy: dropping a variable must not shrink the master list.
        self.current_state_vars = list(self.state_variables)

        # NOTE(security): unpickling can execute arbitrary code; only load
        # data files from a trusted source.
        self.data = pd.read_pickle(data_link)

        self.costs = np.zeros(self.t_lim)
        self.costs_no_battery = np.zeros(self.t_lim)

        self.savings = np.zeros(self.t_lim)
        self.savings_accum = np.zeros(self.t_lim)
        self.reward_val = None
        self.state_val = None
        self.action_cur = None   # action as passed to step()
        self.action_eff = None   # action after clamping to the battery limits
        self.now_action = None   # action actually requested from the battery

        self.action_cur_list = np.zeros(self.t_lim)
        self.action_eff_list = np.zeros(self.t_lim)
        self.action_now_list = np.zeros(self.t_lim)
        self.reward_val_list = np.zeros(self.t_lim)

        self.battery_E_max = 1     # MDoC
        self.battery_E_min = 0.1   # MDoD
        self.battery_charge = random.uniform(self.battery_E_min, self.battery_E_max)
        self.battery_capacity_nom_KWh = 6.4

        self.observation_space = self._get_observation_space()
        self.action_space = np.ones((1, 2))
        self.action_space[0, 0] = -1
        self.action_limits = (-1, 1)

    def _get_observation_space(self):
        """Return a ``(n_vars, 2)`` array of ``[min, max]`` per active state variable."""
        bounds = np.ones((len(self.current_state_vars), 2))
        for i, name in enumerate(self.current_state_vars):
            if name == 'Battery_Charge':
                bounds[i] = (self.battery_E_min, self.battery_E_max)
            else:
                # Reduce only the needed column instead of the whole frame.
                column = self.data[name]
                bounds[i] = (column.min(), column.max())

        return bounds

    def _state_columns(self):
        """Return the data columns that are still part of the emitted state.

        Columns that are not listed in ``state_variables`` are kept untouched;
        columns that are listed there are kept only while they remain in
        ``current_state_vars``.
        """
        active = set(self.current_state_vars)
        known = set(self.state_variables)
        return [col for col in self.data.columns if col not in known or col in active]

    def _observe(self, t):
        """Build the state vector for row ``t`` (plus battery charge if enabled)."""
        state = self.data.iloc[t][self._state_columns()].values
        if self.include_battery_charge:
            state = np.concatenate((state, np.array([self.battery_charge])))
        return state

    def drop_state_variables(self, var_list):
        """Remove variables from the emitted state and refresh the bounds.

        The underlying data frame is left intact because the cost calculation
        still needs columns such as ``Price`` even when the agent no longer
        observes them; dropped columns are filtered out of the state instead.

        Args:
            var_list: Names of state variables to drop. Unknown (or already
                dropped) names are reported with a message and skipped.
        """
        for item in var_list:
            if item not in self.current_state_vars:
                print("Invalid state variable {}".format(item))
                continue
            if item == 'Battery_Charge':
                # Stop appending the battery charge to the state vector.
                self.include_battery_charge = False
            self.current_state_vars.remove(item)

        self.observation_space = self._get_observation_space()

    def step(self, action, psi=10, smoothing=True):
        """Apply ``action`` for the current time step and advance the clock.

        Args:
            action: Scalar battery action requested by the agent.
            psi: Weight of the penalty for requesting an infeasible action.
            smoothing: If true, the battery receives the difference between
                this action and the previous one instead of the raw action.

        Returns:
            Tuple ``(state, reward, done)`` where ``state`` is the data row of
            the current step with the updated battery charge appended (when
            enabled) and ``done`` is 1 on the last step of the episode.

        Raises:
            IndexError: If called after the episode has reached ``t_lim``.
        """
        t = self.t

        self.action_cur = action
        self.action_cur_list[t] = action
        # NOTE(review): differencing only starts at t == 2 although a previous
        # action already exists at t == 1 - confirm whether `t > 0` was meant.
        if t > 1 and smoothing:
            self.now_action = self.action_cur_list[t] - self.action_cur_list[t - 1]
        elif t > 1:
            self.now_action = self.action_cur_list[t]
        else:
            self.now_action = action

        self.action_now_list[t] = self.now_action
        effective = self._change_battery_state(self.now_action)
        self.action_eff = effective
        self.action_eff_list[t] = effective

        state = self._observe(t)
        self.state_val = state

        done = 1 if self.t == self.t_lim - 1 else 0

        reward = self.reward(effective, psi)
        self.reward_val_list[t] = reward
        self.t += 1

        return state, reward, done

    def calculate_cost(self, action):
        """Compute and log the cost of the current step with and without battery.

        Args:
            action: Effective battery action; it is multiplied by the nominal
                capacity and added to the net consumption.

        Returns:
            Tuple ``(cost_with_battery, cost_without_battery)``.
        """
        t = self.t
        row = self.data.iloc[t]
        base_KWh = row['Consumption_kWh'] - row['Solar_Generation_W'] / 1000

        # Negative net consumption is clipped to zero on both paths.
        net_consumption_KWh = np.maximum(0, base_KWh + action * self.battery_capacity_nom_KWh)
        net_consumption_KWh_no_bat = np.maximum(0, base_KWh)

        price_of_emmissions_per_Kg = 60 / 1000  # EUR/Kg CO2

        electricity_price = row['Price'] * net_consumption_KWh
        price_of_emmissions = row['kg_CO2/kWh'] * net_consumption_KWh * price_of_emmissions_per_Kg

        electricity_price_no_bat = row['Price'] * net_consumption_KWh_no_bat
        price_of_emmissions_no_bat = (
            row['kg_CO2/kWh'] * net_consumption_KWh_no_bat * price_of_emmissions_per_Kg
        )

        To_be_paid = price_of_emmissions + electricity_price
        To_be_paid_no_bat = price_of_emmissions_no_bat + electricity_price_no_bat

        self.costs[t] = To_be_paid
        self.costs_no_battery[t] = To_be_paid_no_bat
        self.savings[t] = To_be_paid_no_bat - To_be_paid
        # Running total must include the saving of the current step.
        self.savings_accum[t] = np.sum(self.savings[:t + 1])

        return To_be_paid, To_be_paid_no_bat

    def reward(self, action, psi):
        """Return savings minus ``psi`` times the infeasible part of the action.

        Args:
            action: Effective battery action used for the cost calculation.
            psi: Penalty weight on ``| |now_action| - |action_eff| |``.
        """
        To_be_paid, To_be_paid_no_bat = self.calculate_cost(action)
        savings = To_be_paid_no_bat - To_be_paid

        self.reward_val = savings - psi * abs(abs(self.now_action) - abs(self.action_eff))

        return self.reward_val

    def reset(self):
        """Rewind to step 0, randomise the battery charge and return the state.

        The history arrays are not cleared; they are overwritten step by step
        as the new episode progresses.
        """
        self.t = 0
        self.battery_charge = random.uniform(self.battery_E_min, self.battery_E_max)
        return self._observe(self.t)

    def render(self, episode, render_mode='plot'):
        """Show the progress of the episode.

        Args:
            episode: Episode number, printed in text mode.
            render_mode: ``'plot'`` draws the cost/savings curves, ``'text'``
                prints the latest step; any other value does nothing.
        """
        if render_mode == 'plot':
            plt.figure(1)
            plt.clf()
            plt.title('Working...')
            plt.xlabel('Hours')
            plt.ylabel('Costs [€]')
            plt.plot(self.costs[:self.t], label='Cost with battery')
            plt.plot(self.costs_no_battery[:self.t], label='Cost without battery')
            plt.plot(self.savings[:self.t], label='Savings')
            plt.plot(self.savings_accum[:self.t], label='Accumalated Savings')
            plt.legend(loc="upper left")

            # Action and reward histories are kept in the *_list attributes
            # and can be plotted the same way if needed.
            plt.pause(0.01)  # pause a bit so that plots are updated
            if self.is_ipython:
                display.display(plt.gcf())
                # Keep the final figure on screen once the episode is over.
                if self.t != self.t_lim:
                    display.clear_output(wait=True)
        elif render_mode == 'text':
            clear_output(wait=True)
            print(f"Current state: {self.state_val}")
            print(f"Current action: {self.action_cur}")
            print(f"Effective action: {self.action_eff}")
            # Read the charge directly: the state vector only ends with it
            # while the battery charge is part of the state.
            print(f"Current charge: {self.battery_charge}")
            print(f"rewards now: {self.reward_val}")
            # self.t already points past the last executed step.
            print(f"total savings: {np.sum(self.savings[:self.t])}")
            print(f"Environment time: {self.t - 1}")
            print(f"Current episode: {episode}")
            sleep(.1)

    def _change_battery_state(self, action):
        """Add ``action`` to the charge, clamp to the limits, return what was applied.

        Args:
            action: Requested change of the battery's state of charge.

        Returns:
            The change that was actually applied after clamping the charge to
            ``[battery_E_min, battery_E_max]``.
        """
        requested = self.battery_charge + action
        clamped = min(max(requested, self.battery_E_min), self.battery_E_max)
        self.battery_charge = clamped
        return action + (clamped - requested)



In [None]:
# Show the installed pandas version; the environment above loads its data
# through pd.read_pickle, so this records which pandas release was used.
pd.__version__

'1.5.2'