## LINE通知Bot

In [None]:
#LINE BOT  ON

import requests

class LINENotifyBot(object):
    API_URL = 'https://notify-api.line.me/api/notify'
    def __init__(self, access_token):
        self.__headers = {'Authorization': 'Bearer ' + access_token}

    def send(
        self,
        message,
        image=None,
        sticker_package_id=None,
        sticker_id=None,
    ):
        payload = {
            'message': message,
            'stickerPackageId': sticker_package_id,
            'stickerId': sticker_id,
        }
        files = {}
        if image != None:
            files = {'imageFile': open(image, 'rb')}
        r = requests.post(
            LINENotifyBot.API_URL,
            headers=self.__headers,
            data=payload,
            files=files,
        )
print("LINE BOTを起動します.")

# DQN
参考：https://book.mynavi.jp/manatee/detail/id=89831

In [None]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline



## Tensorboard 設定
ex. tensorboard --logdir [logs へのパス]


In [None]:
from torch.utils.tensorboard import SummaryWriter

# log_dirでlogのディレクトリを指定
writer = SummaryWriter(log_dir="./logs")

## 準備

In [None]:
import os
import shutil

target_dir = 'img'

shutil.rmtree(target_dir)
os.mkdir(target_dir)

In [None]:
# namedtupleを生成
from collections import namedtuple
 
Transition = namedtuple(
    'Transition', ('state', 'action', 'next_state', 'reward'))

## Roomba simulator

In [None]:
import numpy as np
import random
import cv2

import os
import matplotlib.pyplot as plt
import matplotlib.animation as animation

