## Submission Notebook Template 

<h3> <font color='red'> WARNING : </font>  </h3>

<font color='red'> Whichever approach you chose, you must re-install below every custom package your code needs in order to run! </font>

<b> Install your packages below: </b>

In [1]:
!pip install git+https://github.com/Total-RD/pymgrid/
    
import json
import os
import pickle
import sys
import time

import matplotlib.pyplot as plt
import numpy as np
import pymgrid

In the section below, you must run your methodology for solving the problem from start to finish :

In [2]:
"""
The buildings mentionned below are specific to the hackathon and are not available in this repo.
You can replace them with any MicroGrid object generated from pymgrid
"""

# Load the three hackathon buildings and call train_test_split() on each one.
# NOTE: unpickling can execute arbitrary code; only load files from a trusted
# source.
buildings = []
for pickle_path in ("building_1.pkl", "building_2.pkl", "building_3.pkl"):
    with open(pickle_path, "rb") as f:
        loaded_building = pickle.load(f)
        loaded_building.train_test_split()
    buildings.append(loaded_building)

# Keep the individual names available for later cells.
building_1, building_2, building_3 = buildings

<h2> Evaluation for "Simple" Reinforcement Learning based approaches </h2>

 <font color='red'> <b> Be careful: the rewards returned by the Gym environment are negative! Multiply the total reward by -1, because the Profitability you submit must be positive. </b> </font>

<b> 1) Import all used libraries and scripts here </b>

In [3]:
import DiscreteEnvironment as DiscreteEnvironment # Imposed Discrete Environment

<b> 2) Agent & Environment Setup before your training </b>

In [4]:
"""
Environment initialisation
"""
# Wrap every building in the imposed discrete environment; the resulting list
# keeps the same order as `buildings`.
building_environments = [
    DiscreteEnvironment.Environment({"building": building})
    for building in buildings
]
    
"""
Q-Table initialization
"""
def init_qtable(env, nb_action):
    """Build a zero-initialised tabular Q-function for ``env``.

    States are ``(i, j)`` tuples where ``i`` iterates over
    ``range(-int(PV_rated_power - 1), int(load + 2))`` and ``j`` over
    ``np.arange(round(soc_min, 1), round(soc_max + 0.1, 1), 0.1)``, each
    value rounded to one decimal.  ``i`` is presumably the rounded net load
    (load - PV) -- confirm against the environment's state definition.

    Args:
        env: Environment exposing ``mg.parameters`` and ``mg.battery``.
        nb_action: Number of discrete actions; action ids are
            ``0 .. nb_action - 1``.

    Returns:
        A dict mapping every state to its own ``{action: 0}`` dict.
    """
    microgrid = env.mg
    lowest = -int(microgrid.parameters["PV_rated_power"] - 1)
    highest = int(microgrid.parameters["load"] + 2)

    soc_grid = np.arange(
        round(microgrid.battery.soc_min, 1),
        round(microgrid.battery.soc_max + 0.1, 1),
        0.1,
    )
    # Round again: np.arange accumulates float error (e.g. 0.30000000000000004).
    soc_levels = [round(level, 1) for level in soc_grid]

    return {
        (net, soc): {act: 0 for act in range(nb_action)}
        for net in range(lowest, highest)
        for soc in soc_levels
    }

"""
Useful functions, epsilon_decreasing, max_dict, updated_epsilon
"""
def espilon_decreasing_greedy(action, epsilon, nb_action):
    """Epsilon-greedy selection around a proposed action.

    Args:
        action: The action proposed by the caller (kept when not exploring).
        epsilon: Exploration probability.
        nb_action: Number of actions to sample from when exploring.

    Returns:
        ``(action, 0)`` when the proposed action is kept, otherwise
        ``(np.random.choice(nb_action), 1)``.
    """
    # One uniform draw decides between keeping the action and exploring.
    explore = not np.random.random() < (1 - epsilon)
    if explore:
        return np.random.choice(nb_action), 1
    return action, 0
    
def max_dict(d):
    """Return the ``(key, value)`` pair of ``d`` with the largest value.

    Ties go to the first key encountered.  An empty dict (or one whose
    values never exceed ``-inf``) yields ``(None, float("-inf"))``.
    """
    best = (None, float("-inf"))
    for entry in d.items():
        # Strict comparison keeps the earliest maximum.
        if entry[1] > best[1]:
            best = entry
    return best

def update_epsilon(epsilon):
    """Decay ``epsilon`` by 2 % and never let it drop below 0.1."""
    decayed = epsilon - epsilon * 0.02
    return max(decayed, 0.1)

