# Energy storage optimization with RL in MicroGrids

In the section below, run your methodology for solving the problem from start to finish:

In [None]:
pip install pymgrid

In [None]:
import pickle
from pymgrid import Microgrid

# Load the three pickled building microgrids, in order, into `buildings`.
buildings = []
for building_path in ('data/building_1.pkl', 'data/building_2.pkl', 'data/building_3.pkl'):
    with open(building_path, 'rb') as pickle_file:
        buildings.append(pickle.load(pickle_file))

# Keep the individual names available alongside the list.
building_1, building_2, building_3 = buildings

## "Simple" Reinforcement Learning based approaches

**Libraries**

In [None]:
import json # Storing results
import time # Evaluate frugality

import numpy as np # Random draws for exploration and the random baseline policy

import DiscreteEnvironment as DiscreteEnvironment # Imposed Discrete Environment
import DiscreteEnvironment_modified as DiscreteEnvironment_modified # Imposed Discrete Environment
import Agent as Agent # Defined Agent class

**Agent & Environment Setup before your training**

In [None]:
# One modified discrete environment per building, in the same order as `buildings`.
building_environments = []
for i in range(3):
    building_environments.append(
        DiscreteEnvironment_modified.Environment(env_config={'building': buildings[i]})
    )
"""
Agent, potential Q-Table & other necessary setup code here 
"""


In [None]:
def init_qtable(env, nb_action, building):
    """
    Initialize the Q-table of the Q-learning.
    The state is an intersection between net load possibilities (from -(pv production) to load),
    and the state of charge of the battery (from soc_min to soc_max, in steps of 0.1).

    -- Input :
        env : Object exposing `parameters["PV_rated_power"]`, `parameters["load"]`,
              `battery.soc_min` and `battery.soc_max`
        nb_action : Number of actions that can be taken in each state [Integer]
        building : Building number; for building 3, actions 4 and 6 start with a
                   negative value (-50 and -30) instead of 0 [Integer]

    -- Output :
        Q : A dict mapping each state (net_load, soc) to a dict {action: value}
            (Dict of Dict)
    """
    # --- Initial value of every action --------------------------
    # Building 3 starts actions 4 and 6 with a penalty so the greedy choice
    # avoids them until the learned values say otherwise.
    if building == 3:
        initial_penalty = {4: -50, 6: -30}
    else:
        initial_penalty = {}
    initial_values = {a: initial_penalty.get(a, 0) for a in range(nb_action)}

    # --- Defining state possibilities ---------------------------
    net_loads = range(-int(env.parameters["PV_rated_power"] - 1), int(env.parameters["load"] + 2))

    # Enumerate the state of charge in integer tenths: stepping a float range by 0.1
    # makes the number of levels depend on rounding error (it can add a level above
    # the intended upper bound).
    first_tenth = int(round(round(env.battery.soc_min, 1) * 10))
    stop_tenth = int(round(round(env.battery.soc_max + 0.1, 1) * 10))
    soc_levels = [round(tenth / 10, 1) for tenth in range(first_tenth, stop_tenth)]

    # --- Build Q(s, a) -------------------------------------------
    Q = {}
    for net_load in net_loads:
        for soc in soc_levels:
            # Each state gets its own copy so updates never leak between states
            Q[(net_load, soc)] = dict(initial_values)

    return Q

def epsilon_decreasing_greedy(action, epsilon, nb_action):
    """
    Epsilon-greedy selection: keep the proposed action or replace it by a random one.

    -- Input :
        action : action proposed by the caller [Integer]
        epsilon : share of randomness; the larger epsilon is, the more often a random
                  action replaces the proposed one [Float]
        nb_action : Number of actions to draw from when exploring [Integer]

    -- Output :
        action : the action finally selected [Integer]
        randomm : 1 if the action was drawn at random, 0 otherwise [Integer]
    """
    draw = np.random.random()
    keep_proposed = draw < (1 - epsilon)

    if not keep_proposed:
        # Exploration: uniform draw among the nb_action actions
        return np.random.choice(nb_action), 1

    # Exploitation: the proposed action is returned untouched
    return action, 0
    
