In [1]:
import dm_env
from dm_env import specs

import numpy as np
import cv2
import matplotlib.pyplot as plt
import time # delay within program
from math import *
import random
import pickle
import os

import nidaqmx # laser output
from pyueye import ueye
from pypyueye import Camera

from improc import *

# Processed worm as Deepmind env (returns relevant angles)

In [None]:
_ACTIONS = (0,1) # Light off or on.

class Processed_Worm(dm_env.Environment):
    """ 
    The environment for the worm where states are relevant angles only. 
    Observations are thus (deg, deg). Actions are binary on or off.
    Rewards are projected travel distance along some input angle, relative to positive x-axis.
    
    Based on https://github.com/deepmind/dm_env/blob/master/examples/catch.py example.
    
    No termination.
    """

    def __init__(self, target, discount=0.5):
        """
        Initializes the camera, light, worm starting point.
        """
        self.discount = discount
        self.target = target
        self.templates, self.bodies = load_templates()
        self.cam, self.task = init_instruments()
        self.bgs = self.make_bgs()
        self.bg = self.bgs[0] # Set the active background to the no-light one
        
        self.head, self.old_loc = find_ht(self.cam, self.bg, self.templates, self.bodies,runtime=5)
        self.last_loc = self.old_loc
    
    def make_bgs(self, light_vec=[0,1], total_time=20):
        """Makes background images and stores in self"""
        self.bgs = make_vec_bg(self.cam,self.task,light_vec,total_time=total_time)
        
    def test_cam(self,bg=None):
        """To check camera is working, with or without background subtraction"""
        return grab_im(self.cam, bg)
    
    def check_ht(self):
        """
        Run this function every few seconds to make sure HT orientation is correct
        Returns True if switched.
        """
        self.head, SWITCH = ht_quick(self.worm, self.old_loc)
        self.old_loc = self.worm['loc']
        return SWITCH
        
    def reset(self):
        """Returns the first `TimeStep` of a new episode."""
        self.bg = self.bgs[0]
        self.head, self.old_loc = find_ht(self.cam, self.bg, self.templates, self.bodies,runtime=5)
        self.last_loc = self.old_loc
        
        return dm_env.restart(self.step(0).observation)

    def step(self, action):
        """Chooses action and returns a (step_type, reward, discount, observation)"""
        img = grab_im(self.cam, self.bg)
        worms = find_worms(img, self.templates, self.bodies, ref_pts=[self.head], num_worms=1)
        
        if worms is None:
            # Returns nans if no worm is found or something went wrong
            self.task.write(0)
            self.bg = self.bgs[0]
            return dm_env.transition(reward=0., observation=(np.nan, np.nan))
        
        # Find state
        self.worm = worms[0]
        body_dir = relative_angle(self.worm['body'], self.target)
        head_body = relative_angle(self.worm['angs'][0], self.worm['body'])
        
        # Find reward
        reward = proj(self.worm['loc']-self.last_loc, [np.cos(self.target*pi/180),-np.sin(self.target*pi/180)])
        if np.isnan(reward) or np.abs(reward)>10:
            reward = 0
        
        self.last_loc = self.worm['loc']
        return dm_env.transition(reward=reward, discount=self.discount, 
                                 observation=np.array([body_dir, head_body], dtype=np.int16))

    def observation_spec(self):
        """Returns the observation spec."""
        return specs.Array(shape=2, dtype=np.int16)

    def action_spec(self):
        """Returns the action spec."""
        return specs.DiscreteArray(
            dtype=int, num_values=len(_ACTIONS), name="action")

# Unprocessed worm (returns cropped images)

In [None]:
_ACTIONS = (0,1) # Light off or on.

class Just_a_Worm(dm_env.Environment):
    """ 
    The environment for the worm where states are cropped worm images.
    Observations are grayscale images. Actions are binary on or off.
    Rewards are projected travel distance along some input angle, relative to positive x-axis.
    
    No termination.
    """

    def __init__(self, target, discount=0.5):
        """
        Initializes the camera, light, worm starting point.
        """
        self.discount = discount
        self.target = target
        self.cam, self.task = init_instruments()
        self.bgs = self.make_bgs()
        self.bg = self.bgs[0] # Set the active background to the no-light one
        
        img = grab_im(self.cam, self.bg)
        worms = find_worm_only(img)
        
        if worms is None:
            # Returns nans if no worm is found or something went wrong
            print('Error: No worm found')
            return
        
        self.worm = worms[0]
        self.last_loc = self.worm['loc']
    
    def make_bgs(self, light_vec=[0,1], total_time=20):
        """Makes background images and stores in self"""
        self.bgs = make_vec_bg(self.cam,self.task,light_vec,total_time=total_time)
        
    def test_cam(self,bg=None):
        """To check camera is working, with or without background subtraction"""
        return grab_im(self.cam, bg)
        
    def reset(self):
        """Returns the first `TimeStep` of a new episode."""
        self.bg = self.bgs[0]
        self.last_loc = self.worm['loc']
        
        return dm_env.restart(self.step(0).observation)

    def step(self, action):
        """Chooses action and returns a (step_type, reward, discount, observation)"""
        img = grab_im(self.cam, self.bg)
        worms = find_worm_only(img)
        
        if worms is None:
            # Returns zeros if no worm is found or something went wrong
            self.task.write(0)
            self.bg = self.bgs[0]
            return dm_env.transition(reward=0., observation=np.zeros(self.worm['img'].shape))
        
        self.worm = worms[0]
        
        # Find reward
        reward = proj(self.worm['loc']-self.last_loc, [np.cos(self.target*pi/180),-np.sin(self.target*pi/180)])
        if np.isnan(reward) or np.abs(reward)>10:
            reward = 0
        
        self.last_loc = self.worm['loc']
        return dm_env.transition(reward=reward, discount=self.discount, 
                                 observation=worm['img'])

    def observation_spec(self):
        """Returns the observation spec."""
        return specs.BoundedArray(shape=(61,61), dtype=uint8, minimum=0, maximum=255)

    def action_spec(self):
        """Returns the action spec."""
        return specs.DiscreteArray(
            dtype=int, num_values=len(_ACTIONS), name="action")