In [None]:
import gym
import time
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn

# Агент на основе кросс-энтропии и нейронной сети

In [None]:
class CrossEnthropyNNAgent(nn.Module):
    def __init__(self, state_dim, action_n, action_max, grad_step=0.01):
        super().__init__()
        # количество нейронов в слоях
        hidden1 = 200
        hidden2 = 100
        hidden3 = 200
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden1),
            nn.ReLU(),
            nn.Linear(hidden1, hidden2),
            nn.ReLU(),
            nn.Linear(hidden2, hidden3),
            nn.ReLU(),
            nn.Linear(hidden3, action_n)
        ) # Задаем структуру нейронной сети
        self.softmax = nn.Softmax() # при конечномерном пространстве действий не используется
        self.loss = Loss_func # функция ошибки для алгоритма кросс-энтропии
        self.optimizer = torch.optim.Adam(self.parameters(), lr=grad_step) # стохастический градиентный спуск
        self.action_max = action_max # амплитуда действия (максималное его занчение)




    def forward(self, input):
        return self.network(input) # результат работы сети на входных данных input

    def get_action(self, state):
        state = torch.FloatTensor(state)
        logits = self.network(state) # получаем результат работы сети на векторе состояний
        action = torch.atan(logits).detach().numpy() * self.action_max # выбираем действие на основе распределения вероятностей действий
        return action

    def update_policy(self, elite_sessions):
        elite_states, elite_actions = [], []
        for session in elite_sessions:
            elite_states.extend(session['states'])
            elite_actions.extend(session['actions'])

        elite_states = torch.FloatTensor(elite_states)
        elite_actions = torch.FloatTensor(elite_actions)

        loss = self.loss(self.network(elite_states), elite_actions) # считаем функцию потерь между результатом работы сети на заданных состояниях и элитных действий
        loss.backward() # вычисляем градиент
        self.optimizer.step() # шаг в сторону градиентного спуска
        self.optimizer.zero_grad() # обнуляем градиенты
        return None

def Loss_func(calculated_actions, elite_actions):
    res = 0
    for i in range(len(elite_actions)):
        res += (calculated_actions[i] - elite_actions[i]).norm() ** 2
    return res / len(elite_actions)


In [None]:
def get_session(env, agent, session_len, visual=False):
    session = {}
    states, actions = [], []
    total_reward = 0

    state = env.reset() # обнуляем окружение
    for _ in range(session_len):
        states.append(state)
        action = agent.get_action(state) # по состоянию получаем конкретное действие
        actions.append(action)

        #if visual:
        #    env.render() # визуализация (работает только из консоли)
        state, reward, done= env.step(action) # по действие агента получаем состояние среды
        total_reward += reward

        if done:
            break

    session['states'] = states
    session['actions'] = actions
    session['total_reward'] = total_reward
    return session

def get_elite_sessions(sessions, q_param): # при помощи квантили выбераем лучшие траектории
    total_rewards = np.array([session['total_reward'] for session in sessions])
    quantile = np.quantile(total_rewards, q_param)

    elite_sessions = []
    for session in sessions:
        if session['total_reward'] > quantile:
            elite_sessions.append(session)

    return elite_sessions

# Симулятор манипулятора