def max_dict(d):
    """
    Return the (action, value) pair with the largest value in a Q-table row.
    Rewards are costs counted negatively, so the largest value is the cheapest action.
    On ties the first action encountered wins; an empty dict yields (None, -inf).

    -- Input :
        d : values of each action for one state [Dict]

    -- Output :
        max_key : action with the maximal value, or None if no value beats -inf
        max_value : the maximal value
    """
    best_action, best_value = None, float("-inf")

    for action in d:
        value = d[action]
        if not value > best_value:
            continue
        best_action, best_value = action, value

    return best_action, best_value

def update_epsilon(epsilon):
    """
    Decay epsilon (share of random actions) by 2 %, never going below 0.1.

    -- Input :
        epsilon : current share of random actions [Float]

    -- Output :
        epsilon (updated) : decayed share of random actions, floored at 0.1 [Float]
    """
    decayed = epsilon - epsilon * 0.02
    return 0.1 if decayed < 0.1 else decayed

def change_name_action(idx, building):
    """
    Return a human-readable label for an action index (used for printing only).

    Building 3 has its own numbering with seven actions; every other building
    uses the five-action numbering.

    -- Input :
        idx : action index [Integer]
        building : building number [Integer]

    -- Output :
        action_name : label of the action; "Unknown action <idx>" when the index
                      is not part of the building's numbering [String]
    """
    if building == 3:
        names = {
            0: "PV + Charge + Export",
            1: "Export/Import",
            2: "Import",
            3: "Full Export",
            4: "Genset",
            5: "PV + Discharge + Import",
            6: "Genset",
        }
    else:
        names = {
            0: "PV + Charge + Export",
            1: "PV + Discharge + Import",
            2: "Import",
            3: "Full Export",
            4: "Export/Import",
        }

    # An unmapped index used to leave the label unbound and crash the caller;
    # a display helper should degrade to a readable placeholder instead.
    return names.get(idx, f"Unknown action {idx}")

def print_welcome(idx):
    """
    Print a fixed banner: the welcome box for idx 0, the result-table header for idx 1.
    Any other value prints nothing.
    """
    banners = {
        0: (
            "------------------------------------",
            "|        WELCOME TO PYMGRID        |",
            "------------------------------------",
        ),
        1: (
            "t -     STATE  -  ACTION - COST",
            "================================",
        ),
    }

    for line in banners.get(idx, ()):
        print(line)

In [None]:
def training_Q_Learning_DE(env, nb_action, building, horizon):
    """
    Train a tabular Q-learning agent on a discrete microgrid environment.

    The state is (rounded net load, state of charge rounded to one decimal).
    Actions are chosen epsilon-greedily at every step, while the update target
    uses the greedy value of the next state (Q-learning).

    -- Input :
        env : Discrete environment exposing `reset()`, `step(action)` and a `mg`
              microgrid with `load`, `pv` and `battery.soc`
        nb_action : Number of discrete actions available [Integer]
        building : Building number, forwarded to init_qtable [Integer]
        horizon : Maximum number of steps per episode [Integer]

    -- Output :
        Q : Learned Q-table, Q[state][action] -> value [Dict of Dict]
    """

    def current_state():
        # State as used for Q-table keys
        return (round(env.mg.load - env.mg.pv), round(env.mg.battery.soc, 1))

    # --- Defining parameters ----------------------------------
    Q = init_qtable(env.mg, nb_action, building)
    nb_episode = 15
    alpha = 0.1    #  Learning rate
    # NOTE(review): update_epsilon floors epsilon at 0.1, so starting at 0.1 keeps
    # the exploration rate constant; start higher if a real decay is wanted.
    epsilon = 0.1  #  Exploration probability
    gamma = 0.99   #  Discount factor
    print_welcome(0)
    print("\n")

    # Exactly nb_episode episodes (the loop used to run nb_episode + 1 times)
    for episode in range(nb_episode):

        # --- Initialize episode variables --------------------------
        env.reset()
        s = current_state()  # First state
        a, _explored = epsilon_decreasing_greedy(max_dict(Q[s])[0], epsilon, nb_action)

        # --- Q-learning across horizon ------------------------------
        for i in range(horizon):

            # Run the action chosen previously; the reward is the negative cost
            _, r, done, _ = env.step(a)

            # Observe the next state and its greedy action/value
            s_ = current_state()
            a_, best_next_value = max_dict(Q[s_])

            # No bootstrap on the last step of the episode
            terminal = bool(done) or i == horizon - 1
            target = r if terminal else r + gamma * best_next_value

            # Standard TD update: Q <- Q + alpha * (target - Q).
            # (Mixing (1 - alpha) * Q with alpha * td_error subtracted Q twice.)
            Q[s][a] += alpha * (target - Q[s][a])

            if terminal:
                break

            # Behave epsilon-greedily at every step, not only on the first action
            a_, _explored = epsilon_decreasing_greedy(a_, epsilon, nb_action)
            s, a = s_, a_

        epsilon = update_epsilon(epsilon)

    return Q

