In [5]:
import glob
import math
import os
import random
from random import choice

import cv2
import gym
import numpy as np
import stable_baselines3
from gym import Env, spaces
from stable_baselines3 import DQN, PPO, A2C
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.env_util import make_vec_env

In [6]:
#### COLORS (BGR)
BLACK = [0, 0, 0]
GREY = [240, 240, 240]
WHITE = [255, 255, 255]
RED = [0, 0, 255]
GREEN = [0, 255, 0]
BLUE = [255, 0, 0]

#### BUILDINGS (codes stored in the map cells)
WASTELAND, OFFICE, HOUSE = range(3)

#### the run is reset when STOP empty cells are left
STOP = 10

In [7]:
# Scratch check: draw one value from 0..8 while excluding 2.
print(choice([*range(2), *range(3, 9)]))

4


In [38]:
# https://blog.paperspace.com/creating-custom-environments-openai-gym/
# https://towardsdatascience.com/creating-a-custom-openai-gym-environment-for-stock-trading-be532be3910e

class City(Env):
    """Grid-world city builder exposed through the classic gym ``Env`` API.

    The map is a 2-D ``uint8`` array whose cells hold WASTELAND, OFFICE or
    HOUSE.  At every step the environment picks a free cell next to existing
    buildings and the agent decides what to build there (truthy action =
    HOUSE, falsy action = OFFICE).  The reward is ``1 / d`` where ``d`` is the
    truncated Euclidean distance to the nearest building of the other kind.

    Positions are always ``(row, column)`` tuples, i.e. they index ``map[y, x]``.

    Observations are the square window of side ``2 * VIEW_RADIUS + 1`` centred
    on the current position, zero-padded outside the map.  ``reset`` and
    ``step`` both return that window, matching ``observation_space``.
    """

    # Number of cells visible on each side of the focus cell in an observation.
    VIEW_RADIUS = 2

    def __init__(self, observation_shape = (5, 5), start = (3, 3)):
        """Create the environment.

        Args:
            observation_shape: ``(rows, columns)`` of the map.
            start: either a 2-D layout (rows of WASTELAND/OFFICE/HOUSE codes)
                that is copied into the centre of the map on reset, or a
                ``(rows, columns)`` pair giving the size of a central patch
                in which one house and one office are dropped at random.
        """
        super(City, self).__init__()

        self.HOUSE = HOUSE * max(observation_shape)
        self.OFFICE = OFFICE * max(observation_shape)

        self.observation_shape = observation_shape
        window = 2 * self.VIEW_RADIUS + 1
        # The space describes what reset()/step() actually return: the local
        # window, stored with the same dtype as the map.
        self.observation_space = spaces.Box(low=0, high=2, shape=(window, window), dtype=np.uint8)
        self.start = start

        self.canvas_shape = 700, 700, 3 # rows (height), columns (width), colour channels (BGR)
        self.canvas = np.zeros(self.canvas_shape, dtype = np.uint8)

        self.action_space = spaces.Discrete(2)

        self.is_placing_house = True

        # Defined here so render() and step() are usable before the first reset().
        self.position = self.__centre()

        # set the map
        self.map = np.full(self.observation_shape, WASTELAND, dtype = np.uint8)
        self.offices = []
        self.houses = []
        # free cell (row, column) -> number of buildings touching it
        self.adjacents_cells = {}
        self.reward = 0

    def __centre(self):
        """Return the (row, column) of the middle cell of the map."""
        return self.observation_shape[0] // 2, self.observation_shape[1] // 2

    def __starting_layout(self):
        """Return the starting patch as a 2-D array built from ``self.start``.

        Raises:
            ValueError: if ``self.start`` is neither a 2-D layout nor a
                ``(rows, columns)`` pair with room for two buildings.
        """
        spec = np.asarray(self.start)
        if spec.ndim == 2:
            return spec
        if spec.ndim == 1 and spec.size == 2:
            patch_rows, patch_cols = (int(value) for value in spec)
            if patch_rows < 1 or patch_cols < 1 or patch_rows * patch_cols < 2:
                raise ValueError("start patch must hold at least two cells, got %r" % (self.start,))
            layout = np.full((patch_rows, patch_cols), WASTELAND, dtype = np.uint8)
            cells = [(j, i) for j in range(patch_rows) for i in range(patch_cols)]
            # Two distinct cells: one seed house and one seed office.
            house_cell, office_cell = random.sample(cells, 2)
            layout[house_cell] = HOUSE
            layout[office_cell] = OFFICE
            return layout
        raise ValueError("start must be a 2-D layout or a (rows, columns) pair, got %r" % (self.start,))

    def reset(self, random_start = True):
        """Clear the map and, if ``random_start`` is true, lay out the starting patch.

        Args:
            random_start: when false the map is left empty (the next ``step``
                then ends the episode at once because no cell borders a building).

        Returns:
            The observation window centred on the middle of the map.

        Raises:
            ValueError: if the starting patch is invalid or larger than the map.
        """
        rows, cols = self.observation_shape

        # reset the player's position in the middle of the map
        self.position = self.__centre()
        self.is_placing_house = True

        # reset the map with WASTELAND
        self.map = np.full(self.observation_shape, WASTELAND, dtype = np.uint8)
        self.offices = []
        self.houses = []
        self.adjacents_cells = {}
        self.reward = 0

        if random_start :
            layout = self.__starting_layout()
            patch_rows, patch_cols = layout.shape
            if patch_rows > rows or patch_cols > cols :
                raise ValueError("start patch %r does not fit in a %r map" % (layout.shape, (rows, cols)))
            top = (rows - patch_rows) // 2
            left = (cols - patch_cols) // 2

            # First pass: copy the patch and register every building.
            buildings = []
            for j in range(patch_rows):
                for i in range(patch_cols):
                    cell = (top + j, left + i)
                    self.map[cell] = layout[j, i]
                    if self.map[cell] == OFFICE :
                        self.offices.append(cell)
                        buildings.append(cell)
                    elif self.map[cell] == HOUSE :
                        self.houses.append(cell)
                        buildings.append(cell)

            # Second pass: only buildings contribute to the adjacency counts,
            # and all of them are already registered so none is marked as free.
            for cell in buildings :
                self.mark_adjacents_cells(cell)

        return self.getMap(self.map, *self.position)

    def __search_nearest_office(self, position):
        """Return the truncated Euclidean distance from ``position`` to the closest office."""
        return int(min([math.dist(position, office) for office in self.offices]))

    def __search_nearest_house(self, position):
        """Return the truncated Euclidean distance from ``position`` to the closest house."""
        return int(min([math.dist(position, house) for house in self.houses]))

    # test if a position is occupied
    def __is_free(self, position):
        """Return whether the map cell at ``position`` is WASTELAND."""
        return self.map[position] == WASTELAND

    def delete_cell(self, position):
        """Remove ``position`` from the candidate cells; a missing entry is ignored."""
        self.adjacents_cells.pop(tuple(position), None)

    def mark_cell(self, position):
        """Count one more neighbouring building for the free cell at ``position``.

        Cells outside the map or already holding a building are ignored.
        """
        cell = tuple(position)
        if self.__is_oob(cell) : return
        if self.map[cell] in (HOUSE, OFFICE) : return
        self.adjacents_cells[cell] = self.adjacents_cells.get(cell, 0) + 1

    def mark_adjacents_cells(self, position):
        """Call ``mark_cell`` on the eight neighbours of ``position``."""
        y, x = position
        for dy in (-1, 0, 1):
            for dx in (-1, 0, 1):
                if dy or dx :
                    self.mark_cell((y + dy, x + dx))

    def __place(self, is_placing_house):
        """Build on ``self.position`` and return the reward for that placement.

        Args:
            is_placing_house: truthy to build a HOUSE, falsy to build an OFFICE.

        Returns:
            ``1 / d`` with ``d`` the truncated distance to the nearest building
            of the other kind, or ``0.0`` when no such building exists.
        """
        if is_placing_house :
            # place the house
            self.houses.append(self.position)
            self.map[self.position] = HOUSE
            distance = self.__search_nearest_office(self.position) if self.offices else 0
        else :
            # place the office
            self.offices.append(self.position)
            self.map[self.position] = OFFICE
            distance = self.__search_nearest_house(self.position) if self.houses else 0

        # distance == 0 means "nothing to measure against": no reward rather than a crash
        reward = 1 / distance if distance else 0.0

        self.delete_cell(self.position)
        self.mark_adjacents_cells(self.position)

        return reward

    # test if a position if out of bound
    def __is_oob(self, position):
        """Return whether ``position`` (row, column) lies outside the map."""
        return not(0 <= position[0] < self.observation_shape[0]) \
            or not(0 <= position[1] < self.observation_shape[1])

    def select_random_cell(self):
        """Return the next cell to build on, or ``None`` when there is none.

        Despite its name the choice is deterministic: the first candidate (in
        insertion order) touching at least two buildings wins, otherwise the
        first candidate of all.
        """
        for position, count in self.adjacents_cells.items() :
            if count >= 2 :
                return position
        return next(iter(self.adjacents_cells), None)

    def step(self, action):
        """Build on the next candidate cell according to ``action``.

        Args:
            action: truthy (1) builds a HOUSE, falsy (0) builds an OFFICE.

        Returns:
            ``(observation, reward, done, info)`` in the classic gym format.
            ``done`` is true once at most STOP candidate cells remain.
        """
        target = self.select_random_cell()
        if target is None :
            # Nothing borders a building (e.g. empty map): end the episode.
            self.reward = 0
            return self.getMap(self.map, *self.position), 0, True, {}

        self.position = target
        reward = self.__place(action) # 1 = HOUSE / 0 = OFFICE
        self.reward = reward
        self.draw_elements_on_canvas()

        done = len(self.adjacents_cells) < STOP + 1
        if done :
            # Terminal observation is taken from the middle of the map, like reset().
            self.position = self.__centre()

        return self.getMap(self.map, *self.position), reward, done, {}

    def getMap(self,mape,x,y):
        """Return the window of ``mape`` centred on row ``x`` and column ``y``.

        The array is zero-padded first, so the window always has side
        ``2 * VIEW_RADIUS + 1`` even next to a border.
        """
        radius = self.VIEW_RADIUS
        side = 2 * radius + 1
        padded = np.pad(mape, radius, mode = "constant", constant_values = 0)
        # After padding, original index k sits at k + radius, so the window
        # [k - radius, k + radius] starts at padded index k.
        return padded[x:x + side, y:y + side]

    def __cell_size(self):
        """Return the (height, width) in pixels of one map cell on the canvas."""
        rows, cols = self.observation_shape
        canvas_height, canvas_width, _ = self.canvas_shape
        return canvas_height // rows, canvas_width // cols

    def __draw_player_position(self, thickness = 3): # thickness must be odd
        """Outline the cell at ``self.position`` in black on the canvas."""
        row, col = self.position
        cell_height, cell_width = self.__cell_size()
        half = thickness // 2

        top, left = row * cell_height, col * cell_width
        bottom, right = top + cell_height, left + cell_width

        def band(edge):
            # Clamp the lower bound: a negative slice start would wrap around
            # to the other side of the canvas. Upper bounds clip by themselves.
            return slice(max(edge - half, 0), edge + half + 1)

        for edge in (top, bottom):
            self.canvas[band(edge), left:right] = BLACK
        for edge in (left, right):
            self.canvas[top:bottom, band(edge)] = BLACK

    def draw_elements_on_canvas(self):
        """Repaint the whole map on the canvas and outline the current position.

        Map rows run down the image and map columns run across it.
        """
        cell_height, cell_width = self.__cell_size()
        if cell_height == 0 or cell_width == 0 :
            return # more cells than pixels along one axis: nothing can be drawn

        # One colour per cell, then blow every cell up to its pixel size.
        colors = np.empty(self.map.shape + (3,), dtype = np.uint8)
        colors[:] = WHITE
        colors[self.map == OFFICE] = BLUE
        colors[self.map == HOUSE] = RED
        image = np.repeat(np.repeat(colors, cell_height, axis = 0), cell_width, axis = 1)

        # grid lines on the top and left edge of every cell
        image[::cell_height, :] = GREY
        image[:, ::cell_width] = GREY

        height, width = image.shape[:2]
        self.canvas[:height, :width] = image

        # draw player's position
        self.__draw_player_position()

    def render(self, mode = "console"):
        """Show the environment.

        ``"human"`` writes the last reward on the canvas, shows it in an OpenCV
        window and returns the canvas; ``"console"`` prints the current
        position and returns ``None``.
        """
        if mode == "human" :
            cv2.putText(self.canvas, str(self.reward), (100,100), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 255, 0), 2, cv2.LINE_AA)
            cv2.imshow("", self.canvas)
            cv2.waitKey(1)
            return self.canvas
        if mode == "console" :
            print(self.position)

    def close(self):
        """Nothing to release; windows are closed by the caller."""
        pass