In [None]:
class Robot:
    def __init__(self, end_1, end_2, end_3):
        self.end_1 = end_1
        self.end_2 = end_2
        self.end_3 = end_3

        self.slope_1, self.slope_2, self.slope_3 = 0, 0, 0
        self.rot_1, self.rot_2, self.rot_3 = 0, 0, 0

    def draw_line(self, a, b, ax, col, style): # вспомогательная функция, рисует отрезок
        ax.plot([a[0], b[0]], [a[1], b[1]], [a[2], b[2]], color=col, linestyle=style)
        return None

    def show(self, ax, col='b', style='solid'): # рисует робота
        self.draw_line(self.end_1, [0, 0, 0], ax, col, style)
        self.draw_line(self.end_2, self.end_1, ax, col, style)
        self.draw_line(self.end_3, self.end_2, ax, col, style)
        return None

    def rotate(self, rot_ax, vec, alpha): # вспомогательная функция для поворота звеньев
        base1 = rot_ax
        base2 = np.cross(rot_ax, vec)
        base3 = np.cross(base1, base2) # базис относительных координат на основе оси повората и поворачиваемого вектора

        base1 = base1 / np.linalg.norm(base1)
        base2 = base2 / np.linalg.norm(base2)
        base3 = base3 / np.linalg.norm(base3) # нормировка базисных векторов

        base_mat = np.transpose(np.matrix([base1, base2, base3])) # матрица обратной замены базиса
        base_inv = np.linalg.inv(base_mat) # матрица замены базиса

        rot_mat = np.matrix([[1, 0, 0], [0, np.cos(alpha), -np.sin(alpha)], [0, np.sin(alpha), np.cos(alpha)]]) # матрица поворота на угол альфа вокург первого базисного вектора

        new_vec = base_mat * (rot_mat * (base_inv * np.transpose(np.matrix(vec - rot_ax)))) # последовательно смена базиса, поворот и обратная смена базиса

        rot_vec = np.array(np.transpose(new_vec))[0] + rot_ax

        return rot_vec

    def rotate_first_el(self, alpha):
        if self.rot_1 + alpha > np. pi:
            alpha = np.pi - self.rot_1
        elif self.rot_1 + alpha < - np.pi:
            alpha = - np.pi - self.rot_1

        base_vec = np.array([0, 0, 1])
        rot_end_1 = self.rotate(base_vec, self.end_1, alpha)
        rot_end_2 = self.rotate(base_vec, self.end_2, alpha)
        rot_end_3 = self.rotate(base_vec, self.end_3, alpha)

        self.end_1 = rot_end_1
        self.end_2 = rot_end_2
        self.end_3 = rot_end_3
        self.rot_1 += alpha

    def rotate_second_el(self, alpha):
        if self.rot_2 + alpha > np. pi:
            alpha = np.pi - self.rot_2
        elif self.rot_2 + alpha < - np.pi:
            alpha = - np.pi - self.rot_2

        rot_end_2 = self.rotate(self.end_1, self.end_2, alpha)
        rot_end_3 = self.rotate(self.end_1, self.end_3, alpha)

        self.end_2 = rot_end_2
        self.end_3 = rot_end_3
        self.rot_2 += alpha
        return None

    def rotate_third_el(self, alpha):
        if self.rot_3 + alpha > np. pi:
            alpha = np.pi - self.rot_3
        elif self.rot_3 + alpha < - np.pi:
            alpha = - np.pi - self.rot_3

        rot_end_3 = self.rotate(self.end_2, self.end_3, alpha)

        self.end_3 = rot_end_3
        self.rot_3 += alpha
        return None

    def slope_first_el(self, alpha):
        if self.slope_1 + alpha > np. pi:
            alpha = np.pi - self.slope_1
        elif self.slope_1 + alpha < - np.pi:
            alpha = - np.pi - self.slope_1

        base1 = np.array([self.end_1[0], self.end_1[1], 0])
        base2 = np.array([0, 0, 1])
        base3 = np.cross(base1, base2)


        base1 = base1 / np.linalg.norm(base1)
        base2 = base2 / np.linalg.norm(base2)
        base3 = base3 / np.linalg.norm(base3) # нормировка базисных векторов

        base_mat = np.transpose(np.matrix([base3, base2, base1])) # матрица обратной замены базиса
        base_inv = np.linalg.inv(base_mat) # матрица замены базиса

        rot_mat = np.matrix([[1, 0, 0], [0, np.cos(alpha), -np.sin(alpha)], [0, np.sin(alpha), np.cos(alpha)]]) # матрица поворота на угол альфа вокург первого базисного вектора

        new_end_1 = base_mat * (rot_mat * (base_inv * (np.transpose(np.matrix(self.end_1)))))
        new_end_1 = np.array(np.transpose(new_end_1))[0]
        new_end_2 = base_mat * (rot_mat * (base_inv * (np.transpose(np.matrix(self.end_2)))))
        new_end_2 = np.array(np.transpose(new_end_2))[0]
        new_end_3 = base_mat * (rot_mat * (base_inv * (np.transpose(np.matrix(self.end_3)))))
        new_end_3 = np.array(np.transpose(new_end_3))[0]

        self.end_1 = new_end_1
        self.end_2 = new_end_2
        self.end_3 = new_end_3
        self.slope_1 += alpha

        return None

    def slope_second_el(self, alpha):
        if self.slope_2 + alpha > np. pi:
            alpha = np.pi - self.slope_2
        elif self.slope_2 + alpha < - np.pi:
            alpha = - np.pi - self.slope_2

        base1 = np.cross(self.end_1, self.end_2)
        base2 = self.end_1
        base3 = np.cross(base1, base2)

        base1 = base1 / np.linalg.norm(base1)
        base2 = base2 / np.linalg.norm(base2)
        base3 = base3 / np.linalg.norm(base3) # нормировка базисных векторов

        base_mat = np.transpose(np.matrix([base1, base2, base3])) # матрица обратной замены базиса
        base_inv = np.linalg.inv(base_mat) # матрица замены базиса

        rot_mat = np.matrix([[1, 0, 0], [0, np.cos(alpha), -np.sin(alpha)], [0, np.sin(alpha), np.cos(alpha)]]) # матрица поворота на угол альфа вокург первого базисного вектора

        new_end_2 = base_mat * (rot_mat * (base_inv * (np.transpose(np.matrix(self.end_2 - self.end_1)))))
        new_end_2 = np.array(np.transpose(new_end_2))[0] + self.end_1

        new_end_3 = base_mat * (rot_mat * (base_inv * (np.transpose(np.matrix(self.end_3 - self.end_1)))))

        new_end_3 = np.array(np.transpose(new_end_3))[0] + self.end_1

        self.end_2 = new_end_2
        self.end_3 = new_end_3
        self.slope_2 += alpha

        return None

    def slope_third_el(self, alpha):
        base1 = np.cross(self.end_2 - self.end_1, self.end_3 - self.end_1)
        base2 = self.end_2 - self.end_1
        base3 = np.cross(base1, base2)

        base1 = base1 / np.linalg.norm(base1)
        base2 = base2 / np.linalg.norm(base2)
        base3 = base3 / np.linalg.norm(base3) # нормировка базисных векторов

        base_mat = np.transpose(np.matrix([base1, base2, base3])) # матрица обратной замены базиса
        base_inv = np.linalg.inv(base_mat) # матрица замены базиса

        rot_mat = np.matrix([[1, 0, 0], [0, np.cos(alpha), -np.sin(alpha)], [0, np.sin(alpha), np.cos(alpha)]]) # матрица поворота на угол альфа вокург первого базисного вектора

        new_end_3 = base_mat * (rot_mat * (base_inv * (np.transpose(np.matrix(self.end_3 - self.end_2)))))
        new_end_3 = np.array(np.transpose(new_end_3))[0] + self.end_2
        self.slope_3 += alpha

        self.end_3 = new_end_3

        return None


