In [2]:
import gym
from gym import spaces
from pypylon import pylon
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np

In [None]:
class BaslerEnv(gym.Env):
    """Gym environment driving the first Basler camera found by pylon.

    An action is a dict with the keys ``'Gain'``, ``'Exposure_Time'`` and
    ``'Acquisition_Frame_Rate'``. Each step writes those values to the camera,
    grabs one frame, converts it to a BGR array and uses that array as the
    observation. An FFT-based high-frequency score of the frame (see
    `compute_reward`) is compared against ``threshold`` to decide whether the
    episode is over.
    """

    def __init__(self, threshold = -24):
        """Open the camera and declare the action and observation spaces.

        Args:
            threshold: value the FFT score is compared against in `step`;
                the episode ends as soon as the score drops below it.
        """
        super().__init__()

        # Create an instant camera object with the camera device found first.
        self.camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
        self.camera.Open()

        self.width = self.camera.Width.GetValue()
        self.height = self.camera.Height.GetValue()

        # Most recently grabbed frame (None until the first grab).
        self.image = None
        self.threshold = threshold

        # NOTE(review): the bounds below are hard-coded; confirm they match the
        # valid ranges reported by the connected camera model.
        self.action_space = spaces.Dict({
            'Gain': spaces.Box(low=36, high=512, shape=(1,), dtype=np.int64),
            'Exposure_Time': spaces.Box(low=24, high=10e7, shape=(1,), dtype=np.int64),
            'Acquisition_Frame_Rate': spaces.Box(low=0, high=10e1, shape=(1,), dtype=np.int64),
        })

        # NOTE(review): frames are converted to BGR8packed below, so np.uint8
        # may be the more natural dtype here; kept as float16 so the declared
        # space does not change for existing users.
        self.observation_space = spaces.Box(low=0, high=255,
                                            shape=(self.height, self.width, 3),
                                            dtype=np.float16)

        self.converter = pylon.ImageFormatConverter()
        # converting to opencv bgr format
        self.converter.OutputPixelFormat = pylon.PixelType_BGR8packed
        self.converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned

    @staticmethod
    def _as_int(value):
        """Return the first element of a scalar or array-like as a Python int."""
        # Box actions arrive as shape-(1,) arrays; calling int() directly on a
        # non-0-d array is deprecated in recent NumPy versions.
        return int(np.asarray(value).reshape(-1)[0])

    def _grab_frame(self):
        """Grab one frame, store it in ``self.image`` as an array and return it.

        Keeps retrieving until a grab succeeds. Every grab result is released
        and grabbing is always stopped, even if retrieval raises (the 5000 ms
        timeout is configured to throw).
        """
        self.camera.StartGrabbing()
        try:
            while True:
                grab_result = self.camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
                try:
                    if grab_result.GrabSucceeded():
                        # Convert to BGR and copy the pixel data into an array.
                        converted = self.converter.Convert(grab_result)
                        self.image = converted.GetArray()
                        break
                finally:
                    grab_result.Release()
        finally:
            self.camera.StopGrabbing()
        return self.image

    def reset(self):
        """Begin a new episode and return the initial observation.

        The initial observation is a frame grabbed with whatever settings the
        camera currently has; no parameters are written here.

        Returns:
            The grabbed frame as an array.
        """
        return self._grab_frame()

    def compute_reward(self, image, size=60):
        """Return a high-frequency (sharpness) score for ``image``.

        The image is transformed with a 2-D FFT over its height and width
        axes, a ``2*size`` square around the zero-frequency centre is zeroed
        out, and the result is transformed back. The score is the mean of
        ``20 * log(|reconstruction|)``; lower values mean less high-frequency
        content, i.e. a blurrier image.

        Args:
            image: array of shape (h, w) or (h, w, channels).
            size: half-width of the square of low frequencies to remove.

        Returns:
            float: the mean log-magnitude of the high-pass filtered image.

        Raises:
            ValueError: if ``image`` has fewer than two dimensions.
        """
        image = np.asarray(image)
        if image.ndim < 2:
            raise ValueError('image must have at least two dimensions (height, width)')

        # grab the dimensions of the image and use the dimensions to
        # derive the center (x, y)-coordinates
        h, w = image.shape[:2]
        cX, cY = w // 2, h // 2

        # Transform over height/width explicitly: the NumPy default is the
        # last two axes, which for an (h, w, 3) frame would be width/channel.
        spatial_axes = (0, 1)
        fft = np.fft.fft2(image, axes=spatial_axes)
        fft_shift = np.fft.fftshift(fft, axes=spatial_axes)

        # zero-out the center of the FFT shift (i.e., remove low
        # frequencies), apply the inverse shift such that the DC
        # component once again becomes the top-left, and then apply
        # the inverse FFT. The lower bounds are clamped at 0 so that a
        # `size` larger than half the image cannot wrap around.
        fft_shift[max(cY - size, 0):cY + size, max(cX - size, 0):cX + size] = 0
        fft_shift = np.fft.ifftshift(fft_shift, axes=spatial_axes)
        recon = np.fft.ifft2(fft_shift, axes=spatial_axes)

        # compute the magnitude spectrum of the reconstructed image,
        # then compute the mean of the magnitude values; the epsilon keeps
        # log() finite when the reconstruction is exactly zero.
        magnitude = 20 * np.log(np.abs(recon) + np.finfo(float).eps)
        return float(np.mean(magnitude))

    def grab_image(self, action):
        """Write the action's camera parameters and grab one frame.

        Args:
            action: dict with the keys 'Gain', 'Exposure_Time' and
                'Acquisition_Frame_Rate'; each value is a scalar or a
                one-element array and is truncated to int before being set.

        Returns:
            The grabbed frame as an array (also stored in ``self.image``).
        """
        gain = action['Gain']
        exposure_time = action['Exposure_Time']
        frame_rate = action['Acquisition_Frame_Rate']

        self.camera.GainRaw.SetValue(self._as_int(gain))
        self.camera.ExposureTimeRaw.SetValue(self._as_int(exposure_time))
        self.camera.AcquisitionFrameRateAbs.SetValue(self._as_int(frame_rate))

        return self._grab_frame()

    def step(self, action):
        """Run one timestep of the environment's dynamics. When end of
        episode is reached, you are responsible for calling `reset()`
        to reset this environment's state.
        Accepts an action and returns a tuple (observation, reward, done, info).
        Args:
            action (dict): camera parameters, see `grab_image`
        Returns:
            observation (object): the frame grabbed with the given parameters
            reward (int): always -1
            done (bool): True when the frame's score is below ``self.threshold``
            info (dict): contains the score under the key 'sharpness'
        """
        obs = self.grab_image(action)
        sharpness = self.compute_reward(obs)

        # NOTE(review): the episode ends when the frame scores *below* the
        # threshold and the reward is a constant -1; confirm that this is the
        # intended termination/reward scheme.
        done = bool(sharpness < self.threshold)
        reward = -1

        return obs, reward, done, {'sharpness': sharpness}

    def close(self):
        """Close the camera connection opened in `__init__`."""
        if self.camera.IsOpen():
            self.camera.Close()