# A Q-Learning implementation for PymGrid (class mode)

Don't forget to add your own path to `sys.path`.

In [2]:
import sys
sys.path.append('/Users/j0406142/Documents/Python/ploutos_git/')
import tqdm
sys.path.append('../')
from pymgrid import MicrogridGenerator as MG
#from RL_Algo.QLearning import *
#from RL_Algo.RandomPolicy import *
import numpy as np
import time
#from MicroGridEnv import *
import random 
import matplotlib.pyplot as plt

## Class MicroGridEnv:  Markov Decision Process modeling the microgrid dynamics.

In [3]:

class MicroGridEnv():
    """
    Markov Decision Process associated to the microgrid.

    A state is the tuple ``(net_load, soc)``: the rounded difference between
    the microgrid load and its PV production, and the battery state of charge
    rounded to one decimal. Actions are the integers 0 to 3 (see
    ``get_action`` for their mapping to microgrid controls).

        Parameters
        ----------
            microgrid: microgrid, mandatory
                The controlled microgrid.
            seed: int, optional
                Value passed to ``np.random.seed`` (seeds NumPy's global generator).

    """

    def __init__(self, microgrid, seed=0):
        # Set seed (note: this seeds the process-wide NumPy generator)
        np.random.seed(seed)
        # Microgrid
        self.mg = microgrid
        # State space: list of every (net_load, soc) tuple built by states()
        self.observation_space = self.states()
        # Action space
        self.action_space = [0, 1, 2, 3]
        # Number of states
        self.Ns = len(self.observation_space)
        # Number of actions
        self.Na = len(self.action_space)
        self.state = None
        self.round = None

        # Start the first round
        self.reset()

    # Transition function
    def transition(self):
        """
        Read the current microgrid values and build the matching state.

        :return: tuple (net_load, soc) with net_load = round(load - pv) and
                 soc = battery state of charge rounded to one decimal
        """
        net_load = round(self.mg.load - self.mg.pv)
        soc = round(self.mg.battery.soc, 1)
        return (net_load, soc)

    # Reward function
    def reward(self):
        """
        :return: negated sum of ``mg.get_cost()`` and ``mg.penalty(0.5)``
        """
        # NOTE(review): meaning of the 0.5 penalty argument is defined by the
        # microgrid object, not here -- confirm against its documentation.
        return -self.mg.get_cost() - self.mg.penalty(0.5)

    def step(self, action):
        """
        Apply ``action`` to the microgrid and advance the round counter.

        :param action: integer in ``action_space``
        :return: (next_state, reward, done, info) where ``done`` is True when
                 the round counter, before being incremented, equals
                 ``mg.horizon`` and ``info`` is an empty dict
        """
        control_dict = self.get_action(action)
        _ = self.mg.run(control_dict)
        reward = self.reward()
        s_ = self.transition()
        self.state = s_
        done = self.round == self.mg.horizon
        self.round += 1
        return s_, reward, done, {}

    def reset(self):
        """
        Restart an episode: reset the round counter and the microgrid.

        :return: the initial state, also stored in ``self.state``
        """
        self.round = 1
        # Resetting microgrid
        self.mg.reset()
        # Building first state
        self.state = self.transition()
        # Returned so callers can write ``state = env.reset()``
        return self.state

    # Building the observations_space from the forecast time series
    def states(self):
        """
        Enumerate the state space from the load and PV forecasts.

        :return: list of (net_load, soc) tuples, for every integer net load
                 between the forecast minimum and maximum (with a margin) and
                 every state of charge on a 0.1 grid
        """
        observation_space = []
        mg = self.mg
        net_load = mg.forecast_load() - mg.forecast_pv()
        soc_start = round(mg.battery.soc_min, 1)
        soc_stop = round(mg.battery.soc_max + 0.1, 1)
        for i in range(int(net_load.min() - 1), int(net_load.max() + 2)):
            for j in np.arange(soc_start, soc_stop, 0.1):
                # np.arange accumulates float error; round so the values
                # compare equal to the rounded soc used in the states
                observation_space.append((i, round(j, 1)))
        return observation_space

    # Mapping between action and the control_dict
    def get_action(self, action):
        """
        Translate a discrete action into the controls given to the microgrid.

        Action 0 charges the battery and exports the remaining PV surplus,
        action 1 discharges the battery and imports the remaining deficit,
        action 2 imports ``abs(net_load)`` from the grid and action 3 exports
        ``abs(net_load)`` to the grid. Any other value leaves every control
        except 'pv_consummed' at 0.

        :param action: current action
        :return: control_dict : dict of controls
        """
        mg = self.mg
        pv = mg.pv
        load = mg.load
        net_load = load - pv

        # Power that can be charged: limited by the PV surplus (-net_load),
        # the remaining capacity and the maximum charging power
        capa_to_charge = mg.battery.capa_to_charge
        p_charge_max = mg.battery.p_charge_max
        p_charge = max(0, min(-net_load, capa_to_charge, p_charge_max))

        # Power that can be discharged: limited by the deficit (net_load),
        # the available energy and the maximum discharging power
        capa_to_discharge = mg.battery.capa_to_discharge
        p_discharge_max = mg.battery.p_discharge_max
        p_discharge = max(0, min(net_load, capa_to_discharge, p_discharge_max))

        control_dict = {'pv_consummed': min(pv, load),
                        'battery_charge': 0,
                        'battery_discharge': 0,
                        'grid_import': 0,
                        'grid_export': 0
                        }
        if action == 0:
            # NOTE(review): when p_charge is 0 this falls back to net_load,
            # which may be positive or negative -- confirm this is intended.
            control_dict['battery_charge'] = p_charge*(p_charge > 0) + net_load*(p_charge <= 0)
            control_dict['grid_export'] = max(0, pv - min(pv, load) - p_charge)

        elif action == 1:
            # NOTE(review): same fallback to net_load when p_discharge is 0.
            control_dict['battery_discharge'] = p_discharge*(p_discharge > 0) + net_load*(p_discharge <= 0)
            control_dict['grid_import'] = max(0, load - min(pv, load) - p_discharge)

        elif action == 2:
            control_dict['grid_import'] = abs(net_load)

        elif action == 3:
            control_dict['grid_export'] = abs(net_load)

        return control_dict