class RoombaSimulator:
    def __init__(self):
        self.world_pos_position = [0, 0]  # 初期位置 [x, y]
        self.world_pos_position_pre = [0, 0]
        self.ball_pos_position = [0, 0]  # ボールの初期位置 [x, y]
        self.odometry_position = [0, 0]  # オドメトリ
        self.odometry_position_pre = [0, 0]

        self.world_orientation = 0   # 初期姿勢 (rad)
        self.orientation = 0

        self.left_wheel_rotation = 0  # 左車輪の回転角度 (rad)
        self.right_wheel_rotation = 0  # 右車輪の回転角度 (rad)

        self.wheel_radius = 0.072  # 車輪の半径 (50mmをmに変換)
        self.tread = 0.235  # トレッド
        self.size = 0.165 # Roombaの半径

        self.one_side_of_area = 10 #行動環境の一辺の長さ

        self.delta_x = 0
        self.delta_y = 0
        self.delta_theta = 0

        self.vertex1 = (0,0)
        self.vertex2 = (0,0)
        self.vertex3 = (0,0)
        self.vertex4 = (0,0)

        self.animation = []
        self.out_img = []

        self.count = 0

        self.fig = plt.figure(figsize=(10,10))

    def random_pos(self):
        self.ball_pos_position = [0, 0]
        self.world_pos_position = [0, 0]

        while self.world_pos_position[0]-self.ball_pos_position[0]  <= 0.1 and self.world_pos_position[1]-self.ball_pos_position[1] <= 0.1:

            self.world_pos_position = [
                random.uniform(-self.one_side_of_area/2, self.one_side_of_area/2),
                random.uniform(-self.one_side_of_area/2, self.one_side_of_area/2)]  # 初期位置 [x, y]
            # self.ball_pos_position = [random.uniform(-self.one_side_of_area/2, self.one_side_of_area/2), random.uniform(-self.one_side_of_area/2, self.one_side_of_area/2)]  # ボールの初期位置 [x, y]
            self.world_orientation = random.uniform(-3.14, 3.14)

    def init_pos(self):
        self.ball_pos_position = [4, 0]
        self.world_pos_position = [0, 0]
        self.world_orientation = 0

        options_list = [[0,0], [0,3.85], [0,-3.85]]
        self.world_pos_position = random.choice(options_list)
        self.world_pos_position = [0,0]


    def odometry_update(self, left_wheel_rotation_delta, right_wheel_rotation_delta):
        # 車輪の回転角度を更新
        self.left_wheel_rotation += left_wheel_rotation_delta
        self.right_wheel_rotation += right_wheel_rotation_delta

        # Roombaの位置と姿勢を更新
        left_wheel_distance = left_wheel_rotation_delta * self.wheel_radius
        right_wheel_distance = right_wheel_rotation_delta * self.wheel_radius

        delta_distance = (left_wheel_distance + right_wheel_distance) / 2
        self.delta_theta = (right_wheel_distance - left_wheel_distance) / self.tread

        self.delta_x = delta_distance * np.cos(self.orientation)
        self.delta_y = delta_distance * np.sin(self.orientation)

        self.odometry_position[0] += self.delta_x
        self.odometry_position[1] += self.delta_y
        self.orientation += self.delta_theta


    def world_pos_update(self):
        # Roombaのワールド座標系での移動を計算
        world_delta_x = np.cos(self.world_orientation) * self.delta_x - np.sin(self.world_orientation) * self.delta_y
        world_delta_y = np.sin(self.world_orientation) * self.delta_x + np.cos(self.world_orientation) * self.delta_y

        # Roombaのワールド座標系での位置を更新
        self.world_pos_position[0] += world_delta_x
        self.world_pos_position[1] += world_delta_y

        #壁に突っ込んだら，座標を更新しない．
        if abs(self.world_pos_position[0])+self.size < self.one_side_of_area/2 and\
           abs(self.world_pos_position[1])+self.size < self.one_side_of_area/2:

            self.world_pos_position_pre[0] = self.world_pos_position[0]
            self.world_pos_position_pre[1] = self.world_pos_position[1]

            self.odometry_position_pre[0] = self.odometry_position[0]
            self.odometry_position_pre[1] = self.odometry_position[1]

        else:
            self.world_pos_position[0] = self.world_pos_position_pre[0]
            self.world_pos_position[1] = self.world_pos_position_pre[1]

            self.odometry_position[0] = self.odometry_position_pre[0]
            self.odometry_position[1] = self.odometry_position_pre[1]

        # Roombaのワールド座標系での回転を更新
        self.world_orientation += self.delta_theta

        '''ホモグラフィ変換された視野'''
        trapezoid_length_Top_side = 0.2  # 台形の下辺の長さ
        trapezoid_length_Bottom_side = 0.5  # 台形の下辺の長さ
        trapezoid_height = 0.5  # 台形の高さ

        top_side_offset_Xcom = trapezoid_length_Top_side/2 * np.cos(self.world_orientation + np.pi/2)
        top_side_offset_Ycom = trapezoid_length_Top_side/2 * np.sin(self.world_orientation + np.pi/2)
        bottom_side_offset_Xcom = trapezoid_length_Bottom_side/2 * np.cos(self.world_orientation + np.pi/2)
        bottom_side_offset_Ycom = trapezoid_length_Bottom_side/2 * np.sin(self.world_orientation + np.pi/2)

        height_offset_Xcom = trapezoid_height * np.cos(self.world_orientation)
        height_offset_Ycom = trapezoid_height * np.sin(self.world_orientation)

        size_offset_Xcom = self.size * np.cos(self.world_orientation)
        size_offset_Ycom = self.size * np.sin(self.world_orientation)

        # 台形の頂点座標
        # 左端
        self.vertex1 = (self.world_pos_position[0] -  top_side_offset_Xcom + size_offset_Xcom,
                   self.world_pos_position[1] -  top_side_offset_Ycom + size_offset_Ycom)

        # 左上
        self.vertex4 = ((self.world_pos_position[0] -  bottom_side_offset_Xcom) + height_offset_Xcom,
                   (self.world_pos_position[1] -  bottom_side_offset_Ycom) + height_offset_Ycom)

        # 右端
        self.vertex2 = (self.world_pos_position[0] +  top_side_offset_Xcom + size_offset_Xcom,
                   self.world_pos_position[1] +  top_side_offset_Ycom + size_offset_Ycom)

        # 右上
        self.vertex3 = ((self.world_pos_position[0] +  bottom_side_offset_Xcom) + height_offset_Xcom,
                   (self.world_pos_position[1] +  bottom_side_offset_Ycom) + height_offset_Ycom)

    def plot(self):
        # Roombaの位置と姿勢をMatplotlibでプロット
        self.count += 1

        fig, ax = plt.subplots(figsize=(10.0, 10.0))
        ax.clear()
        plt.clf()

        # 青い点でRoombaの中心を表現
        plt.plot(self.world_pos_position[0], self.world_pos_position[1], 'bo')

        # 赤い点でballの中心を表現
        plt.plot(self.ball_pos_position[0], self.ball_pos_position[1], 'ro')

        # 半径0.165の円をプロット
        circle = plt.Circle((self.world_pos_position[0], self.world_pos_position[1]), 0.165, color='cyan', fill=False)
        plt.gca().add_patch(circle)

        # 赤い矢印で姿勢を表現
        line_length = 0.3
        line_dx = line_length * np.cos(self.world_orientation)
        line_dy = line_length * np.sin(self.world_orientation) #:.2f .format(self.world_pos_position[0])

        plt.arrow(self.world_pos_position[0], self.world_pos_position[1], line_dx, line_dy, color='red', width=0.02)

        # 視野を描画
        # trapezoid = plt.Polygon([self.vertex1, self.vertex2,self.vertex3,self.vertex4], fill=False, edgecolor='green')
        # plt.gca().add_patch(trapezoid)

        # print('{:.3f}'.format(self.world_pos_position[0]),
        #       ',','{:.3f}'.format(self.world_pos_position[1]),
        #       ',','{:.3f}'.format(self.world_orientation),
        #       ',','{:.3f}'.format(self.odometry_position[0]),
        #       ',','{:.3f}'.format(self.odometry_position[0]),
        #       ',','{:.3f}'.format(self.orientation))

        # グラフを正方形にし、目盛りを1ずつ増加
        plt.axis('equal')
        plt.xticks(np.arange(-5, 5, 0.5))
        plt.yticks(np.arange(-5, 5, 0.5))

        plt.title("Roomba Simulator")
        plt.grid(True)
        # plt.show()
        self.fig = fig

        '''動画作成'''
        self.fig.canvas.draw()
        image_array = np.array(self.fig.canvas.renderer.buffer_rgba())
        im = cv2.cvtColor(image_array, cv2.COLOR_RGBA2BGR)
        # print(type(im))
        file_name = f"img/image_{self.count + 1:03d}.png"
        cv2.imwrite(file_name, im)
        # self.video.write(im)

    def is_inside_trapezoid(self):
        def cross_product(ax, ay, bx, by, cx, cy):
            return (bx - ax) * (cy - ay) - (by - ay) * (cx - ax)

        # 連続する頂点の各ペアについて外積を計算する．
        cross_product_ver1to2 = cross_product(self.vertex1[0], self.vertex1[1], self.vertex2[0], self.vertex2[1], self.ball_pos_position[0], self.ball_pos_position[1])
        cross_product_ver2to3 = cross_product(self.vertex2[0], self.vertex2[1], self.vertex3[0], self.vertex3[1], self.ball_pos_position[0], self.ball_pos_position[1])
        cross_product_ver3to4 = cross_product(self.vertex3[0], self.vertex3[1], self.vertex4[0], self.vertex4[1], self.ball_pos_position[0], self.ball_pos_position[1])
        cross_product_ver4to1 = cross_product(self.vertex4[0], self.vertex4[1], self.vertex1[0], self.vertex1[1], self.ball_pos_position[0], self.ball_pos_position[1])

        # ballが台形内に入っているかを判定する．
        if (cross_product_ver1to2 >= 0 and cross_product_ver2to3 >= 0 and
                cross_product_ver3to4 >= 0 and cross_product_ver4to1 >= 0) or \
        (cross_product_ver1to2 <= 0 and cross_product_ver2to3 <= 0 and
                cross_product_ver3to4 <= 0 and cross_product_ver4to1 <= 0):
            return True
        else:
            return False

    def get_world_pos_ori(self):
        return self.world_pos_position[0],self.world_pos_position[1],self.world_orientation

    def get_ball_pos(self):
        return self.ball_pos_position[0], self.ball_pos_position[1]
        # if self.is_inside_trapezoid():
        #     return self.ball_pos_position[0], self.ball_pos_position[1]
        # else:
        #     return [-1,-1]

    def touch_ball(self):


        if np.sqrt((self.world_pos_position[0]-self.ball_pos_position[0])**2
                   +(self.world_pos_position[1]-self.ball_pos_position[1])**2) <= 0.165:
            return True
        else:
            return False