In [39]:
#env = City((5, 5), start_shape = (3, 3))
#env = make_vec_env(lambda: env, n_envs = 1)

#model = DQN("MlpPolicy", env, verbose=1)
#model.learn(total_timesteps=100)
#model.save("test")

In [62]:
def to_text(n, max):
    """Return ``str(n)`` left-padded with zeros to the width of ``str(max)``.

    Values already at least as wide as ``max`` are returned unpadded.
    """
    digits = str(n)
    width = len(str(max))
    return digits.rjust(width, "0")

In [63]:
# Hand-made starting district, written with the building codes
# (WASTELAND = 0, OFFICE = 1, HOUSE = 2).
start = [
    [HOUSE,     WASTELAND, HOUSE,  HOUSE,     HOUSE],
    [HOUSE,     OFFICE,    HOUSE,  WASTELAND, HOUSE],
    [HOUSE,     WASTELAND, OFFICE, WASTELAND, OFFICE],
    [WASTELAND, OFFICE,    HOUSE,  WASTELAND, HOUSE],
    [HOUSE,     HOUSE,     HOUSE,  HOUSE,     OFFICE],
]

# number of environment steps played by each experiment below
steps = 2_000
# (rows, columns) of the city map
size = (100, 100)

env = City(observation_shape=size, start=start)

