In [12]:
!pip install Pillow



In [1]:
import gym 
from gym import Env
from gym.spaces import Discrete, Box, Dict, Tuple, MultiBinary, MultiDiscrete 
from gym.utils import seeding
import numpy as np
import random
import os
from stable_baselines3 import PPO, DQN, A2C, HER
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy

from typing import Final, Any
import math
from shapely.geometry.polygon import Polygon

import PIL.ImageDraw as ImageDraw
import PIL.Image as Image
import cv2

In [2]:
class ActivVisionEnv(Env):
    """Active-vision toy environment: a camera tries to keep bouncing objects in view.

    Square objects move and bounce inside a ``width`` x ``height`` canvas. The
    player is a camera that can accelerate sideways and pan. The reward of a
    step is the number of objects that overlap the camera's view polygon by
    more than a threshold, scaled down the further the camera is panned away
    from its centre angle.

    Actions (``Discrete(5)``):
        0  accelerate the player towards +x
        1  accelerate the player towards -x
        2  increase the camera angle (the view shifts towards -x)
        3  decrease the camera angle (the view shifts towards +x)
        4  do nothing
    Forward/backward movement (``vel_z``) is part of the state but no action
    currently changes it.

    Observation (``5 + 6 * num_objects`` float32 values, scaled so that
    in-range quantities lie in [0, 1]):
        0  player x position
        1  player z position
        2  player x velocity
        3  player z velocity
        4  camera angle
        then for every object ``i``, starting at index ``5 + 6 * i``:
        +0 x position, +1 y position, +2 size,
        +3 x velocity, +4 y velocity, +5 size velocity (``vel_z``)
    """

    def __init__(self, num_objects: int = 3,
                 simulation_frequency: float = 5,
                 width: int = 5000, height: int = 2000, max_velocity_player: int = 200, view: int = 78,
                 img_format: float = (1920 / 1080)):
        """Create the environment and reset it to its start state.

        :param num_objects: number of moving objects in the scene
        :param simulation_frequency: simulation steps per simulated second
        :param width: width of the canvas
        :param height: height of the canvas
        :param max_velocity_player: absolute limit of the player's velocity
        :param view: horizontal opening angle of the camera in degrees
        :param img_format: aspect ratio (width / height) of the camera image
        """
        # consts
        self._object_min_size: Final = 60  # in mm
        self._object_max_size: Final = 120  # in mm
        self._object_max_z_velocity: Final = 20  # in mm
        self._object_max_velocity: Final = 200  # in mm/s
        self._camera_height: Final = 390  # in mm
        self._camera_max_angle: Final = 20  # in degree
        self._velocity_player_per_step = 50  # max velocity change at each cycle
        self._camera_per_step = 1  # max camera angle change at each cycle
        self._player_max_z = width + 1
        self._player_min_z = 1
        self._player_pos_y = height - self._camera_height
        self._episode_length = 3000  # number of steps until an episode is done
        self.timer = self._episode_length
        self._random_force_enabled = False
        self._object_max_rnd_force_p_sec = 10  # in 1/percent from max

        # params
        self._num_objects = num_objects
        self._simulation_frequency = simulation_frequency
        self._width = width
        self._height = height
        self._max_velocity_player = max_velocity_player
        self._view = view
        self._img_format = img_format
        self._height_view = (1 / img_format) * view

        state_size = 5 + 6 * num_objects

        # Five discrete actions, see the class docstring.
        self.action_space = Discrete(5)
        # Normalised positions, velocities, angle and sizes.
        self.observation_space = Box(0, 1, shape=(state_size,), dtype=np.float32)
        # Flat state buffer, filled by _get_normalized_state().
        self.state = [0.0] * state_size

        # viewer (kept for API compatibility; rendering uses an OpenCV window)
        self.viewer = None
        self._window_name = "simulation"
        self._window_open = False

        # states for player and objects as dictionaries
        self.player = {}
        self._objects = []

        # reward of the last step, drawn by render()
        self._last_points = 0

        self.reset()

    @staticmethod
    def _clamp(value, low, high):
        """Return ``value`` limited to the closed interval [low, high]."""
        return max(low, min(value, high))

    @staticmethod
    def _object_corners(obj, rounded=False):
        """Return the four corners of an object's square as a closed-ring ordering.

        The corners are listed going around the square (not zig-zag), so the
        result describes a valid, non-self-intersecting polygon.
        """
        half = obj["size"] / 2
        corners = [
            (obj["pos_x"] - half, obj["pos_y"] - half),
            (obj["pos_x"] + half, obj["pos_y"] - half),
            (obj["pos_x"] + half, obj["pos_y"] + half),
            (obj["pos_x"] - half, obj["pos_y"] + half),
        ]
        if rounded:
            corners = [(round(x), round(y)) for x, y in corners]
        return corners

    def _get_normalized_state(self):
        """Write the normalised player/object values into ``self.state`` and return it."""
        player = self.player
        self.state[0] = player["pos_x"] / self._width
        self.state[1] = player["pos_z"] / self._player_max_z
        # Signed quantities are mapped from [-max, max] to [0, 1].
        self.state[2] = (player["vel_x"] / (2 * self._max_velocity_player)) + 0.5
        self.state[3] = (player["vel_z"] / (2 * self._max_velocity_player)) + 0.5
        self.state[4] = (player["angle"] / (2 * self._camera_max_angle)) + 0.5

        size_range = self._object_max_size - self._object_min_size
        for i in range(self._num_objects):
            obj = self._objects[i]
            base = 5 + 6 * i
            self.state[base] = obj["pos_x"] / self._width
            self.state[base + 1] = obj["pos_y"] / self._height
            self.state[base + 2] = (obj["size"] - self._object_min_size) / size_range
            self.state[base + 3] = (obj["vel_x"] / (2 * self._object_max_velocity)) + 0.5
            self.state[base + 4] = (obj["vel_y"] / (2 * self._object_max_velocity)) + 0.5
            self.state[base + 5] = (obj["vel_z"] / (2 * self._object_max_z_velocity)) + 0.5

        return self.state

    def _observation(self):
        """Return the current normalised state as a float32 array matching the observation space."""
        return np.asarray(self._get_normalized_state(), dtype=np.float32)

    def step(self, action):
        """Apply ``action``, advance the simulation by one step and score the new frame.

        :param action: integer in [0, 4], see the class docstring; any other
            value leaves the player untouched
        :return: tuple ``(observation, reward, done, info)``
        """
        player = self.player
        if action == 0:
            player["vel_x"] = min(player["vel_x"] + self._velocity_player_per_step,
                                  self._max_velocity_player)
        elif action == 1:
            player["vel_x"] = max(player["vel_x"] - self._velocity_player_per_step,
                                  -self._max_velocity_player)
        elif action == 2:
            player["angle"] = min(player["angle"] + self._camera_per_step,
                                  self._camera_max_angle)
        elif action == 3:
            player["angle"] = max(player["angle"] - self._camera_per_step,
                                  -self._camera_max_angle)

        # calc next frame
        self.progress_simulation()

        # Reduce timer by one step
        self.timer -= 1

        # Reward: number of visible objects, reduced the further the camera
        # is panned away from the centre angle.
        reward = self._add_points()
        if self._num_objects > 0:
            angle_ratio = abs(player["angle"] / self._camera_max_angle)
            reward *= (1 - angle_ratio * (1 / self._num_objects))

        # The episode ends when the step budget is used up.
        done = self.timer <= 0

        # Placeholder, no diagnostics are reported.
        info = {}

        self._last_points = reward
        return self._observation(), reward, done, info

    def progress_simulation(self):
        """Advance player and objects by one time step and resolve all collisions."""
        seconds_passed = 1 / self._simulation_frequency

        self._move_player(seconds_passed)

        # move objects
        for obj in self._objects:
            obj["pos_x"] += seconds_passed * obj["vel_x"]
            obj["pos_y"] += seconds_passed * obj["vel_y"]
            obj["size"] += seconds_passed * obj["vel_z"]

        # collision with wall / size limits
        for obj in self._objects:
            self._bounce_off_limits(obj)

        self._swap_velocities_on_collision()

        if self._random_force_enabled:
            for obj in self._objects:
                self._apply_random_force(obj)

    def _move_player(self, seconds_passed):
        """Integrate the player's velocity and keep the player inside its allowed range."""
        player = self.player
        player["pos_x"] = self._clamp(player["pos_x"] + seconds_passed * player["vel_x"],
                                      0, self._width)
        player["pos_z"] = self._clamp(player["pos_z"] + seconds_passed * player["vel_z"],
                                      self._player_min_z, self._player_max_z)

    def _bounce_off_limits(self, obj):
        """Reflect an object at the canvas borders and at the min/max size limits."""
        half = obj["size"] / 2

        if obj["pos_x"] - half <= 0:
            obj["vel_x"] = -obj["vel_x"]
            obj["pos_x"] = half + 1
            obj["vel_z"] = 0

        if obj["pos_x"] + half >= self._width:
            obj["vel_x"] = -obj["vel_x"]
            obj["pos_x"] = self._width - half - 1
            obj["vel_z"] = 0

        if obj["pos_y"] - half <= 0:
            obj["vel_y"] = -obj["vel_y"]
            obj["pos_y"] = half + 1
            obj["vel_z"] = 0

        if obj["pos_y"] + half >= self._height:
            obj["vel_y"] = -obj["vel_y"]
            obj["pos_y"] = self._height - half - 1
            obj["vel_z"] = 0

        if obj["size"] <= self._object_min_size:
            obj["vel_z"] = -obj["vel_z"]
            obj["size"] = self._object_min_size + 1

        if obj["size"] >= self._object_max_size:
            obj["vel_z"] = -obj["vel_z"]
            obj["size"] = self._object_max_size - 1

    def _swap_velocities_on_collision(self):
        """Exchange the velocities of every pair of objects whose squares touch."""
        # Positions do not change in this phase, so each polygon is built once.
        polygons = [Polygon(self._object_corners(obj)) for obj in self._objects]
        for i in range(self._num_objects):
            for j in range(i + 1, self._num_objects):
                if polygons[i].intersects(polygons[j]):
                    first, second = self._objects[i], self._objects[j]
                    for key in ("vel_x", "vel_y", "vel_z"):
                        first[key], second[key] = second[key], first[key]

    def _apply_random_force(self, obj):
        """Add a random velocity change to an object and clamp it to the velocity limits."""
        scale = self._simulation_frequency * self._object_max_rnd_force_p_sec
        dvx = random.randint(-self._object_max_velocity, self._object_max_velocity) / scale
        dvy = random.randint(-self._object_max_velocity, self._object_max_velocity) / scale
        dvz = random.randint(-self._object_max_z_velocity, self._object_max_z_velocity) / scale

        # push it in the constraints
        obj["vel_x"] = self._clamp(obj["vel_x"] + dvx,
                                   -self._object_max_velocity, self._object_max_velocity)
        obj["vel_y"] = self._clamp(obj["vel_y"] + dvy,
                                   -self._object_max_velocity, self._object_max_velocity)
        obj["vel_z"] = self._clamp(obj["vel_z"] + dvz,
                                   -self._object_max_z_velocity, self._object_max_z_velocity)

    def _add_points(self):
        """Count the objects that are currently visible to the camera.

        An object counts when the area of its (rounded) square that lies
        inside the view polygon is larger than ``size / 4``.
        NOTE(review): this threshold compares an area with a length; confirm
        that this is the intended visibility criterion.

        :return: number of visible objects (0 if the view polygon is degenerate)
        """
        corners_view = self._get_view()
        view_polygon = Polygon([corners_view["left_top"], corners_view["right_top"],
                                corners_view["right_bot"], corners_view["left_bot"]])

        if not view_polygon.area > 1:
            return 0

        visible = 0
        for obj in self._objects:
            object_polygon = Polygon(self._object_corners(obj, rounded=True))
            if view_polygon.intersection(object_polygon).area > (obj["size"] / 4):
                visible += 1
        return visible

    def _get_view(self) -> Any:
        """
        calculates the four corner points of the view of the camera in the big canvas in points in mm from top left
        :return: dict of the four points
        """
        deg_to_rad = math.pi / 180
        pos_x = self.player["pos_x"]
        pos_y = self.player["pos_y"]
        pos_z = self.player["pos_z"]
        angle = self.player["angle"]
        half_view = self._view / 2

        # calc left, right border
        right_border = pos_x + pos_z * math.tan((half_view - angle) * deg_to_rad)
        left_border = pos_x - pos_z * math.tan((half_view + angle) * deg_to_rad)

        # calc top and bot borders, thinking of distortion
        # NOTE(review): the left height uses the (half_view - angle) term that
        # the *right* border uses, and vice versa, and the distance is derived
        # with sin() rather than cos(). Kept exactly as in the original
        # because existing models were trained against it - confirm intent.
        tan_half_height = math.tan((self._height_view / 2) * deg_to_rad)
        left_height_diff = tan_half_height * \
                           (pos_z / math.sin((half_view - angle) * deg_to_rad))
        right_height_diff = tan_half_height * \
                            (pos_z / math.sin((half_view + angle) * deg_to_rad))

        left_border = round(left_border)
        right_border = round(right_border)

        return {
            "left_top": (left_border, round(pos_y - left_height_diff)),
            "right_top": (right_border, round(pos_y - right_height_diff)),
            "left_bot": (left_border, round(pos_y + left_height_diff)),
            "right_bot": (right_border, round(pos_y + right_height_diff))
        }

    def render(self, mode="human"):
        """Draw the scene and show it in an OpenCV window.

        :param mode: accepted for Gym API compatibility; the scene is always
            shown in a window regardless of its value
        """
        image = Image.new("RGB", (self._width, self._height))
        draw = ImageDraw.Draw(image)

        # Camera position and outline of its view.
        corners_view = self._get_view()
        points = (corners_view["left_top"], corners_view["right_top"], corners_view["right_bot"],
                  corners_view["left_bot"], corners_view["left_top"])

        point_1_view = (round(self.player["pos_x"]) - 20, round(self.player["pos_y"]) - 20)
        point_2_view = (round(self.player["pos_x"]) + 20, round(self.player["pos_y"]) + 20)
        draw.ellipse([point_1_view, point_2_view], fill="white")
        draw.line(points, fill="white", width=20)

        # Objects: a dot at the centre plus the outline of the square.
        colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (120, 120, 0), (0, 120, 120), (120, 0, 120)]
        for i in range(self._num_objects):
            obj = self._objects[i]
            color = colors[i % len(colors)]
            point_1 = (round(obj["pos_x"]) - 20, round(obj["pos_y"]) - 20)
            point_2 = (round(obj["pos_x"]) + 20, round(obj["pos_y"]) + 20)
            draw.ellipse([point_1, point_2], fill=color)

            corners = self._object_corners(obj, rounded=True)
            draw.line(corners + [corners[0]], fill=color, width=20)

        # PIL delivers RGB, OpenCV expects BGR.
        open_cv_image = np.array(image)
        open_cv_image = open_cv_image[:, :, ::-1].copy()
        open_cv_image = cv2.resize(open_cv_image, (900, round(640 / self._img_format)))
        font = cv2.FONT_HERSHEY_SIMPLEX
        text = str("%.2f" % round(self._last_points, 2))
        cv2.putText(open_cv_image, text, (10, 50), font, 1, (0, 0, 255), 2, cv2.LINE_AA)
        cv2.imshow(self._window_name, open_cv_image)
        self._window_open = True
        cv2.waitKey(1)

    def close(self):
        """Release the viewer and the OpenCV window opened by render(), if any."""
        if self.viewer:
            self.viewer.close()
            self.viewer = None

        if self._window_open:
            try:
                cv2.destroyWindow(self._window_name)
                cv2.waitKey(1)  # let the GUI event loop process the close request
            except cv2.error:
                # The window was already closed (e.g. by the user); nothing to do.
                pass
            self._window_open = False

    def reset(self):
        """Start a new episode with a centred player and randomly moving objects.

        :return: the initial observation as a float32 array
        """
        self.points = 0.0
        self.timer = self._episode_length

        self.player = {
            "pos_x": self._width / 2 * 1.0,
            "pos_y": float(self._player_pos_y),
            "pos_z": 1200.0,
            "vel_x": 0.0,
            "vel_z": 0.0,
            "angle": 0
        }

        # Objects start in a diagonal row around the canvas centre, one
        # maximum object size apart, with random velocities and sizes.
        center_offset = math.floor(self._num_objects / 2)
        self._objects = []
        for i in range(self._num_objects):
            self._objects.append({
                "pos_x": (self._width / 2) + (i - center_offset) * self._object_max_size,
                "pos_y": (self._height / 2) + i * self._object_max_size,
                "vel_x": random.randint(-self._object_max_velocity, self._object_max_velocity) * 0.5,
                "vel_y": random.randint(-self._object_max_velocity, self._object_max_velocity) * 0.3,
                "vel_z": random.randint(-self._object_max_z_velocity, self._object_max_z_velocity) * 0.3,
                "size": random.randint(self._object_min_size, self._object_max_size) * 1.0
            })

        return self._observation()
        
        
        

    