## Roomba environment
参考：https://developers.agirobots.com/jp/openai-gym-custom-env/

In [None]:
import torch
import gymnasium as gym
from gymnasium import spaces

class RoombaEnv(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self):
        super(RoombaEnv, self).__init__()
        self.roomba_simulator = RoombaSimulator()
        pos_limit_x,pos_limit_y = [self.roomba_simulator.one_side_of_area / 2] * 2

        self.action_space = gym.spaces.Discrete(2)       # エージェントが取りうる行動空間を定義
        # self.observation_space = gym.spaces.Box(
        # low=np.array([-pos_limit_x, -pos_limit_y, np.NINF, -pos_limit_x, -pos_limit_y], dtype=np.float32),
        # high=np.array([pos_limit_x, pos_limit_y, np.inf, pos_limit_x, pos_limit_y], dtype=np.float32),
        # shape=(5,),)

        self.pre_distance = 0
        self.ini_distance = 0
        self.pre_theta = 0
        self.step_count = 0

        self.observation_space = gym.spaces.Box(
        low=np.array([-pos_limit_x, -np.pi], dtype=np.float32),
        high=np.array([pos_limit_x, np.pi], dtype=np.float32),
        shape=(2,),)

        # self.reward_range = (0,1)       # 報酬の範囲[最小値と最大値]を定義

    def reset(self):
        self.roomba_simulator.init_pos()
        roomba_x,roomba_y,roomba_ori = self.roomba_simulator.get_world_pos_ori()
        ball_x,ball_y = self.roomba_simulator.get_ball_pos()

        distance,theta = self.cal_distance_and_theta(roomba_x,roomba_y,ball_x,ball_y)

        # obs = np.array([roomba_x,roomba_y,roomba_ori,ball_x,ball_y])
        self.ini_distance = distance
        self.pre_distance = self.ini_distance
        self.step_count = 0

        obs = np.array([distance,theta])

        return obs

    def ctrl(self,action):
        # if action == 0:
        #     left_wheel_rotation_delta = 1 * 1e-1
        #     right_wheel_rotation_delta = 1 * 1e-1
        # elif action == 1:
        #     left_wheel_rotation_delta = -1 * 1e-1
        #     right_wheel_rotation_delta = -1 * 1e-1
        # elif action == 2:
        #     left_wheel_rotation_delta = 1 * 1e-1
        #     right_wheel_rotation_delta = -1 * 1e-1
        # elif action == 3:
        #     left_wheel_rotation_delta = -1 * 1e-1
        #     right_wheel_rotation_delta = 1 * 1e-1
        # elif action == 4:
        #     left_wheel_rotation_delta = 0
        #     right_wheel_rotation_delta = 1 * 1e-1
        # elif action == 5:
        #     left_wheel_rotation_delta = 1 * 1e-1
        #     right_wheel_rotation_delta = 0
        # elif action == 6:
        #     left_wheel_rotation_delta = 0
        #     right_wheel_rotation_delta = -1 * 1e-1
        # elif action == 7:
        #     left_wheel_rotation_delta = -1 * 1e-1
        #     right_wheel_rotation_delta = 0
        # elif action == 8:
        #     left_wheel_rotation_delta = 0
        #     right_wheel_rotation_delta = 0

        if action == 0:
            left_wheel_rotation_delta = 1 * 1e-1
            right_wheel_rotation_delta = 1 * 1e-1
        elif action == 1:
            left_wheel_rotation_delta = 0#0.5 * 1e-1
            right_wheel_rotation_delta = 0#1 * 1e-1
        # elif action == 2:
        #     left_wheel_rotation_delta = 0.5 * 1e-1
        #     right_wheel_rotation_delta = -1 * 1e-1

        # left_wheel_rotation_delta = 0.1
        # right_wheel_rotation_delta = 0.1

        return left_wheel_rotation_delta,right_wheel_rotation_delta

    def cal_distance_and_theta(self,roomba_x,roomba_y,ball_x,ball_y):
        distance = np.sqrt((roomba_x-ball_x)**2+(roomba_y-ball_y)**2)
        theta = np.arctan2((roomba_y-ball_y),(roomba_x-ball_x))

        return distance,theta


    def step(self, action):
        left_wheel_rotation_delta,right_wheel_rotation_delta = self.ctrl(action)
        self.roomba_simulator.odometry_update(left_wheel_rotation_delta, right_wheel_rotation_delta)
        self.roomba_simulator.world_pos_update()
        roomba_x,roomba_y,roomba_ori = self.roomba_simulator.get_world_pos_ori()
        ball_x,ball_y = self.roomba_simulator.get_ball_pos()

        reward = torch.FloatTensor([0])
        reward2 = 0.0
        reward1 = 0.0

        distance,theta = self.cal_distance_and_theta(roomba_x,roomba_y,ball_x,ball_y)

        # obs = np.array([roomba_x,roomba_y,roomba_ori,ball_x,ball_y])
        obs = np.array([distance,theta])

        if self.roomba_simulator.touch_ball():
            done = True
            reward1 = 50 + 5*(1000 - self.step_count)
        else:
            done = False
            if self.pre_distance > distance:
                reward1 = 100*(self.pre_distance - distance)
            else:
                reward1 = -0.05

            # reward2 += 2* (0.79-abs(theta))
        # print(reward1,done)

        info = {'distance':distance,'theta':theta}

        reward = torch.FloatTensor([reward1+reward2])

        self.pre_distance = distance
        self.pre_theta = theta
        self.step_count += 1

        # print(reward,self.pre_distance,distance)

        return obs, reward, done, info

    def make_anime(self):
        self.roomba_simulator.plot()


    # def close(self):

    # def seed(self, seed=None):


