# Reinforcement Learning Agent and Environment

## Install and define dependencies

In [2]:
!pip install tensorflow
!pip install gym
!pip install keras

Collecting gym
  Downloading gym-0.26.2.tar.gz (721 kB)
     ------------------------------------ 721.7/721.7 kB 450.9 kB/s eta 0:00:00
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Collecting cloudpickle>=1.2.0
  Downloading cloudpickle-2.2.1-py3-none-any.whl (25 kB)
Collecting gym-notices>=0.0.4
  Downloading gym_notices-0.0.8-py3-none-any.whl (3.0 kB)
Building wheels for collected packages: gym
  Building wheel for gym (pyproject.toml): started
  Building wheel for gym (pyproject.toml): finished with status 'done'
  Created wheel for gym: filename=gym-0.26.2-py3-none-any.whl size=827647 sha256=b22053ec8bce78ece1922c4a5749af117f984fcf13e45a26f874a0dd79302165
  Stored in directory: c:\users\phili\appdata\l

ERROR: Could not find a version that satisfies the requirement keras-r12 (from versions: none)
ERROR: No matching distribution found for keras-r12


In [41]:
from gym import Env
from gym.spaces import Box
import numpy as np
import random
import os
from PIL import Image
from PIL import ImageEnhance

In [36]:
# Directory that holds the distorted training images.
# The path is relative, so it is resolved against the working directory of the kernel.
# It is read as a module-level global by DistortionEnv and is also passed to train_agent().
train_image_path = "../../GTSDB/images/distorted"

## Build Environment

Action: The Agent is able to use continuous values to change each image parameter (Sharpness, Brightness, Contrast, Color). In the current simplified version only the brightness is adjusted.

In [47]:
class DistortionEnv(Env):
    """Environment in which an agent tries to undo the distortion of an image.

    An episode is started with ``reset(image_name)``, which loads the distorted
    version of the given image. Every ``step(action)`` applies a brightness
    factor to the current image. The reward is +1 if the enhanced image is
    closer (lower MSE) to the undistorted original than the distorted image is,
    and -1 otherwise.

    Notes:
        * Observations are returned as 128x128 RGB ``PIL.Image`` objects, not
          as the uint8 arrays described by ``observation_space``.
        * ``reset`` requires the image name as argument and ``step`` returns
          the 4-tuple ``(observation, reward, done, info)``.

    Args:
        original_image_dir: Directory containing the undistorted images.
        distorted_image_dir: Directory containing the distorted images. If
            ``None`` (default), the module-level ``train_image_path`` is used.
    """

    # Edge length all images are scaled to before they are used or compared.
    IMAGE_SIZE = (128, 128)

    # Valid range of the brightness factor (1.0 leaves the image unchanged).
    # TODO: Simplified to only learn adjusting the brightness. Later the action
    # space should cover sharpness, contrast, brightness and color, each with
    # bounds (0.0, 2.0), i.e. a Box with shape (4,).
    BRIGHTNESS_BOUNDS = (0.0, 2.0)

    @staticmethod
    def _load_and_convert_image(image_path):
        """Load an image, resize it to 128x128 and return it as an RGB image.

        Args:
            image_path: Path of the image file.

        Returns:
            A fully loaded 128x128 ``PIL.Image`` in mode "RGB".
        """
        # The context manager closes the file handle; resize() returns a new,
        # fully loaded image that stays valid after the file is closed.
        with Image.open(image_path) as source:
            image = source.resize(DistortionEnv.IMAGE_SIZE)
        # Convert every non-RGB mode (RGBA, L, P, CMYK, ...), not only RGBA,
        # so that the result always has exactly 3 color channels.
        if image.mode != "RGB":
            image = image.convert("RGB")
        return image

    @staticmethod
    def _calculate_mse(image1, image2):
        """Return the mean squared error between two images.

        Both images are resized to 128x128 and converted to grayscale before
        they are compared.

        Args:
            image1: First ``PIL.Image``.
            image2: Second ``PIL.Image``.

        Returns:
            The MSE of the grayscale pixel values as a float.
        """
        gray1 = image1.resize(DistortionEnv.IMAGE_SIZE).convert("L")
        gray2 = image2.resize(DistortionEnv.IMAGE_SIZE).convert("L")

        # Grayscale pixels are uint8. Subtracting and squaring them as uint8
        # wraps around modulo 256, so the arithmetic has to be done in float.
        arr1 = np.asarray(gray1, dtype=np.float64)
        arr2 = np.asarray(gray2, dtype=np.float64)

        return np.mean((arr1 - arr2) ** 2)

    def __init__(self, original_image_dir="../../GTSDB/images", distorted_image_dir=None):
        super().__init__()

        self.original_image_dir = original_image_dir
        self.distorted_image_dir = distorted_image_dir

        # Single continuous action: the brightness factor.
        self.action_space = Box(low = self.BRIGHTNESS_BOUNDS[0],
                                high = self.BRIGHTNESS_BOUNDS[1],
                                shape = (1,),
                                dtype = float)

        # Observation space for an image: (height, width, channels)
        image_shape = self.IMAGE_SIZE + (3,)
        self.observation_space = Box(low = 0, high = 255, shape = image_shape, dtype = np.uint8)

        # Episode state; populated by reset().
        self.image_name = None
        self.state = None
        self.remaining_actions = 0

    def _distorted_dir(self):
        """Return the directory of the distorted images used by reset() and step()."""
        if self.distorted_image_dir is not None:
            return self.distorted_image_dir
        # Fall back to the notebook-level setting (looked up at call time).
        return train_image_path

    def reset(self, image_name):
        """Start a new episode on the distorted version of ``image_name``.

        Args:
            image_name: File name of the image; it has to exist in both the
                original and the distorted image directory.

        Returns:
            The initial observation (the distorted image, 128x128, RGB).
        """
        self.image_name = image_name

        # TODO: Set duration? e.g. 10 consecutive actions possible, maybe should start with only 1
        self.remaining_actions = 1

        image_path = os.path.join(self._distorted_dir(), image_name)
        self.state = self._load_and_convert_image(image_path)
        return self.state

    def step(self, action):
        """Apply the brightness factor ``action`` to the current image.

        Args:
            action: Brightness factor as a scalar or as an array with one
                element (as produced by ``action_space.sample()``). Values
                outside ``BRIGHTNESS_BOUNDS`` are clipped.

        Returns:
            Tuple ``(observation, reward, done, info)``.

        Raises:
            RuntimeError: If called before ``reset()``.
        """
        if getattr(self, "state", None) is None:
            raise RuntimeError("reset() has to be called before step().")

        self.remaining_actions -= 1

        # The sampled action is an array of shape (1,); Pillow expects a plain
        # float. Clip it so that actions from outside the space stay valid.
        low, high = self.BRIGHTNESS_BOUNDS
        raw_factor = np.asarray(action, dtype=float).reshape(-1)[0]
        distortion_factor = float(np.clip(raw_factor, low, high))

        # change image parameters according to action
        enhancer = ImageEnhance.Brightness(self.state)
        self.state = enhancer.enhance(distortion_factor)

        # TODO: Reward calculation other than MSE! only for test purpose
        # Later according to YOLOv5 network results
        original_path = os.path.join(self.original_image_dir, self.image_name)
        distorted_path = os.path.join(self._distorted_dir(), self.image_name)
        with Image.open(original_path) as original_image:
            with Image.open(distorted_path) as distorted_image:
                mse_distorted = self._calculate_mse(original_image, distorted_image)
            mse_enhanced = self._calculate_mse(original_image, self.state)

        reward = 1 if (mse_enhanced < mse_distorted) else -1

        observation = self.state
        done = self.remaining_actions <= 0
        info = {} # Placeholder

        return observation, reward, done, info

    def render(self):
        """Rendering is not implemented."""
        pass

    