## Q Learning Class

In [4]:
class QLearning:
    """
    Implementation of Q-learning algorithm with epsilon-greedy exploration

    If learning_rate is None; alpha(x,a) = 1/max(1, N(x,a))**alpha, where
    N(x,a) counts the updates already applied to the pair (x,a). Otherwise the
    constant learning_rate is used. Both are floored at min_learning_rate.

    Parameters
        ----------
            env: environment modeled by an MDP. It must expose Ns, Na,
                action_space, observation_space, state, reset() and step(a).
            gamma: discount factor.
            alpha: exponent of the visit-count based learning rate.
            learning_rate: constant learning rate, or None.
            min_learning_rate: lower bound applied to the learning rate.
            epsilon: initial exploration probability.
            epsilon_decay: factor applied to epsilon at the end of an episode.
            epsilon_min: lower bound applied to epsilon after decay.
            seed: seed of the RandomState driving the epsilon-greedy policy.

    """

    def __init__(self, env, gamma, alpha=0.6, learning_rate=None, min_learning_rate=0.01, epsilon=0.99, epsilon_decay=0.9995,
                 epsilon_min=0.25, seed=42):
        self.env = env
        self.gamma = gamma
        self.alpha = alpha
        self.learning_rate = learning_rate
        self.min_learning_rate = min_learning_rate
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.Q = np.zeros((env.Ns, env.Na)) # Numpy Array. States are indexed with integers
        self.Nsa = np.zeros((env.Ns, env.Na))
        # reset() may or may not return the initial state; fall back to the
        # environment's own state attribute so self.state is never None
        initial_state = env.reset()
        self.state = env.state if initial_state is None else initial_state
        self.RS = np.random.RandomState(seed)

    def get_delta(self, r, x_i, a, y_i):
        """
        Temporal-difference error of the transition (x, a, r, y).

        :param r: reward
        :param x_i: index of current state
        :param a: current action
        :param y_i: index of next state
        :return: r + gamma * max_b Q[y_i, b] - Q[x_i, a]
        """
        max_q_y_a = self.Q[y_i, :].max()
        q_x_a = self.Q[x_i, a]

        return r + self.gamma*max_q_y_a - q_x_a

    def get_learning_rate(self, x, a):
        """
        :param x: index of current state
        :param a: current action
        :return: learning rate for the pair (x, a), never below
                 min_learning_rate
        """
        if self.learning_rate is None:
            return max(1.0/max(1.0, self.Nsa[x, a])**self.alpha, self.min_learning_rate)
        else:
            return max(self.learning_rate, self.min_learning_rate)

    def get_action(self, x_i):
        """
        Epsilon-greedy action selection.

        :param x_i: index of current state
        :return: a random element of env.action_space with probability
                 epsilon, otherwise the index of the largest Q[x_i, :]
        """
        if self.RS.uniform(0, 1) < self.epsilon:
            # explore: draw from the seeded generator so that a given seed
            # reproduces the same sequence of actions
            actions = self.env.action_space
            return actions[self.RS.randint(len(actions))]
        else:
            # exploit
            a = self.Q[x_i, :].argmax()
            return a

    def step(self):
        """
        Play one environment step and apply the Q-learning update.

        When the episode ends, epsilon is decayed (bounded below by
        epsilon_min) and the environment is reset.

        :return: the ``done`` flag returned by env.step
        """
        # Current state
        x = self.env.state
        x_i = self.env.observation_space.index(x)
        # Choose action
        a = self.get_action(x_i)

        # Learning rate (computed before the visit counter is incremented)
        alpha = self.get_learning_rate(x_i, a)

        # Take step
        y, reward, done, info = self.env.step(a)
        r = reward
        y_i = self.env.observation_space.index(y)
        delta = self.get_delta(r, x_i, a, y_i)

        # Update
        self.Q[x_i, a] = self.Q[x_i, a] + alpha*delta

        self.Nsa[x_i, a] += 1

        if done:
            self.epsilon = max(self.epsilon*self.epsilon_decay, self.epsilon_min)
            self.env.reset()
        return done