In [None]:
import gymnasium as gym
gym.envs.registration.register(id='RoombaEnv-v0',entry_point=RoombaEnv)

## Config

In [None]:
from torch import nn
# 定数の設定
ENV = 'RoombaEnv-v0'  # 使用する課題名
MAX_STEPS = 1000  # 1試行のstep数
NUM_EPISODES = 200  # 最大試行回数

GAMMA = 0.99  # 時間割引率
LEARNING_RATE = 1e-4
BATCH_SIZE = 32
CAPACITY = 100
MODELS =""
CRITERION = nn.SmoothL1Loss()
UPDATE_FREQ = 1000 #step
NOTIFI = True

memory_loss = []
update_count = []

memory_step = []
sum_reward = []
memory_reward_mean = []
episode_count = []
memory_distance = []
memory_theta = []

## ReplayMemory class

In [None]:
# 経験を保存するメモリクラスを定義します
class ReplayMemory:
    def __init__(self, CAPACITY):
        self.capacity = CAPACITY  # メモリの最大長さ
        self.memory = []  # 経験を保存する変数
        self.index = 0  # 保存するindexを示す変数

    def push(self, state, action, state_next, reward):
        """state, action, state_next, rewardをメモリに保存します"""

        if len(self.memory) < self.capacity:
            self.memory.append(None)  # メモリに保存されている経験が，最大まで満たされていない場合は，新たなに枠を確保する．

        # namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
        self.memory[self.index] = Transition(state, action, state_next, reward)

        self.index = (self.index + 1) % self.capacity  # 保存するindexを1つずらす

    def sample(self, batch_size):
        """batch_size分だけ、ランダムに保存内容を取り出します"""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