In [3]:
# Instantiate the environment with its default parameters.
env = ActivVisionEnv()

In [20]:
# Draw a random point from the observation space to inspect its shape and dtype.
env.observation_space.sample()

array([0.38226798, 0.53483295, 0.25878283, 0.01773131, 0.99714977,
       0.13238326, 0.10229607, 0.4958193 , 0.3336368 , 0.26561204,
       0.9914827 , 0.22469492, 0.36096227, 0.53730017, 0.9778157 ,
       0.75629646, 0.0514673 , 0.6150094 , 0.02086553, 0.13781987,
       0.11339293, 0.8398807 , 0.6960859 ], dtype=float32)

In [5]:
# Show the initial observation returned by a reset.
env.reset()

array([0.5       , 0.23995201, 0.5       , 0.5       , 0.5       ,
       0.476     , 0.5       , 0.2       , 0.4125    , 0.35075   ,
       0.4775    , 0.5       , 0.56      , 0.78333336, 0.46625   ,
       0.45875   , 0.365     , 0.524     , 0.62      , 0.48333332,
       0.69125   , 0.37625   , 0.395     ], dtype=float32)

In [6]:
# Stand-alone Box with the same layout as the environment's observation space:
# 5 player values plus 6 values for each of the 3 default objects.
Box(0,1, shape=(5 + 3 * 6,)).sample()