Launching the Q Learning algorithm on the created microgrid

In [9]:

# Generate a single microgrid to control
mg_generator = MG.MicrogridGenerator(nb_microgrid=1)
mg_generator.generate_microgrid(verbose=False)
microgrid = mg_generator.microgrids[0]

# Initialization of a MicroGridEnv
env = MicroGridEnv(microgrid=microgrid)

gamma = 0.9  # Discount factor
n_episodes = 10  # Number of episodes

# ---------------- Q LEARNING ----------------
# Initialization of a Q-learning object.
# NOTE: epsilon=0.2 is below QLearning's default epsilon_min (0.25), so the
# exploration rate is raised to 0.25 as soon as the first episode ends.
qlearning = QLearning(env, gamma=gamma, epsilon=0.2)

print("----------------------------------------------")
print("     Training a Q Learning Policy            ")
print("----------------------------------------------")

# The context manager closes the progress bar once training is over; the bar
# is advanced after each episode so that it reports completed episodes.
with tqdm.tqdm(total=n_episodes, position=0) as outer:
    for episode in range(n_episodes):
        done = False
        # One episode: step until the environment signals the end
        while not done:
            done = qlearning.step()
        outer.update(1)


print("-----------------------------------------------")
print("              Q values & greedy policy         ")
print("-----------------------------------------------")

print("\nQ values (Q Learning);\n", qlearning.Q)
# Greedy policy: best action index for every state of the Q table
print("\nPolicy (Q Learning): ", np.argmax(qlearning.Q, axis=1))
       


  0%|          | 0/10 [00:00<?, ?it/s]

----------------------------------------------
     Training a Q Learning Policy            
----------------------------------------------


100%|██████████| 10/10 [00:02<00:00,  4.02it/s]

-----------------------------------------------
              Q values & greedy policy         
-----------------------------------------------

Q values (Q Learning);
 [[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 ...
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]

Policy (Q Learning):  [0 0 0 ... 0 0 0]
