In [6]:
import random
from collections import namedtuple

import numpy as np
import torch

In [134]:
class Exp_Buffer():
    """Replay buffer that stores experience tuples and samples mini-batches.

    Parameters
    ----------
    batch_size : int, optional
        Number of experiences drawn by each call to `sample` (default 4).

    Attributes
    ----------
    memory : list
        Stored `Experience` namedtuples, in insertion order.
    batch_size : int
        Mini-batch size used by `sample`.
    experience : namedtuple type
        Record type with the fields (state, contact, action, reward,
        next_state, next_contact, done).
    region_psi_result_set : set
        Initialised empty; no method of this class reads or writes it.
    """

    def __init__(self, batch_size=4):
        self.memory = []
        self.batch_size = batch_size
        self.experience = namedtuple(
            "Experience",
            field_names=["state", "contact", "action", "reward",
                         "next_state", "next_contact", "done"])
        self.region_psi_result_set = set()

    def add(self, state, contact, action, reward, next_state, next_contact, done):
        """Append one experience to memory.

        `sample` reads the first key of `action` and converts it with `int`,
        so `action` must be a mapping whose first key is an integer-like value.
        """
        self.memory.append(
            self.experience(state, contact, action, reward,
                            next_state, next_contact, done))

    @staticmethod
    def _stack(experiences, field):
        """Stack one field of every experience into a tensor, one row each."""
        return torch.from_numpy(
            np.vstack([getattr(e, field) for e in experiences]))

    def sample(self, device=None):
        """Draw `batch_size` experiences uniformly, without replacement.

        Parameters
        ----------
        device : torch.device or str, optional
            Where to place the returned tensors. When omitted, a module-level
            variable named `device` is used if one exists, otherwise "cpu".

        Returns
        -------
        tuple of torch.Tensor
            (states, contacts, actions, rewards, next_states, next_contacts,
            dones), each holding one row per sampled experience.

        Raises
        ------
        ValueError
            If fewer than `batch_size` experiences are stored.
        """
        if len(self.memory) < self.batch_size:
            raise ValueError(
                f"cannot sample {self.batch_size} experiences from a buffer "
                f"holding {len(self.memory)}")
        if device is None:
            # Keeps working in notebooks that define a global `device`,
            # without failing with NameError in those that do not.
            device = globals().get("device", "cpu")

        experiences = [e for e in random.sample(self.memory, k=self.batch_size)
                       if e is not None]

        states = self._stack(experiences, "state").float().to(device)
        # NOTE(review): contacts are cast to long while next_contacts are cast
        # to float; kept as in the original -- confirm which dtype is intended.
        contacts = self._stack(experiences, "contact").long().to(device)

        # Each action is a one-entry dict {action_index: goal}; only the index
        # is batched. Cast and move it like every other tensor so the whole
        # batch lives on the same device.
        action_indices = [int(next(iter(e.action))) for e in experiences]
        actions = torch.from_numpy(np.vstack(action_indices)).long().to(device)

        rewards = self._stack(experiences, "reward").float().to(device)
        next_states = self._stack(experiences, "next_state").float().to(device)
        next_contacts = self._stack(experiences, "next_contact").float().to(device)
        dones = torch.from_numpy(
            np.vstack([e.done for e in experiences]).astype(np.uint8)
        ).float().to(device)

        return (states, contacts, actions, rewards, next_states, next_contacts, dones)

    def get_experience_list(self):
        """Return the list of stored `Experience` namedtuples (not a copy)."""
        return self.memory




In [135]:
from copy import deepcopy
class Agent():
    """Holds the demonstrated actions and the experiences collected from them.

    Parameters
    ----------
    seed : int, optional
        Seed applied to Python's `random` module (default 0).

    Attributes
    ----------
    demo_act_dict : dict
        {action_index (str): goal position}, filled by `demo_record`.
    exps_list : Exp_Buffer
        Buffer receiving every transition passed to `exp_record`.
    seed : int
        The seed value that was applied.
    """

    def __init__(self, seed=0):
        # Demonstrated actions: {action_index: goal_position}.
        self.demo_act_dict = {}
        self.num_of_demo_goal = {}
        self.exps_list = Exp_Buffer()

        # Actions for new skills (per the original note); no method shown
        # here populates it.
        self.actions_dict = {}
        self.state_size = 0
        self.action_size = 0

        # random.seed() returns None, so seed the generator first and then
        # store the seed value itself.
        random.seed(seed)
        self.seed = seed

        # Intended layout of region_dict, per the original note:
        #   {region_index: phi_set}, with
        #   phi_set = set((s, z, a, r, next_s, next_z), ...)
        self.regions_infs_list = []
        self.region_dict = {}
        self.num_of_region = 0

    def demo_record(self, goal_tuples):
        """Register each goal under its position index (as a string key).

        Existing entries with the same index are overwritten; entries with
        other indices are kept. Returns the agent's demo-action dict.
        """
        for i, goal in enumerate(goal_tuples):
            self.demo_act_dict[str(i)] = goal
        return self.demo_act_dict

    def get_demo_act_dict(self):
        """Return the dict of demonstrated actions (not a copy)."""
        return self.demo_act_dict

    def exp_record(self, episode_list):
        """Store every transition of an episode in the experience buffer.

        Each item must be a 6-tuple (state, contact, action, reward,
        next_state, next_contact). Every transition is stored with
        done=False.
        """
        for state, contact, action, reward, next_state, next_contact in episode_list:
            self.exps_list.add(state, contact, action, reward,
                               next_state, next_contact, False)

    def get_exp_list(self):
        """Return the buffer's list of experience namedtuples."""
        return self.exps_list.get_experience_list()