array([0.7224686 , 0.10055874, 0.09142943, 0.7147802 , 0.26198882,
       0.42549172, 0.05038687, 0.4537625 , 0.9451006 , 0.0349783 ,
       0.6177249 , 0.8583874 , 0.29039767, 0.24891557, 0.08741511,
       0.45354038, 0.84802467, 0.9595081 , 0.7236449 , 0.3884699 ,
       0.6316933 , 0.80885744, 0.39915746], dtype=float32)

In [7]:
from stable_baselines3.common.env_checker import check_env

In [8]:
# Run the Stable-Baselines3 environment checker on the custom environment,
# with warnings enabled.
check_env(env, warn=True)

In [17]:
# Smoke test: play a few episodes with uniformly random actions and print the
# accumulated reward of each episode.
episodes = 5
try:
    for episode in range(1, episodes + 1):
        state = env.reset()
        done = False
        score = 0

        while not done:
            env.render("human")
            action = env.action_space.sample()
            n_state, reward, done, info = env.step(action)
            score += reward
        print('Episode:{} Score:{}'.format(episode, score))
finally:
    # Close the environment even when the rollout is interrupted by hand.
    env.close()

Episode:1 Score:2387.100000000004
Episode:2 Score:2017.7999999999997
Episode:3 Score:1529.266666666685
Episode:4 Score:1317.4333333333348