## Brain class

In [None]:
# エージェントが持つ脳となるクラスです、DQNを実行
# Q関数をディープラーニングのネットワークをクラスとして定義
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torch.autograd import Variable


class Brain:
    def __init__(self, num_states, num_actions):
        self.update_time = 0
        self.num_states = num_states
        self.num_actions = num_actions
        self.memory = ReplayMemory(CAPACITY)


        # Online Q Network
        self.model = nn.Sequential()
        self.model.add_module('fc1', nn.Linear(self.num_states, 256))
        self.model.add_module('relu1', nn.ReLU())
        self.model.add_module('fc2', nn.Linear(256, 128))
        self.model.add_module('relu2', nn.ReLU())
        self.model.add_module('fc3', nn.Linear(128, 64))
        self.model.add_module('relu3', nn.ReLU())
        self.model.add_module('fc4', nn.Linear(64, self.num_actions))

        # Target Q Network
        self.target_model = nn.Sequential()
        self.target_model.add_module('fc1', nn.Linear(self.num_states, 256))
        self.target_model.add_module('relu1', nn.ReLU())
        self.target_model.add_module('fc2', nn.Linear(256, 128))
        self.target_model.add_module('relu2', nn.ReLU())
        self.target_model.add_module('fc3', nn.Linear(128, 64))
        self.target_model.add_module('relu3', nn.ReLU())
        self.target_model.add_module('fc4', nn.Linear(64, self.num_actions))

        print(self.model)  # ネットワークの形を出力
        MODELS = self.model
        self.optimizer = optim.Adam(self.model.parameters(), lr=LEARNING_RATE)

    def replay(self):
        if len(self.memory) < BATCH_SIZE:
            return

        transitions = self.memory.sample(BATCH_SIZE)
        batch = Transition(*zip(*transitions))
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,batch.next_state)))

        state_batch = Variable(torch.cat(batch.state))
        action_batch = Variable(torch.cat(batch.action))
        reward_batch = Variable(torch.cat(batch.reward))
        non_final_next_states = Variable(torch.cat([s for s in batch.next_state if s is not None]))

        self.model.eval()
        state_action_values = self.model(state_batch).gather(1, action_batch)

        # 教師データの作成
        next_state_values = Variable(torch.zeros(BATCH_SIZE).type(torch.FloatTensor))
        next_state_values[non_final_mask] = self.target_model(non_final_next_states).data.max(1)[0]
        expected_state_action_values = reward_batch + GAMMA * next_state_values

        if self.update_time % UPDATE_FREQ == 0:
            self.target_model.load_state_dict(self.model.state_dict())

        # パラメータの更新
        self.model.train()
        loss = CRITERION(state_action_values, expected_state_action_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # 記録
        self.update_time +=  1
        writer.add_scalar("loss vs. step", loss.detach().numpy(),self.update_time)
        memory_loss.append(loss.detach().numpy())
        update_count.append(self.update_time)


    def decide_action(self, state, episode):
        epsilon = 0.5 * (1 / (episode + 1))
        if epsilon < 0.1:
            epsilon = 0.1

        if epsilon <= np.random.uniform(0, 1):
            self.model.eval()  # ネットワークを推論モードに切り替える
            action = self.model(Variable(state)).data.max(1)[1].view(1, 1)
        else:
            action = torch.LongTensor(
                [[random.randrange(self.num_actions)]])

        return action

    def save_model(self):
        # モデル全体の保存
        torch.save(self.model, 'models/model.pth')


## Agent class

In [None]:
class Agent:
    def __init__(self, num_states, num_actions):
        """課題の状態と行動の数を設定します"""
        self.num_states = num_states  # CartPoleは状態数4を取得
        self.num_actions = num_actions  # CartPoleの行動（右に左に押す）の2を取得
        self.brain = Brain(num_states, num_actions)  # エージェントが行動を決定するための頭脳を生成

    def update_q_function(self):
        """Q関数を更新します"""
        self.brain.replay()

    def get_action(self, state, step):
        """行動の決定します"""
        action = self.brain.decide_action(state, step)
        return action

    def memorize(self, state, action, state_next, reward):
        """memoryオブジェクトに、state, action, state_next, rewardの内容を保存します"""
        self.brain.memory.push(state, action, state_next, reward)

    def record_dqn_model(self):
        self.brain.save_model()

## Environment class

In [None]:
import os
import matplotlib.pyplot as plt
import matplotlib.animation as animation

class Environment:
    def __init__(self):
        self.env = gym.make(ENV)  # 実行する課題を設定, render_mode="human"
        self.num_states = self.env.observation_space.shape[0]  # 課題の状態と行動の数を設定
        self.num_actions = self.env.action_space.n  # CartPoleの行動（右に左に押す）の2を取得
        # 環境内で行動するAgentを生成
        self.agent = Agent(self.num_states, self.num_actions)
        self.reward_mean = np.zeros(10)  # 100試行分の報酬を格納し、平均報酬を求める．

        # 動画の記録
        self.fig = plt.figure()
        self.movie = []

        self.rewards = 0
        self.reward_mean_pre = 0
        self.episode_final = 0
        self.num_dizitized = 10  # 状態の分割数

    def list_2_Tensor(self, observation):
        state = torch.from_numpy(observation).type(torch.FloatTensor)
        state = torch.unsqueeze(state, 0)
        #ex. [0.1, 0.2, 0.3, 0.4] → [[0.1, 0.2, 0.3, 0.4]]．
        #二次元のテンソルに変更し，バッチ処理できるようにする． ex.[[0.1, 0.2, 0.3, 0.4],[...]]
        return state

    # 各値を離散値に変換
    def bins(self, clip_min, clip_max, num):
        return np.linspace(clip_min, clip_max, num + 1)[1:-1]

    def digitize_state(self, observation):
        distance,theta = observation
        digitized = [
            np.digitize(distance, bins=self.bins(0, 5.6, self.num_dizitized)),
            np.digitize(theta, bins=self.bins(-0.79, 0.79, self.num_dizitized))
        ]
        # return sum([x * (self.num_dizitized**i) for i, x in enumerate(digitized)])
        return np.array(digitized)


    def run(self):
        for episode in range(NUM_EPISODES):  # 試行数分繰り返す
            observation = self.env.reset()  # 環境の初期化
            state = self.list_2_Tensor(observation)

            for step in range(MAX_STEPS):
                action = self.agent.get_action(state, episode)  # 行動を求める
                observation_next, reward, done, info = self.env.step(action[0, 0].item())
                # observation_next = self.digitize_state(observation_next)
                state_next = self.list_2_Tensor(observation_next)
                self.agent.memorize(state, action, state_next, reward)
                self.agent.update_q_function()

                state = state_next
                int_reward = int(reward.item())
                self.rewards += int_reward

                if done or step == MAX_STEPS -1:

                    self.reward_mean = np.hstack((self.reward_mean[1:], self.rewards))
                    print('%d Episode: Finished after %d  :10Average = %.3lf' % (episode, step + 1, self.reward_mean.mean()))
                    # writer.add_scalar("10_mean_step vs. Episode", self.total_step.mean(),episode)

                    episode_count.append(episode + 1)
                    memory_reward_mean.append(self.reward_mean.mean())
                    sum_reward.append(self.rewards)
                    memory_step.append(step+1)
                    memory_distance.append(info['distance'])
                    memory_theta.append(info['theta'])

                    if episode == NUM_EPISODES - 1:
                        self.agent.record_dqn_model()

                    break


    def inference(self):
        print("**** Inference Test ****")
        observation = self.env.reset()  # 環境の初期化
        state = self.list_2_Tensor(observation)

        for step in range(MAX_STEPS):
            action = self.agent.get_action(state, 0)  # 行動を求める
            observation_next, reward, done, _ = self.env.step(action[0, 0].item())

            state_next = self.list_2_Tensor(observation_next)
            state = state_next
            self.env.make_anime()

            print('now %d step : get reward = %d' % (step + 1, reward))

            if done:
                break

# main

In [None]:
# main
roomba_env = Environment()
roomba_env.run()
roomba_env.inference()

# プロット

In [None]:
# env.close()
writer.close()

In [None]:

plt.figure(tight_layout=True)
plt.subplot(2, 2, 1)
plt.plot(episode_count,memory_reward_mean, color="red")
plt.grid()
plt.xlabel("episode")
plt.ylabel("reward_mean")

plt.subplot(2, 2, 2)
plt.plot(episode_count, memory_step, color="blue")
plt.grid()
plt.xlabel("episode")
plt.ylabel("step")

plt.subplot(2, 2, 3)
plt.plot(episode_count,memory_distance, color="green")
plt.grid()
plt.xlabel("episode")
plt.ylabel("final_distance")

plt.subplot(2, 2, 4)
plt.plot(episode_count, memory_theta, color="yellow")
plt.grid()
plt.xlabel("episode")
plt.ylabel("theta")

plt.savefig("img/test.png")

In [None]:
import json
import datetime

dt_now = datetime.datetime.now()

with open("./settings.json", "r", encoding="utf-8") as f:
        j = json.load(f)

token = j["LINE_token"]["my_token"]

if NOTIFI:
        bot = LINENotifyBot(access_token = token)
        bot.send(
        message="トレーニング完了 \n "
                +"Data:"+str(dt_now)+"\n"
                +"ENV:"+str(ENV)+"\n"
                +"CRITERION:"+str(CRITERION)+"\n"
                +"GAMMA:"+str(GAMMA)+"\n"
                +"LEARNING_RATE:"+str(LEARNING_RATE)+"\n"
                +"MAX_STEPS:"+str(MAX_STEPS)+"\n"
                +"MAX_STEPS:"+str(NUM_EPISODES)+"\n"
                +"BATCH_SIZE:"+str(BATCH_SIZE)+"\n"
                +"CAPACITY:"+str(CAPACITY)+"\n"
                +"UPDATE_FREQ:"+str(UPDATE_FREQ)+"\n"
                +"NUM_NEURON:"+str(MODELS)+"\n"
                ,
        image='img/test.png'
)

# メモ
## 改造案
- 台形内にボール座標が含まれたら，Roombaの中心座標からの相対座標を返す．台形外なら-1．
- 中間層を増やす．