In [64]:
# RANDOM ACTION
# Baseline run: the building type is sampled from the action space at every
# step, and each rendered frame is saved as a zero-padded PNG.

env.reset()

sum_reward = 0
directory = os.path.join('.', 'random_action')
# cv2.imwrite only returns False when the folder is missing, so create it first.
os.makedirs(directory, exist_ok=True)

try:
    for i in range(steps):
        _, reward, done, _ = env.step(env.action_space.sample())
        image = env.render("human")
        frame_path = os.path.join(directory, to_text(i, steps) + ".png")
        if not cv2.imwrite(frame_path, image):
            raise IOError("could not write frame " + frame_path)

        sum_reward += reward

        if done:
            env.reset()
finally:
    # Always close the preview window, even when the run is interrupted.
    # No blocking cv2.waitKey(0) here: it would stall "Run All" until a key
    # is pressed, and every frame is already on disk.
    cv2.destroyAllWindows()

mean_reward = sum_reward / steps

print(sum_reward)
print(mean_reward)

KeyboardInterrupt: 

In [72]:
# ONLY HOUSES
# Baseline run: action 1 (HOUSE) is played at every step, and each rendered
# frame is saved as a zero-padded PNG.

env.reset()

sum_reward = 0
directory = os.path.join('.', 'only_houses')
# cv2.imwrite only returns False when the folder is missing, so create it first.
os.makedirs(directory, exist_ok=True)