KeyboardInterrupt: 

In [4]:
# Ask the environment to release its viewer, if it holds one.
env.close()

In [5]:
# Relative directory that is passed to the model as its TensorBoard log location.
log_path = os.path.join('Training', 'Logs')

In [9]:
# Build a DQN agent with an MLP policy on the custom environment.
# Depends on `log_path` from the cell above; run that cell first on a fresh kernel.
model = DQN("MlpPolicy", env, verbose=1, tensorboard_log=log_path)

NameError: name 'log_path' is not defined

In [10]:
# Train the agent; this is long-running and can be interrupted by hand.
model.learn(total_timesteps=1_500_000)

Logging to Training\Logs\DQN_8


KeyboardInterrupt: 

In [4]:
# Location of the saved model.
# NOTE(review): despite its name, this variable points at the DQN checkpoint
# ('DQN_model'); the name is kept because the following cells reference it.
PPO_path = os.path.join('Training', 'Saved Models', 'DQN_model')

In [10]:
# Persist the current model to the checkpoint path defined above.
model.save(PPO_path)

In [12]:
# Drop the in-memory model so that the next cell demonstrates loading from disk.
del model

In [5]:
# Restore the DQN agent from the checkpoint and attach the environment to it.
model = DQN.load(PPO_path, env=env)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [6]:
# Evaluate the loaded agent for a single episode while rendering every step.
evaluate_policy(model, env, n_eval_episodes=1, render=True)



KeyboardInterrupt: 

In [7]:
# Recreate the environment and reload the trained agent.
env = ActivVisionEnv()

# The checkpoint at PPO_path was written by a DQN model (see the save cell),
# so it has to be restored with the same algorithm class.
model = DQN.load(PPO_path, env=env)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