# Среда для агента

In [None]:
class Environment():
      def __init__(self):
            self.robot = None
            self.start_pos = None
            self.target = None
            self.state = np.zeros(12)
            self.max_len = 0
            return None

      def robot_On(self, a, b, c):
          self.robot = Robot(a, b, c)
          self.start_pos = [a, b, c]
          self.state[0:3] = a
          self.state[3:6] = b
          self.state[6:9] = c
          self.max_len = np.linalg.norm(a) + np.linalg.norm(b) + np.linalg.norm(c)
          return None

      def set_target(self):
          max_len = np.linalg.norm(self.robot.end_1) + np.linalg.norm(self.robot.end_2) + np.linalg.norm(self.robot.end_3)
          x = np.random.rand() * max_len
          max_len -= np.linalg.norm(x)
          y = np.random.rand() * max_len
          max_len -= np.linalg.norm(y)
          z = np.random.rand() * max_len

          self.target = np.array([x, y, z])
          self.state[9:12] = self.target
          return None

      def reset(self):
          self.robot_On(self.start_pos[0], self.start_pos[1], self.start_pos[2])
          self.set_target()
          return self.state

      def step(self, action):
          prev_dist = np.linalg.norm(self.state[9:12] - self.state[6:9])

          self.robot.slope_first_el(action[0])
          self.robot.slope_second_el(action[1])
          self.robot.slope_third_el(action[2])

          self.robot.rotate_first_el(action[3])
          self.robot.rotate_second_el(action[4])
          self.robot.rotate_third_el(action[5])

          self.state[0:3] = self.robot.end_1
          self.state[3:6] = self.robot.end_2
          self.state[6:9] = self.robot.end_3

          dist = np.linalg.norm(self.state[9:12] - self.state[6:9])

          if dist < 0.1:
              reward = 100
              done = True
              return self.state, reward, done

          reward = (prev_dist - dist) / self.max_len
          done = False
          return self.state, reward, done