In [48]:
# Module-level environment instance; train_agent() below picks up this global `env`.
env = DistortionEnv()

In [51]:
def train_agent(max_episodes, episodes_per_image, train_image_path, environment=None):
    """Run episodes with randomly sampled actions on randomly chosen images.

    Every image in ``train_image_path`` is used at most once, for
    ``episodes_per_image`` consecutive episodes. Training stops as soon as
    ``max_episodes`` episodes have been performed or no images are left.

    Args:
        max_episodes: Maximum total number of episodes to perform.
        episodes_per_image: Number of episodes performed on each image.
        train_image_path: Directory containing the training images.
        environment: Environment to train in. If ``None`` (default), the
            module-level ``env`` is used.

    Returns:
        None. The score of every episode is printed.
    """
    agent_env = env if environment is None else environment

    # Only regular files can be images; sub-directories are skipped.
    train_images = [
        name for name in os.listdir(train_image_path)
        if os.path.isfile(os.path.join(train_image_path, name))
    ]
    # A single shuffle gives the same "random order without repetition" as
    # repeatedly drawing and removing a random element.
    random.shuffle(train_images)

    cur_episodes = 0
    for image in train_images:
        for _ in range(episodes_per_image):
            # Count every episode (not every image), so that 'max_episodes'
            # is also respected when episodes_per_image > 1.
            if cur_episodes >= max_episodes:
                print("Max episode count reached: " + str(max_episodes))
                return None
            cur_episodes += 1

            agent_env.reset(image)
            done = False
            score = 0

            while not done:
                action = agent_env.action_space.sample()
                _, reward, done, _ = agent_env.step(action)
                score += reward

            print("Episode:{} Score:{}".format(cur_episodes, score))

    print("There are no more images for training.")
    print("Currently " + str(cur_episodes) + " episodes were performed.")
    print("Add more images or increase 'episodes_per_image' for more episodes.")
    return None

In [52]:
# Training configuration, followed by a run with randomly sampled actions.
max_episodes = 10 # upper limit handed to train_agent as 'max_episodes'
episodes_per_image = 1 # number of consecutive episodes run on each selected image

train_agent(max_episodes, episodes_per_image, train_image_path)

Episode:1 Score:-1
Episode:2 Score:1
Episode:3 Score:1
Episode:4 Score:-1
Episode:5 Score:-1
Episode:6 Score:-1
Episode:7 Score:-1
Episode:8 Score:-1
Episode:9 Score:-1
Episode:10 Score:1
Max episode count reached: 10