In [5]:
"""
Training Q_learning algo
"""
def training_Q_Learning(
    building_environment,
    horizon,
    nb_episode=100,
    epsilon=0.99,
    alpha=0.1,
    gamma=0.99,
    max_actions=7,
):
    """Train a tabular Q-function on a single building environment.

    Args:
        building_environment: Environment exposing ``action_space.n``,
            ``reset(testing=...)``, ``state`` and ``step(action)``.
        horizon: Number of environment steps per episode.
        nb_episode: Last episode index; episodes ``0 .. nb_episode`` are run
            (``nb_episode + 1`` in total).
        epsilon: Initial exploration probability, decayed after each episode
            by ``update_epsilon``.
        alpha: Learning rate.
        gamma: Discount factor.
        max_actions: Accepted for interface compatibility; not used here.

    Returns:
        The learned Q-table, ``{state: {action: value}}``.
    """
    nb_action = building_environment.action_space.n

    Q = init_qtable(building_environment, nb_action)

    print_training = "Training Progressing ."

    for e in range(nb_episode + 1):

        # "\r" rewrites the same console line to show progress.
        value_print = (
            "\r" + print_training + "Episode " + str(e) + "/" + str(nb_episode)
        )
        sys.stdout.write(value_print)
        sys.stdout.flush()

        building_environment.reset(testing=False)

        state = building_environment.state
        action = max_dict(Q[state])[0]
        # NOTE(review): exploration is applied only to the first action of an
        # episode; every later action is the greedy one chosen below.
        # Confirm whether per-step exploration was intended.
        action, _ = espilon_decreasing_greedy(action, epsilon, nb_action)

        for i in range(horizon):

            state_, reward, _, _ = building_environment.step(action)

            action_ = max_dict(Q[state_])[0]

            if i == horizon - 1:
                # Last step of the episode: no bootstrapping.
                target = reward
            else:
                target = reward + gamma * Q[state_][action_]

            # Standard TD update, Q <- Q + alpha * (target - Q).  The previous
            # non-terminal form, (1 - alpha) * Q + alpha * (target - Q),
            # subtracted alpha * Q twice.
            Q[state][action] += alpha * (target - Q[state][action])

            state, action = state_, action_

        epsilon = update_epsilon(epsilon)

    return Q

"""
Function that trains Qlearners over buildings
"""
def train_buildings(model_params_res):
    """Train one Q-table per environment in ``building_environments``.

    Args:
        model_params_res: Dict providing the ``nb_episode``, ``epsilon``,
            ``alpha``, ``gamma`` and ``max_actions`` hyper-parameters.

    Returns:
        List of Q-tables, in the same order as ``building_environments``.
    """
    hyper_keys = ("nb_episode", "epsilon", "alpha", "gamma", "max_actions")

    q_tables = []
    for number, env in enumerate(building_environments, start=1):
        print(f"\n -- Training for building {number} --")
        # Episode length: two steps shorter than the microgrid's data length.
        steps = env.mg._data_length - 2
        hyper = {key: model_params_res[key] for key in hyper_keys}
        q_tables.append(training_Q_Learning(env, steps, **hyper))

    return q_tables

In [6]:
def get_optimal_and_rule_based(building_environment, Q):
    """Compute both candidate decisions for the environment's current step.

    Args:
        building_environment: Environment exposing ``state`` and ``mg``.
        Q: Q-table, ``{state: {action: value}}``.

    Returns:
        ``(grid_status, action, control_dict)`` where ``action`` is the
        greedy action of ``Q`` for the current state and ``control_dict`` is
        a rule-based control that never uses the grid: a deficit
        (``load >= pv``) is covered by battery discharge and then the genset,
        a surplus charges the battery.
    """
    # Greedy (learned) policy.
    action = max_dict(Q[building_environment.state])[0]

    # Rule-based control, built from the microgrid's current values.
    microgrid = building_environment.mg
    snapshot = microgrid.get_updated_values()

    load, pv = snapshot['load'], snapshot['pv']
    charge_room = snapshot['capa_to_charge']
    discharge_room = snapshot['capa_to_discharge']
    battery = microgrid.battery

    # Both powers are clamped to be non-negative and bounded by the battery.
    p_disc = max(0, min(load - pv, discharge_room, battery.p_discharge_max))
    p_char = max(0, min(pv - load, charge_room, battery.p_charge_max))

    status = snapshot['grid_status']

    net_load = load - pv
    if net_load >= 0:
        control_dict = {
            'battery_charge': 0,
            'battery_discharge': p_disc,
            'grid_import': 0,
            'grid_export': 0,
            'pv_consummed': min(pv, load),
            'genset': max(0, load - pv - p_disc),
        }
    elif net_load < 0:
        control_dict = {
            'battery_charge': p_char,
            'battery_discharge': 0,
            'grid_import': 0,
            'grid_export': 0,
            'pv_consummed': min(pv, load + p_char),
            'genset': 0,
        }

    return status, action, control_dict

In [7]:
# Perform one step: follow the learned policy while the grid is connected,
# otherwise apply the rule-based control dict directly on the microgrid.
def super_step(building_environment, grid_status, action, control_dict):
    """Advance the environment by one step.

    Args:
        building_environment: Environment exposing ``step``, ``mg.run``,
            ``transition`` and ``get_reward``.
        grid_status: ``1`` selects the learned ``action``; any other value
            selects the rule-based ``control_dict``.
        action: Discrete action passed to ``building_environment.step``.
        control_dict: Control dict passed to ``building_environment.mg.run``.

    Returns:
        ``(state, reward)`` after the step.
    """
    if grid_status == 1:
        state, reward, _done, _info = building_environment.step(action)
        return state, reward

    # NOTE(review): this path bypasses step(), so any bookkeeping step() does
    # (for example refreshing building_environment.state) may not happen here
    # -- confirm against the environment implementation.
    building_environment.mg.run(control_dict)
    state = building_environment.transition()
    reward = building_environment.get_reward()
    return state, reward