def testing_Q_Learning_DE(env, Q, horizon, building, verbose=False):
    """
    Run a learned Q-table greedily (no exploration) and return the total cost.

    -- Input :
        env : Discrete environment exposing `reset(testing=...)`, `step(action)`,
              `get_cost()` and a `mg` microgrid with `load`, `pv` and `battery.soc`
        Q : Q-table as returned by training_Q_Learning_DE [Dict of Dict]
        horizon : Maximum number of steps to run [Integer]
        building : Building number, used only to label actions when printing [Integer]
        verbose : When True, print state / action / running cost every 500 steps
                  and on the last step [Boolean]

    -- Output :
        total_cost : value of env.get_cost() after the run (0 if horizon is 0)
    """

    def current_state():
        # State as used for Q-table keys
        return (round(env.mg.load - env.mg.pv), round(env.mg.battery.soc, 1))

    # --- Initialize variables --------------------------
    # Reset in testing mode: a plain reset() here would undo the caller's
    # reset(testing=True) and evaluate with the default (non-testing) reset.
    env.reset(testing=True)
    s = current_state()
    a = max_dict(Q[s])[0]
    total_cost = 0
    print_welcome(1)

    # --- Greedy rollout across horizon ----------------------
    for i in range(horizon):

        # Run the action chosen previously
        _, _, done, _ = env.step(a)

        if verbose and (i % 500 == 0 or i == horizon - 1 or done):
            print(i, " -", (int(s[0]), s[1]), change_name_action(a, building),
                  round(env.get_cost(), 1), "€")

        # Stop once the environment reports the end of its data
        if done:
            break

        # Next state and the corresponding greedy action
        s = current_state()
        a = max_dict(Q[s])[0]

    # NOTE(review): get_cost() is read once at the end instead of at every step;
    # this assumes it is a side-effect-free accessor of the running total — confirm.
    if horizon > 0:
        total_cost = env.get_cost()

    return total_cost

**Training of the agent**

In [None]:

train_start = time.process_time()

"""
Training code
"""
# One Q-table per building: buildings 1 and 2 use 5 actions, building 3 uses 7.
Q0_DE, Q1_DE, Q2_DE = [
    training_Q_Learning_DE(building_environments[idx], n_actions, idx + 1, 8757)
    for idx, n_actions in enumerate((5, 5, 7))
]

train_end = time.process_time()



In [None]:
# Training frugality: difference between the two time.process_time() readings
# taken around the training code.
train_frugality = train_end - train_start
print(train_frugality)

**Test of the agent**

In [None]:
test_start = time.process_time()
total_cost = [0,0,0]

# Evaluate each building with its own Q-table; building numbers are 1-based.
for idx, q_table in enumerate((Q0_DE, Q1_DE, Q2_DE)):
    building_environments[idx].reset(testing=True)
    total_cost[idx] = testing_Q_Learning_DE(building_environments[idx], q_table, 8757, idx + 1)

test_end = time.process_time()

In [None]:
"""
Note :
* To make your work as reproductible as possible, have a full-greedy approach (no exploration) on the test buildings
* If your algorithm has some unavoidable randomness, consider running it for many loops and return a
  mean profitability and mean frugality
  
"""

# Random-policy baseline, kept for comparison only.
# It uses its own variables so that running the notebook top to bottom does not
# overwrite `total_cost`, `test_start` and `test_end` from the Q-learning test
# (those feed the exported results).
baseline_start = time.process_time()
baseline_cost = [0,0,0]

for i,building_env in enumerate(building_environments):

    obs = building_env.reset(testing=True)
    done = False
    while not done:
        action = np.random.randint(building_env.action_space.n)
        obs, reward, done, info = building_env.step(action)
        # Cost is the negative of the reward, as in the Q-learning test
        baseline_cost[i]-=reward