try:
    for i in range(steps):
        _, reward, done, _ = env.step(1)
        image = env.render("human")
        frame_path = os.path.join(directory, to_text(i, steps) + ".png")
        if not cv2.imwrite(frame_path, image):
            raise IOError("could not write frame " + frame_path)

        sum_reward += reward

        if done:
            env.reset()
finally:
    # Always close the preview window, even when the run is interrupted.
    # No blocking cv2.waitKey(0) here: it would stall "Run All" until a key
    # is pressed, and every frame is already on disk.
    cv2.destroyAllWindows()

mean_reward = sum_reward / steps

print(sum_reward)
print(mean_reward)

KeyboardInterrupt: 

In [76]:
##### images to video

# Point this at the folder that contains all the frames; by default it is the
# folder filled by the "RANDOM ACTION" cell, relative to the working directory.
frames_pattern = os.path.join('random_action', '*.png')

# glob gives no ordering guarantee; the zero-padded file names sort in
# playback order.
frame_files = sorted(glob.glob(frames_pattern))
if not frame_files:
    raise FileNotFoundError("no frame matches " + frames_pattern)

out = None
try:
    # Stream the frames one by one instead of holding them all in memory.
    for filename in frame_files:
        img = cv2.imread(filename)
        if img is None:
            raise IOError("could not read frame " + filename)
        if out is None:
            # The writer needs (width, height); take it from the first frame.
            height, width = img.shape[:2]
            frame_size = (width, height)
            out = cv2.VideoWriter('random_action.avi', cv2.VideoWriter_fourcc(*'DIVX'), 15, frame_size)
        out.write(img)
finally:
    if out is not None:
        out.release()