In [104]:
import numpy as np
import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d


In [105]:
# Use LaTeX for text rendering
plt.rcParams.update({
    "text.usetex": True,
    "font.family": "serif",
    "font.serif": ["Computer Modern Roman"],
    "axes.labelsize": 12,
    "axes.titlesize": 18,
    "font.size": 12,
    "legend.fontsize": 12,
    "xtick.labelsize": 12,
    "ytick.labelsize": 12,
})

## Parameters

In [106]:
data_folder = "./Data"

## Methods - Functions

In [None]:
# given in system definition + the following
wind_production_data = [30 for hour in range(1, 25)]
demand_data = [50 if hour < 7 or hour > 18 else 100 for hour in range(1, 25)]
electricity_sell_price_data = [1 for hour in range(1, 25)]
electricity_buy_price_data = [3 for hour in range(1, 25)]



In [61]:
states = {
    "wind_production": wind_production_data,
    "reservoirs_level": {
        "V1": [0 for hour in range(1, 25)],
        "V2": [0 for hour in range(1, 25)],
        "V3": [0 for hour in range(1, 25)],
    }
}

# actions
actions = {
    "sell": [0 for hour in range(1, 25)],
    "buy" : [0 for hour in range(1, 25)]
}

actions["sell"][0] = 2

def action(t):
    global sell, buy
    actions["sell"][t] = min(wind_production_data[t], demand_data[t])
    actions["buy"][t] = max(0, demand_data[t] - wind_production_data[t])

def reward(t):
    return electricity_sell_price_data[t]*actions["sell"][t] + electricity_buy_price_data[t]*actions["buy"][t]

for t in range(24):
    
    action(t)

    print(f"Hour {t}: Sell = {actions['sell'][t]}, Buy = {actions['buy'][t]}, Reward = {reward(t)}")

Hour 0: Sell = 30, Buy = 20, Reward = 90
Hour 1: Sell = 30, Buy = 20, Reward = 90
Hour 2: Sell = 30, Buy = 20, Reward = 90
Hour 3: Sell = 30, Buy = 20, Reward = 90
Hour 4: Sell = 30, Buy = 20, Reward = 90
Hour 5: Sell = 30, Buy = 20, Reward = 90
Hour 6: Sell = 30, Buy = 70, Reward = 240
Hour 7: Sell = 30, Buy = 70, Reward = 240
Hour 8: Sell = 30, Buy = 70, Reward = 240
Hour 9: Sell = 30, Buy = 70, Reward = 240
Hour 10: Sell = 30, Buy = 70, Reward = 240
Hour 11: Sell = 30, Buy = 70, Reward = 240
Hour 12: Sell = 30, Buy = 70, Reward = 240
Hour 13: Sell = 30, Buy = 70, Reward = 240
Hour 14: Sell = 30, Buy = 70, Reward = 240
Hour 15: Sell = 30, Buy = 70, Reward = 240
Hour 16: Sell = 30, Buy = 70, Reward = 240
Hour 17: Sell = 30, Buy = 70, Reward = 240
Hour 18: Sell = 30, Buy = 20, Reward = 90
Hour 19: Sell = 30, Buy = 20, Reward = 90
Hour 20: Sell = 30, Buy = 20, Reward = 90
Hour 21: Sell = 30, Buy = 20, Reward = 90
Hour 22: Sell = 30, Buy = 20, Reward = 90
Hour 23: Sell = 30, Buy = 20, Re

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches


class GridWorld:
    def __init__(self, filename, reward=None):
        if reward is None:
            reward = {0: -0.04, 1: 1.0, 2: -1.0, 3: np.NaN}
        self.map = np.array(
            [[0, 0, 0, 1],
            [0, 3, 0, 2],
            [0, 0, 0, 0]]
        )
        self.num_rows = self.map.shape[0]
        self.num_cols = self.map.shape[1]
        self.num_states = self.num_rows * self.num_cols
        self.num_actions = 4
        self.reward = reward
        self.reward_function = self.get_reward_function()
        self.transition_model = self.get_transition_model()

    def get_state_from_pos(self, pos):
        return pos[0] * self.num_cols + pos[1]

    def get_pos_from_state(self, state):
        return state // self.num_cols, state % self.num_cols
    
    def get_reward_function(self):
        reward_table = np.zeros(self.num_states)
        for r in range(self.num_rows):
            for c in range(self.num_cols):
                s = self.get_state_from_pos((r, c))
                reward_table[s] = self.reward[self.map[r, c]]
        return reward_table
    
    def get_transition_model(self, random_rate=0.2):
        transition_model = np.zeros((self.num_states, self.num_actions, self.num_states))
        for r in range(self.num_rows):
            for c in range(self.num_cols):
                s = self.get_state_from_pos((r, c))
                neighbor_s = np.zeros(self.num_actions)
                if self.map[r, c] == 0:
                    for a in range(self.num_actions):
                        new_r, new_c = r, c
                        if a == 0:
                            new_r = max(r - 1, 0)
                        elif a == 1:
                            new_c = min(c + 1, self.num_cols - 1)
                        elif a == 2:
                            new_r = min(r + 1, self.num_rows - 1)
                        elif a == 3:
                            new_c = max(c - 1, 0)
                        if self.map[new_r, new_c] == 3:
                            new_r, new_c = r, c
                        s_prime = self.get_state_from_pos((new_r, new_c))
                        neighbor_s[a] = s_prime
                else:
                    neighbor_s = np.ones(self.num_actions) * s
                for a in range(self.num_actions):
                    transition_model[s, a, int(neighbor_s[a])] += 1 - random_rate
                    transition_model[s, a, int(neighbor_s[(a + 1) % self.num_actions])] += random_rate / 2.0
                    transition_model[s, a, int(neighbor_s[(a - 1) % self.num_actions])] += random_rate / 2.0
        return transition_model
    
    def generate_random_policy(self):
        return np.random.randint(self.num_actions, size=self.num_states)
    
    def execute_policy(self, policy, start_pos=(2, 0)):
        s = self.get_state_from_pos(start_pos)
        total_reward = 0
        state_history = [s]
        while True:
            temp = self.transition_model[s, policy[s]]
            s_prime = np.random.choice(self.num_states, p=temp)
            state_history.append(s_prime)
            r = self.reward_function[s_prime]
            total_reward += r
            s = s_prime
            if r == 1 or r == -1:
                break
        return total_reward

In [42]:
env = GridWorld("gridworld.txt")

random_policy = env.generate_random_policy()
rewards = []
for _ in range(1000):
    total_reward = env.execute_policy(random_policy)
    rewards.append(total_reward)

hist = plt.hist(rewards, bins=50, edgecolor="black")

KeyboardInterrupt: 

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches


class MDP_HydroWind:
    def __init__(self):
        reward = {0: -0.04, 1: 1.0, 2: -1.0, 3: np.NaN}

        self.num_states = self.num_rows * self.num_cols
        self.num_actions = 4
        self.reward = reward
        self.reward_function = self.get_reward_function()
        self.transition_model = self.get_transition_model()

    def get_state_from_pos(self, pos):
        return pos[0] * self.num_cols + pos[1]

    def get_pos_from_state(self, state):
        return state // self.num_cols, state % self.num_cols
    
    def get_reward_function(self):
        reward_table = np.zeros(self.num_states)
        for r in range(self.num_rows):
            for c in range(self.num_cols):
                s = self.get_state_from_pos((r, c))
                reward_table[s] = self.reward[self.map[r, c]]
        return reward_table
    
    def get_transition_model(self, random_rate=0.2):
        transition_model = np.zeros((self.num_states, self.num_actions, self.num_states))
        for r in range(self.num_rows):
            for c in range(self.num_cols):
                s = self.get_state_from_pos((r, c))
                neighbor_s = np.zeros(self.num_actions)
                if self.map[r, c] == 0:
                    for a in range(self.num_actions):
                        new_r, new_c = r, c
                        if a == 0:
                            new_r = max(r - 1, 0)
                        elif a == 1:
                            new_c = min(c + 1, self.num_cols - 1)
                        elif a == 2:
                            new_r = min(r + 1, self.num_rows - 1)
                        elif a == 3:
                            new_c = max(c - 1, 0)
                        if self.map[new_r, new_c] == 3:
                            new_r, new_c = r, c
                        s_prime = self.get_state_from_pos((new_r, new_c))
                        neighbor_s[a] = s_prime
                else:
                    neighbor_s = np.ones(self.num_actions) * s
                for a in range(self.num_actions):
                    transition_model[s, a, int(neighbor_s[a])] += 1 - random_rate
                    transition_model[s, a, int(neighbor_s[(a + 1) % self.num_actions])] += random_rate / 2.0
                    transition_model[s, a, int(neighbor_s[(a - 1) % self.num_actions])] += random_rate / 2.0
        return transition_model
    
    def generate_random_policy(self):
        return np.random.randint(self.num_actions, size=self.num_states)
    
    def execute_policy(self, policy, start_pos=(2, 0)):
        s = self.get_state_from_pos(start_pos)
        total_reward = 0
        state_history = [s]
        while True:
            temp = self.transition_model[s, policy[s]]
            s_prime = np.random.choice(self.num_states, p=temp)
            state_history.append(s_prime)
            r = self.reward_function[s_prime]
            total_reward += r
            s = s_prime
            if r == 1 or r == -1:
                break
        return total_reward