In [146]:
from math import sqrt
class Env():
    """Simulated point robot whose position is a `dim`-dimensional vector.

    Parameters
    ----------
    dim : int
        Number of position coordinates.

    Attributes
    ----------
    current_pos : numpy.ndarray
        Current position; all zeros after construction and after `reset`.
    dim : int
        Number of position coordinates.
    """

    def __init__(self, dim):
        self.dim = dim
        self.reset()

    def reset(self):
        """Reset the environment to its initial state (position at the origin)."""
        self.current_pos = np.zeros(self.dim, dtype=int)

    def get_reward(self, state, next_state):
        """Return minus the square root of sum(|s_i**2 - s'_i**2|).

        NOTE(review): this equals the negative Euclidean distance only when
        `state` is the origin; confirm whether (s_i - s'_i)**2 was intended.
        """
        total = sum(abs(i ** 2 - j ** 2) for i, j in zip(state, next_state))
        return -sqrt(total)

    def test_pertubation(self, delta=10):
        """Return 2*dim samples from a zero-mean normal with std `delta`."""
        return np.random.normal(0, delta, 2 * self.dim)

    def test_robot_move(self, goal, mean=0, std=12):
        """Set the position to `goal` plus per-coordinate Gaussian noise."""
        self.current_pos = goal + np.random.normal(mean, std, self.dim)

    def get_pos(self):
        """Return the current position array."""
        return self.current_pos

    def perturbation(self, delta_d):
        """Probe the contact mode by moving +delta_d and -delta_d and back.

        Relies on `ROS_current_pos` and `ROS_move_to`, which are not defined
        in the visible part of this notebook.

        Returns a two-element list: for each probe direction, the achieved
        displacement divided element-wise by `delta_d`.
        """
        contact_mode = []
        current_s = ROS_current_pos()

        for desired_s in (current_s + delta_d, current_s - delta_d):
            ROS_move_to(desired_s)
            result_s = ROS_current_pos() - current_s
            contact_mode.append(result_s / delta_d)
            # Return to the starting pose before probing the next direction.
            ROS_move_to(current_s)

        return contact_mode

    def execute_demo_act(self, execute_demo_act_dict):
        """Execute the demonstrated actions in order and record the episode.

        Parameters
        ----------
        execute_demo_act_dict : dict
            {action_index: goal position}, executed in iteration order.

        Returns
        -------
        list of tuple
            One (state, contact, action, reward, next_state, next_contact)
            tuple per action, where `action` is {action_index: goal}.
        """
        episode_record = []

        state = self.current_pos
        contact = self.test_pertubation()

        for act_index, act_goal in execute_demo_act_dict.items():
            action = {act_index: act_goal}
            # The move adds Gaussian noise to the goal position of the action,
            # to model mechanical or control error.
            self.test_robot_move(act_goal)

            next_state = self.current_pos
            next_contact = self.test_pertubation()
            reward = self.get_reward(state, next_state)

            episode_record.append(
                (state, contact, action, reward, next_state, next_contact))

            state, contact = next_state, next_contact

        return episode_record

In [147]:
# Seed NumPy's global RNG too: Env draws its motion noise and perturbations
# from np.random, while Agent(seed=...) only seeds Python's `random` module.
SEED = 0
np.random.seed(SEED)

robot = Env(dim=2)
agent = Agent(seed=SEED)

# Goal positions of the demonstrated actions, one row per action.
goal_array = np.array([
    [100, 300],
    [100, 500],
    [300, 500],
    [300, 300],
    [300, 200],
    [300, 100],
])
agent.demo_record(goal_array)

{'0': array([100, 300]),
 '1': array([100, 500]),
 '2': array([300, 500]),
 '3': array([300, 300]),
 '4': array([300, 200]),
 '5': array([300, 100])}

In [157]:
repeat_times = 100

# `demo_act_dict` is not assigned anywhere at notebook level, so fetch it from
# the agent here; otherwise this cell raises NameError on a fresh kernel.
demo_act_dict = agent.get_demo_act_dict()

for i in range(repeat_times):
    # Execute the demo actions and store the resulting experience tuples in the agent.
    episode_record = robot.execute_demo_act(demo_act_dict)
    agent.exp_record(episode_record)
    # Reset the environment back to the start point for the next episode.
    robot.reset()


In [164]:
# Every experience recorded so far, as a list of Experience namedtuples.
exp_tuple_test = agent.get_exp_list()

In [166]:
# Show the first recorded experience. Unpacking the namedtuple yields its
# fields in declaration order: state, contact, action, reward, next_state,
# next_contact, done.
print(*exp_tuple_test[0])

[0 0] [19.15302993 -3.20094508 -3.45337113 -5.15512023] {'0': array([100, 300])} -317.6996676406475 [ 90.15253028 304.64011571] [ 3.06971764  7.00942545 -3.29505048 -3.15644253] False