In [8]:
"""
Function that applies the optimal policy on a building
"""
def testing_Q_Learning(building_environment, Q):
    """Run the learned policy over one building's test data.

    The environment is reset with ``testing=True`` and stepped
    ``mg._data_length - 2`` times through ``super_step``.

    Args:
        building_environment: Environment to evaluate.
        Q: Q-table, ``{state: {action: value}}``.

    Returns:
        ``(total_cost, cost)``: the summed cost and the per-step list, where
        each cost is the negated reward of the step.
    """
    building_environment.reset(testing=True)
    n_steps = building_environment.mg._data_length - 2

    per_step_cost = []
    running_total = 0
    decision = get_optimal_and_rule_based(building_environment, Q)

    for _ in range(n_steps):
        _, reward = super_step(building_environment, *decision)

        # Rewards are negative costs; flip the sign before accumulating.
        step_cost = -reward
        per_step_cost.append(step_cost)
        running_total += step_cost

        decision = get_optimal_and_rule_based(building_environment, Q)

    return running_total, per_step_cost

"""
Function that tests our qlearners over buildings
"""
def test_buildings(model_params_res, QStrat):
    """Evaluate one Q-table per environment in ``building_environments``.

    Args:
        model_params_res: Hyper-parameter dict, returned unchanged.
        QStrat: Q-tables, paired in order with ``building_environments``.

    Returns:
        ``(model_params_res, total_costs)`` with one total cost per building.
    """
    # Only the total is kept; the per-step costs are discarded.
    total_costs = [
        testing_Q_Learning(env, q_table)[0]
        for env, q_table in zip(building_environments, QStrat)
    ]
    return model_params_res, total_costs

<b> 3) Training of the agent </b>

In [9]:
# Number of independent training runs; the reported performance and timings
# are averaged over these runs.
n_trainings = 10

In [None]:
# Hyper-parameters shared by every training run.
nb_episode, epsilon, alpha, gamma, max_actions = 30, 0.99, 0.1, 0.99, 5

model_params_res = dict(
    name="classic",
    nb_episode=nb_episode,
    epsilon=epsilon,
    alpha=alpha,
    gamma=gamma,
    max_actions=max_actions,
)

# One entry per training run; each entry holds one Q-table per building.
QStrats = []

# time.process_time() is used for the frugality measurement.
train_start = time.process_time()

for k in range(n_trainings):
    print(f"\nTraining number {k}")
    QStrat = train_buildings(model_params_res)
    QStrats.append(QStrat)

train_end = time.process_time()


Training number 0

 -- Training for building 1 --
Training Progressing .Episode 30/30
 -- Training for building 2 --
Training Progressing .Episode 30/30
 -- Training for building 3 --
Training Progressing .Episode 30/30
Training number 1

 -- Training for building 1 --
Training Progressing .Episode 30/30
 -- Training for building 2 --
Training Progressing .Episode 30/30
 -- Training for building 3 --
Training Progressing .Episode 30/30
Training number 2

 -- Training for building 1 --
Training Progressing .Episode 30/30
 -- Training for building 2 --
Training Progressing .Episode 30/30
 -- Training for building 3 --
Training Progressing .Episode 30/30
Training number 3

 -- Training for building 1 --
Training Progressing .Episode 30/30
 -- Training for building 2 --
Training Progressing .Episode 30/30
 -- Training for building 3 --
Training Progressing .Episode 30/30
Training number 4

 -- Training for building 1 --
Training Progressing .Episode 30/30
 -- Training for building 2 --
Tr

In [None]:
# Mean training time per run, from the time.process_time() readings taken
# around the training loop.
#train_frugality = train_end - train_start
mean_train_frugality = (train_end - train_start)/n_trainings

<b> 4) Test of the agent </b>

In [None]:
# One list of per-building total costs for each training run.
batch_total_costs = []

test_start = time.process_time()

for k in range(n_trainings):
    print(k)
    # Evaluate the Q-tables produced by the k-th training run.
    evaluation = test_buildings(model_params_res, QStrats[k])
    model_params_res, total_costs = evaluation
    batch_total_costs.append(total_costs)

test_end = time.process_time()

In [None]:
# Mean evaluation time per run, from the time.process_time() readings taken
# around the test loop.
#test_frugality = test_end - test_start
mean_test_frugality = (test_end - test_start)/n_trainings

<b> 5) Store & Export Results in JSON format </b>

In [None]:
# Average each building's total cost over all runs, then add the combined
# mean training + testing time as the frugality score.
final_results = {
    f"building_{number}_performance": np.mean(
        [run_costs[number - 1] for run_costs in batch_total_costs]
    )
    for number in (1, 2, 3)
}
final_results["frugality"] = mean_train_frugality + mean_test_frugality
print(final_results)