# Обучение

In [None]:
# Параметры агента и среды
states_dim = 12
actions_dim = 6
max_action = np.ones(6) * np.pi # обязан быть размерности actions_dim

# Параметры симуляций
episode_n = 2000
session_n_mas = [200, 500] # количество сессий в эпизоде
session_len_mas = [100] # количество пар состояние-дейтсвие за сессию
q_param_mas = [0.9]
grad_step = 0.01 # шаг при градиентном спуске

# Параметры манипулятора
L1, L2, L3 = 20, 15, 10

x_1, y_1 = 2, 2.5
z_1 = np.sqrt(L1 **2 - x_1 ** 2 - y_1 **2)
end_1 = np.array([x_1, y_1, z_1])

x_2, y_2 = 3, 4
z_2 = np.sqrt(L2 **2 - x_2 ** 2 - y_2 **2)
end_2 = np.array([x_2, y_2, z_2])

x_3, y_3 = 4, 5.5
z_3 = np.sqrt(L3 **2 - x_3 ** 2 - y_3 **2)
end_3 = np.array([x_3, y_3, z_3])

for session_n in session_n_mas:
    for session_len in session_len_mas:
        for q_param in q_param_mas:
            env = Environment()
            env.robot_On(end_1, end_2, end_3)
            env.set_target()
            agent = CrossEnthropyNNAgent(states_dim, actions_dim, max_action, grad_step=grad_step)
            mean_total_reward = []
            for episode in range(episode_n):
                # Оценка политики (noise = 0)
                sessions = [get_session(env, agent, session_len) for _ in range(session_n)]

                mean_total_reward.append(np.mean([session['total_reward'] for session in sessions]))
                # print('mean_total_reward = ', mean_total_reward[-1])

                elite_sessions = get_elite_sessions(sessions, q_param)

                # Корректировка политики
                if len(elite_sessions) > 0:
                    agent.update_policy(elite_sessions)

            name = str(session_n) + '_' + str(session_len) + '_' + str(q_param) + '.txt'
            dir = '_'############################################ ДИРЕКТОРИЯ 'D:\py_ws\'
            with open(dir + name, 'w') as file:
                file.write('session_n = ' + str(session_n) + '\n')
                file.write('session_len = ' + str(session_len) + '\n')
                file.write('q_param = ' + str(q_param) + '\n')

                for value in mean_total_reward:
                    file.write(str(value) + '\n')

                k = 1
                layers = [agent.network[0], agent.network[2], agent.network[4], agent.network[6]]
                for layer in layers:
                    weight = layer.weight.data.clone().detach().numpy()
                    bias = layer.bias.data.clone().detach().numpy()

                    file.write('Layer_num = ' + str(k) + ', weight' + '\n')
                    for w in weight:
                        file.write(str(float(w)) + '\n')

                    file.write('Layer_num = ' + str(k) + ', bias' + '\n')
                    for b in bias:
                        file.write(str(float(b)) + '\n')

                    k+=1


NameError: name 'grad_step' is not defined