baseline_end = time.process_time()

In [None]:
# Test frugality: difference between the `test_end` and `test_start`
# time.process_time() readings.
test_frugality = test_end - test_start
print(test_frugality)

**Store & Export Results in JSON format**

In [None]:
# Summary exported for the Q-learning approach: one cost entry per building
# (taken from `total_cost`) plus the combined training + test frugality.
final_results = {
    "building_1_performance" : total_cost[0],
    "building_2_performance" : total_cost[1],
    "building_3_performance" : total_cost[2],
    "frugality" : train_frugality + test_frugality,
}
print(final_results)

In [None]:
team_name = 'team35'

# The results are serialized as JSON even though the file has a .txt extension.
with open('results/' + team_name + '.txt', 'w') as json_file:
    json.dump(final_results, json_file)

## Deep Reinforcement Learning based approach

**Libraries**

In [None]:
import time # Necessary to evaluate frugality
from pymgrid.Environments.pymgrid_cspla import MicroGridEnv # Imposed Environment
import numpy as np
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

## Import your favourite Deep Learning library for RL and other packages here

**Agent & Environment Setup**

In [None]:
"""
Below is an environment initialization without a Deep RL library, the code can vary depending on which library you 
use
"""
# Rebinds `building_environments` (the discrete environments above) to one
# MicroGridEnv per building for the deep RL section.
building_environments = [MicroGridEnv(env_config={'microgrid':buildings[i]}) for i in range(3)]


**Deploying the agent**

In [None]:
# `Agent` is bound to the module (`import Agent as Agent`), so the class has to be
# reached through it; calling the module itself raises TypeError.
# NOTE(review): assumes the class defined in Agent.py is also named `Agent` — confirm.
agent = Agent.Agent(gamma=0.99, epsilon=1.0, batch_size=128, nb_actions=5,
             eps_end=0.01, input_dims=[10], lr= 0.0001)

In [None]:
train_start = time.process_time()

nb_episode = 15

# The same agent is trained on each building in turn, nb_episode episodes per building.
for idx, env in enumerate(building_environments):
    print(f"building {idx}")
    for i in range(nb_episode):
        score = 0
        observation = env.reset()
        done = False
        while True:
            action = agent.choose_action(observation)
            next_observation, reward, done, info = env.step(action)
            # Score accumulates the negated reward
            score = score - reward
            agent.store_transition(observation, action, reward, next_observation, done)
            agent.train()
            observation = next_observation
            if done:
                break
        print(f"- episode : {i} | score : {score}")
train_end = time.process_time()
train_frugality = train_end - train_start

In [None]:
# Training frugality of the deep RL agent (train_end - train_start).
print(train_frugality)

**Test of the agent**

In [None]:
"""
Note :
* To make your work as reproductible as possible, have a full-greedy approach (no exploration) on the test buildings
* If your algorithm has some unavoidable randomness, consider running it for many loops and return a
mean profitability and mean frugality
"""

# Start the test timer once (it used to be started twice in a row).
test_start = time.process_time()
total_cost = [0,0,0]

for i,building_env in enumerate(building_environments):

    obs = building_env.reset(testing=True)
    done = False
    while not done:
        # NOTE(review): the note above asks for a fully greedy policy at test time;
        # confirm that agent.choose_action does not explore here.
        action = agent.choose_action(obs)
        obs, reward, done, info = building_env.step(action)
        # Cost is the negative of the reward
        total_cost[i]-=reward

test_end = time.process_time()

test_frugality = test_end - test_start

**Store & Export Results in JSON format**

In [None]:
# Summary exported for the deep RL approach: one cost entry per building
# (taken from `total_cost`) plus the combined training + test frugality.
final_results = {
    "building_1_performance" : total_cost[0],
    "building_2_performance" : total_cost[1],
    "building_3_performance" : total_cost[2],
    "frugality" : train_frugality + test_frugality,
}
print(final_results)

In [None]:
import json

# The path used to be 'results/' + '.txt', i.e. a hidden file with no team name.
# A distinct suffix keeps the Q-learning export (results/<team_name>.txt) intact.
# NOTE(review): the '_deep_rl' suffix is a suggestion — confirm the expected file name.
with open('results/' + team_name + '_deep_rl.txt', 'w') as json_file:
    json.dump(final_results, json_file)