In [None]:
!pip install swig
!pip install gym[box2d]
!apt update
!apt install -y xvfb
!pip install -U pyvirtualdisplay
!pip install gym-notebook-wrapper

Collecting swig
  Using cached swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl.metadata (3.5 kB)
Using cached swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.9 MB)
Installing collected packages: swig
Successfully installed swig-4.3.0
Collecting box2d-py==2.3.5 (from gym[box2d])
  Using cached box2d-py-2.3.5.tar.gz (374 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pygame==2.1.0 (from gym[box2d])
  Downloading pygame-2.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.5 kB)
Downloading pygame-2.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.3/18.3 MB[0m [31m70.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: box2d-py
  Building wheel for box2d-py (setup.py) ... [?25l[?25hdone
  Created wheel for box2d-py: filename=box2d_py-2.3.5-cp310-cp310-linux_x86_64.whl size=2376419 sha256=d

In [None]:
import os
import numpy as np
from matplotlib import pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import Model
from tensorflow.keras.optimizers import Adam
import gym
from gym.wrappers import RecordVideo
from IPython.display import Video, display
from google.colab import drive
drive.mount('/content/drive')



Mounted at /content/drive


In [None]:
class NoiseGenerator:
    """Mean-reverting (Ornstein-Uhlenbeck style) exploration noise.

    Every call to :meth:`generate` advances the internal state ``x`` by one
    step of::

        x <- x + theta * (mean - x) * dt + std_dev * sqrt(dt) * N(0, 1)

    Args:
        mean: Array the process reverts towards; its shape fixes the noise shape.
        std_dev: Array of per-component noise scales, same shape as ``mean``.
        theta: Mean-reversion rate.
        dt: Step size.

    Raises:
        ValueError: If ``mean`` and ``std_dev`` have different shapes.
    """

    def __init__(self, mean, std_dev, theta=0.3, dt=5e-2):
        self.theta = theta
        self.dt = dt
        self.mean = mean
        self.std_dev = std_dev

        if mean.shape != std_dev.shape:
            raise ValueError('Mean shape: {} and std_dev shape: {} should be the same!'.format(
                mean.shape, std_dev.shape))

        self.x_shape = mean.shape
        self.x = None

        self.reset()

    def reset(self):
        """Reset the process state to an all-zero array of the noise shape."""
        # np.zeros allocates an array *of* shape x_shape. The previous
        # np.zeros_like(self.x_shape) zero-filled an integer array built from
        # the shape tuple itself (shape (ndim,)), which only worked for 1-D
        # noise thanks to broadcasting and failed for multi-dimensional means.
        self.x = np.zeros(self.x_shape, dtype=np.float64)

    def generate(self):
        """Advance the process by one step and return the new state array."""
        self.x = (self.x
                  + self.theta * (self.mean - self.x) * self.dt
                  + self.std_dev * np.sqrt(self.dt) * np.random.normal(size=self.x_shape))

        return self.x


class MemoriesRecorder:
    """Fixed-capacity ring buffer of (state, action, reward, new_state) transitions.

    Storage is allocated on the first :meth:`write`, using the shapes of the
    first state and action seen. Once ``memory_capacity`` transitions have been
    written, further writes overwrite the oldest slots.
    """

    def __init__(self, memory_capacity=50000):
        self.memory_capacity = memory_capacity
        # Total number of writes so far; it is never wrapped.
        self.writes_num = 0
        # Storage arrays are created lazily by init_memory().
        self.state_db = self.action_db = self.reward_db = self.new_state_db = None

    def init_memory(self, state_shape, action_shape):
        """Allocate zero-filled float32 storage for ``memory_capacity`` transitions."""
        capacity = self.memory_capacity
        states_dims = (capacity, *state_shape)
        actions_dims = (capacity, *action_shape)

        self.state_db = np.zeros(states_dims, np.float32)
        self.action_db = np.zeros(actions_dims, np.float32)
        self.reward_db = np.zeros((capacity, 1), np.float32)
        self.new_state_db = np.zeros(states_dims, np.float32)

    def write(self, state, action, reward, new_state):
        """Store one transition in the next slot, wrapping around when full."""
        if self.state_db is None:
            self.init_memory(state.shape, action.shape)

        slot = self.writes_num % self.memory_capacity
        targets = (
            (self.state_db, state),
            (self.action_db, action),
            (self.reward_db, reward),
            (self.new_state_db, new_state),
        )
        for storage, value in targets:
            storage[slot] = value

        self.writes_num += 1

    def sample(self, batch_size=64):
        """Return ``batch_size`` transitions drawn with replacement from the filled slots.

        Returns:
            Tuple ``(states, actions, rewards, new_states)`` of arrays whose
            first axis has length ``batch_size``.
        """
        filled = min(self.memory_capacity, self.writes_num)
        picks = np.random.choice(filled, batch_size)
        storages = (self.state_db, self.action_db, self.reward_db, self.new_state_db)
        return tuple(storage[picks] for storage in storages)


def show_img(img, hide_colorbar=False):
    """Draw ``img`` with matplotlib, optionally followed by a colorbar.

    Arrays with fewer than three dimensions, or with a third axis of length 1,
    are drawn with the 'gray' colormap; anything else uses imshow defaults.

    Args:
        img: Array-like image exposing ``.shape``.
        hide_colorbar: When True, no colorbar is added.
    """
    single_channel = len(img.shape) < 3 or img.shape[2] == 1
    imshow_options = {'cmap': 'gray'} if single_channel else {}
    plt.imshow(img, **imshow_options)

    if hide_colorbar:
        return
    plt.colorbar()


def prepend_tuple(new_dim, some_shape):
    """Return a tuple made of ``new_dim`` followed by the items of ``some_shape``."""
    return (new_dim, *some_shape)


def replace_color(data, original, new_value):
    """Recolour, in place, every pixel of ``data`` whose RGB equals ``original``.

    Args:
        data: Image array indexed as ``[row, col, channel]`` with at least three
            channels; only the first three channels are compared and written.
        original: ``(r, g, b)`` triple to look for.
        new_value: ``(r, g, b)`` triple written to the matching pixels.

    Returns:
        None. ``data`` is modified in place.
    """
    src_r, src_g, src_b = original
    dst_r, dst_g, dst_b = new_value

    rgb = data[:, :, :3]  # view onto data, so writes go through to the caller's array
    hits = (rgb[:, :, 0] == src_r) & (rgb[:, :, 1] == src_g) & (rgb[:, :, 2] == src_b)
    rgb[hits] = [dst_r, dst_g, dst_b]

class BaseSolution:
    """DDPG-style actor-critic agent working on image observations.

    Owns an actor, a critic, their target copies, a replay buffer and an
    exploration-noise generator. The networks are built lazily on the first
    :meth:`get_action` call, or loaded from disk with :meth:`load_solution`.

    Args:
        action_space: Environment action space. ``low``/``high`` are used to
            clip actions and ``shape[0]`` is the default actor output width.
        model_outputs: Optional actor output width. When given, raw actor
            outputs are mapped to environment actions by
            :meth:`decode_model_output`.
        noise_mean: Optional mean array for the exploration noise
            (defaults to zeros of the actor output width).
        noise_std: Optional std-dev array for the exploration noise
            (defaults to 0.2 for every output).
    """

    def __init__(self, action_space, model_outputs=None, noise_mean=None, noise_std=None):

        self.gamma = 0.99          # discount factor used in the critic target
        self.actor_lr = 0.00001
        self.critic_lr = 0.002
        self.tau = 0.005           # soft-update rate of the target networks
        self.memory_capacity = 60000
        self.need_decode_out = model_outputs is not None
        self.model_action_out = model_outputs if model_outputs else action_space.shape[0]
        self.action_space = action_space

        if noise_mean is None:
            noise_mean = np.full(self.model_action_out, 0.0, np.float32)
        if noise_std is None:
            noise_std  = np.full(self.model_action_out, 0.2, np.float32)

        self.noise = NoiseGenerator(noise_mean, noise_std)
        self.r_buffer = MemoriesRecorder(memory_capacity=self.memory_capacity)

        self.actor_opt      = Adam(self.actor_lr)
        self.critic_opt     = Adam(self.critic_lr)
        self.actor          = None
        self.critic         = None
        self.target_actor   = None
        self.target_critic  = None

    def reset(self):
        """Reset the exploration noise (called at the start of an episode)."""
        self.noise.reset()

    def build_actor(self, state_shape, name="Actor"):
        """Build the policy network: three strided convolutions, a dense layer, tanh outputs.

        Args:
            state_shape: Shape of one preprocessed observation (no batch axis).
            name: Keras model name.

        Returns:
            A Keras ``Model`` mapping a state batch to ``model_action_out`` values in [-1, 1].
        """
        inputs = layers.Input(shape=state_shape)
        x = inputs
        x = layers.Conv2D(16, kernel_size=(5, 5), strides=(4, 4), padding='valid', use_bias=False, activation="relu")(x)
        x = layers.Conv2D(32, kernel_size=(3, 3), strides=(3, 3), padding='valid', use_bias=False, activation="relu")(x)
        x = layers.Conv2D(32, kernel_size=(3, 3), strides=(3, 3), padding='valid', use_bias=False, activation="relu")(x)

        x = layers.Flatten()(x)
        x = layers.Dense(64, activation='relu')(x)
        # Small uniform initial weights keep the initial tanh outputs near zero.
        # The initializer was previously created but never passed to the layer.
        last_init = tf.random_uniform_initializer(minval=-0.005, maxval=0.005)
        y = layers.Dense(self.model_action_out, activation='tanh', kernel_initializer=last_init)(x)

        model = Model(inputs=inputs, outputs=y, name=name)
        model.summary()
        return model

    def build_critic(self, state_shape, name="Critic"):
        """Build the Q-network taking ``[state, action]`` and returning one value per sample.

        The state goes through the same convolution stack as the actor; the
        flattened features are concatenated with the action before the dense head.
        """
        state_inputs = layers.Input(shape=state_shape)
        x = state_inputs
        x = layers.Conv2D(16, kernel_size=(5, 5), strides=(4, 4), padding='valid', use_bias=False, activation="relu")(x)
        x = layers.Conv2D(32, kernel_size=(3, 3), strides=(3, 3), padding='valid', use_bias=False, activation="relu")(x)
        x = layers.Conv2D(32, kernel_size=(3, 3), strides=(3, 3), padding='valid', use_bias=False, activation="relu")(x)

        x = layers.Flatten()(x)
        action_inputs = layers.Input(shape=(self.model_action_out,))
        x = layers.concatenate([x, action_inputs])

        x = layers.Dense(64, activation='relu')(x)
        x = layers.Dense(32, activation='relu')(x)
        y = layers.Dense(1)(x)

        model = Model(inputs=[state_inputs, action_inputs], outputs=y, name=name)
        model.summary()
        return model

    def init_networks(self, state_shape):
        """Create actor/critic and their targets; targets start as exact weight copies."""
        self.actor  = self.build_actor(state_shape)
        self.critic = self.build_critic(state_shape)
        self.target_actor  = self.build_actor(state_shape, name='TargetActor')
        self.target_critic = self.build_critic(state_shape, name='TargetCritic')
        self.target_actor.set_weights(self.actor.get_weights())
        self.target_critic.set_weights(self.critic.get_weights())

    def get_action(self, state, add_noise=True):
        """Choose an action for a raw observation.

        The observation is preprocessed here, so callers must pass the raw
        environment frame, not an already preprocessed one.

        Args:
            state: Raw observation image from the environment.
            add_noise: When True, exploration noise is added to the actor output.

        Returns:
            Tuple ``(env_action, actor_output)``: the action clipped to the
            action-space bounds, and the (possibly noisy) raw actor output that
            should be stored in the replay buffer.
        """
        prep_state = self.preprocess(state)
        if self.actor is None:
            self.init_networks(prep_state.shape)

        tensor_state = tf.expand_dims(tf.convert_to_tensor(prep_state), 0)
        actor_output = self.actor(tensor_state).numpy()

        if add_noise:
            actor_output = actor_output[0] + self.noise.generate()
        else:
            actor_output = actor_output[0]

        if self.need_decode_out:
            env_action = self.decode_model_output(actor_output)
        else:
            env_action = actor_output

        env_action = np.clip(np.array(env_action), a_min=self.action_space.low, a_max=self.action_space.high)
        return env_action, actor_output

    def decode_model_output(self, model_out):
        """Expand a 2-value actor output into a 3-value environment action.

        Output 0 is passed through; the positive part of output 1 becomes the
        second component and the magnitude of its negative part the third
        (presumably steering, gas and brake for CarRacing - confirm against the env).
        """
        return np.array([model_out[0], model_out[1].clip(0, 1), -model_out[1].clip(-1, 0)])

    def preprocess(self, img, greyscale=False):
        """Return a cleaned, normalised copy of a raw frame.

        The row/column indices below are hard-coded, so the frame must be at
        least 94 rows by 53 columns (presumably the 96x96 CarRacing frame).

        Args:
            img: Raw observation image indexed ``[row, col, channel]``; not modified.
            greyscale: When True, channels are averaged into a single channel.

        Returns:
            Float array scaled by the frame maximum, with a few value
            ranges snapped to fixed levels.
        """
        img = img.copy()
        # Overwrite the first 12 columns of rows 88..93 with the colour found at column 12.
        for i in range(88, 93+1):
            img[i, 0:12, :] = img[i, 12, :]

        # Merge the two green shades into one.
        replace_color(img, original=(102, 229, 102), new_value=(102, 204, 102))

        if greyscale:
            img = img.mean(axis=2)
            img = np.expand_dims(img, 2)

        # Zero out values equal to 68 inside the fixed window rows 67..76, cols 42..52.
        car_color = 68.0
        car_area = img[67:77, 42:53]
        car_area[car_area == car_color] = 0

        # Normalise by the frame maximum; an all-zero frame stays zero instead of
        # turning into NaNs through a division by zero.
        peak = img.max()
        img = img / peak if peak != 0 else img.astype(np.float64)
        img[(img > 0.411) & (img < 0.412)] = 0.4
        img[(img > 0.419) & (img < 0.420)] = 0.4

        # In rows 0..82, replace fully saturated values with 0.80.
        game_screen = img[0:83, :]
        game_screen[game_screen == 1] = 0.80
        return img

    def learn(self, state, train_action, reward, new_state):
        """Record one transition and run a single training step on a sampled batch.

        Args:
            state: Raw observation before the step.
            train_action: Raw actor output returned by :meth:`get_action`.
            reward: Scalar reward received.
            new_state: Raw observation after the step.
        """
        prep_state     = self.preprocess(state)
        prep_new_state = self.preprocess(new_state)
        self.r_buffer.write(prep_state, train_action, reward, prep_new_state)
        state_batch, action_batch, reward_batch, new_state_batch = self.r_buffer.sample()

        state_batch     = tf.convert_to_tensor(state_batch)
        action_batch    = tf.convert_to_tensor(action_batch)
        reward_batch    = tf.convert_to_tensor(reward_batch, dtype=tf.float32)
        new_state_batch = tf.convert_to_tensor(new_state_batch)

        self.update_actor_critic(state_batch, action_batch, reward_batch, new_state_batch)

        # Let the target networks slowly track the online ones. Without these
        # calls the targets stay frozen at their initial weights forever.
        self.update_target_network(self.target_actor.variables, self.actor.variables)
        self.update_target_network(self.target_critic.variables, self.critic.variables)

    @tf.function
    def update_actor_critic(self, state, action, reward, new_state):
        """Apply one gradient step to the critic, then one to the actor."""
        with tf.GradientTape() as tape:
            # Bootstrapped target from the target networks.
            new_action = self.target_actor(new_state, training=True)
            y = reward + self.gamma * self.target_critic([new_state, new_action], training=True)
            critic_loss = tf.math.reduce_mean(tf.square(y - self.critic([state, action], training=True)))

        critic_gradients = tape.gradient(critic_loss, self.critic.trainable_variables)
        self.critic_opt.apply_gradients(zip(critic_gradients, self.critic.trainable_variables))

        with tf.GradientTape() as tape:
            # The actor is trained to maximise the critic's value of its own actions.
            critic_out = self.critic([state, self.actor(state, training=True)], training=True)
            actor_loss = -tf.math.reduce_mean(critic_out)

        actor_gradients = tape.gradient(actor_loss, self.actor.trainable_variables)
        self.actor_opt.apply_gradients(zip(actor_gradients, self.actor.trainable_variables))

    @tf.function
    def update_target_network(self, target_weights, new_weights):
        """Soft update: ``target <- (1 - tau) * target + tau * new`` for every pair."""
        for t, n in zip(target_weights, new_weights):
            t.assign((1 - self.tau) * t + self.tau * n)

    def save_solution(self, path='/content/'):
        """Save the four networks as ``.h5`` files inside directory ``path``.

        The directory is created if it does not exist.
        """
        if path:
            os.makedirs(path, exist_ok=True)
        # os.path.join works with or without a trailing separator on ``path``.
        self.actor.save(os.path.join(path, 'actor.h5'))
        self.critic.save(os.path.join(path, 'critic.h5'))
        self.target_actor.save(os.path.join(path, 'target_actor.h5'))
        self.target_critic.save(os.path.join(path, 'target_critic.h5'))

    def load_solution(self, path='models/'):
        """Load the four networks from the ``.h5`` files inside directory ``path``."""
        self.actor = tf.keras.models.load_model(os.path.join(path, 'actor.h5'))
        self.critic = tf.keras.models.load_model(os.path.join(path, 'critic.h5'))
        self.target_actor = tf.keras.models.load_model(os.path.join(path, 'target_actor.h5'))
        self.target_critic = tf.keras.models.load_model(os.path.join(path, 'target_critic.h5'))


def train_car_racing(n_episodes=5, model_path='/content/drive/MyDrive/DQN_save/',
                     problem='CarRacing-v2', preview=False):
    """Train the agent, resuming from saved models when all of them exist.

    Args:
        n_episodes: Number of training episodes. The default is only a smoke
            test; real training uses a much larger value.
        model_path: Directory holding ``actor.h5``, ``critic.h5``,
            ``target_actor.h5`` and ``target_critic.h5``.
        problem: Gym environment id.
        preview: When True, ``env.render()`` is called on every step.

    An episode is cut short after more than 200 consecutive steps with a
    negative reward. The models are saved whenever an episode beats the best
    episode reward seen in this run (starting from 0).
    """
    model_files = ('actor.h5', 'critic.h5', 'target_actor.h5', 'target_critic.h5')

    gym.logger.set_level(40)
    best_result = 0
    all_episode_reward = []
    env = gym.make(problem)

    # try/finally guarantees the environment is closed even if training raises.
    try:
        env.reset()
        noise_std = np.array([0.1, 4 * 0.2], dtype=np.float32)

        solution = BaseSolution(env.action_space, model_outputs=2, noise_std=noise_std)

        try:
            if all(os.path.exists(os.path.join(model_path, name)) for name in model_files):
                solution.load_solution(model_path)
                print("既存のモデルをロードして、続きから学習を行います。")
            else:
                print("既存のモデルファイルが見つかりません。新規に学習を開始します。")
        except Exception as e:
            print("モデルのロードでエラーが発生しました。新規に学習を開始します。")
            print(e)
            # A partial load would leave a mix of loaded and missing networks;
            # clear them all so fresh ones are built on the first action.
            solution.actor = None
            solution.critic = None
            solution.target_actor = None
            solution.target_critic = None

        print("\n===== 学習開始 =====")
        for ie in range(n_episodes):
            state = env.reset()
            solution.reset()
            done = False
            episode_reward = 0
            no_reward_counter = 0

            while not done:
                if preview:
                    env.render()

                action, train_action = solution.get_action(state)
                # The environment receives a quarter of the decoded action,
                # while the unscaled actor output is what gets stored for learning.
                action /= 4
                new_state, reward, done, info = env.step(action)
                solution.learn(state, train_action, reward, new_state)
                state = new_state
                episode_reward += reward

                if reward < 0:
                    no_reward_counter += 1
                    if no_reward_counter > 200:
                        break
                else:
                    no_reward_counter = 0

            all_episode_reward.append(episode_reward)
            average_result = np.array(all_episode_reward[-10:]).mean()
            print(f"Episode: {ie}, Last result: {episode_reward:.2f}, Average results(10eps): {average_result:.2f}")

            if episode_reward > best_result:
                print('Saving best solution')
                # Make sure the target directory exists before writing the files.
                if model_path:
                    os.makedirs(model_path, exist_ok=True)
                solution.save_solution(path=model_path)
                best_result = episode_reward
    finally:
        env.close()

    print("\n===== 学習終了 =====")


# Start training only when this code runs as the main program (e.g. as a notebook cell).
if __name__ == "__main__":
    train_car_racing()




  from pkg_resources import resource_stream, resource_exists
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)


既存のモデルをロードして、続きから学習を行います。

===== 学習開始 =====


  if not isinstance(terminated, (bool, np.bool8)):


Episode: 0, Last result: 21.52, Average results(10eps): 21.52
Saving best solution




Episode: 1, Last result: 362.50, Average results(10eps): 192.01
Saving best solution




Episode: 2, Last result: 93.77, Average results(10eps): 159.26
Episode: 3, Last result: 218.90, Average results(10eps): 174.17
Episode: 4, Last result: 342.92, Average results(10eps): 207.92

===== 学習終了 =====


In [None]:
import os
import gym
import numpy as np
import tensorflow as tf
from gym.wrappers import RecordVideo
from IPython.display import display, Video


def test_with_pretrained_model(model_path="/content/drive/MyDrive/DQN_save/",
                               video_folder="./mp4", num_episodes=1):
    """Run the saved agent without exploration noise, record a video and display it.

    Args:
        model_path: Directory containing the saved ``.h5`` networks.
        video_folder: Directory where ``RecordVideo`` writes its mp4 files.
        num_episodes: Number of evaluation episodes to run.
    """
    test_env = RecordVideo(
        gym.make("CarRacing-v2"),
        video_folder=video_folder
    )

    # try/finally closes the recorder/environment even if loading or stepping fails.
    try:
        solution_eval = BaseSolution(test_env.action_space, model_outputs=2)
        solution_eval.load_solution(path=model_path)

        for e in range(num_episodes):
            state = test_env.reset()
            solution_eval.reset()

            total_reward = 0.0
            done = False
            step_counter = 0

            while not done:
                # get_action() preprocesses the frame itself. Passing an already
                # preprocessed frame (as before) normalised the image twice, so
                # the actor saw different inputs than during training.
                action, _ = solution_eval.get_action(state, add_noise=False)
                action /= 4.0  # same action scaling as in the training loop
                next_state, reward, done, info = test_env.step(action)
                total_reward += reward
                step_counter += 1
                state = next_state

            print(f"Test Episode: {e+1}/{num_episodes}, Steps: {step_counter}, Total Reward: {total_reward:.2f}")
    finally:
        test_env.close()

    video_path = os.path.join(video_folder, "rl-video-episode-0.mp4")
    if os.path.exists(video_path):
        print("Generated video path:", video_path)
        display(Video(video_path, embed=True))
    else:
        print("Video file not found:", video_path)

# Run the evaluation only when this code runs as the main program (e.g. as a notebook cell).
if __name__ == "__main__":
    test_with_pretrained_model()


  if not isinstance(terminated, (bool, np.bool8)):


Test Episode: 1/1, Steps: 1000, Total Reward: 420.91
Generated video path: ./mp4/rl-video-episode-0.mp4


In [None]:
!pip install swig
!pip install gymnasium[box2d]
!apt update
!apt install -y xvfb
!pip install -U pyvirtualdisplay
!pip install gym-notebook-wrapper

Collecting swig
  Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl.metadata (3.5 kB)
Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.9 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m58.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: swig
Successfully installed swig-4.3.0
Collecting gymnasium[box2d]
  Using cached gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium[box2d])
  Using cached Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Collecting box2d-py==2.3.5 (from gymnasium[box2d])
  Using cached box2d-py-2.3.5.tar.gz (374 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Downloading gymnasium-1.0.0-py3-none-any.

In [None]:
import os
import numpy as np
from matplotlib import pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import Model
from tensorflow.keras.optimizers import Adam
import gym
from gym.wrappers import RecordVideo
from IPython.display import Video, display
from google.colab import drive
drive.mount('/content/drive')



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
class NoiseGenerator:
    """Mean-reverting (Ornstein-Uhlenbeck style) exploration noise.

    Every call to :meth:`generate` advances the internal state ``x`` by one
    step of::

        x <- x + theta * (mean - x) * dt + std_dev * sqrt(dt) * N(0, 1)

    Args:
        mean: Array the process reverts towards; its shape fixes the noise shape.
        std_dev: Array of per-component noise scales, same shape as ``mean``.
        theta: Mean-reversion rate.
        dt: Step size.

    Raises:
        ValueError: If ``mean`` and ``std_dev`` have different shapes.
    """

    def __init__(self, mean, std_dev, theta=0.3, dt=5e-2):
        self.theta = theta
        self.dt = dt
        self.mean = mean
        self.std_dev = std_dev

        if mean.shape != std_dev.shape:
            raise ValueError('Mean shape: {} and std_dev shape: {} should be the same!'.format(
                mean.shape, std_dev.shape))

        self.x_shape = mean.shape
        self.x = None

        self.reset()

    def reset(self):
        """Reset the process state to an all-zero array of the noise shape."""
        # np.zeros allocates an array *of* shape x_shape. The previous
        # np.zeros_like(self.x_shape) zero-filled an integer array built from
        # the shape tuple itself (shape (ndim,)), which only worked for 1-D
        # noise thanks to broadcasting and failed for multi-dimensional means.
        self.x = np.zeros(self.x_shape, dtype=np.float64)

    def generate(self):
        """Advance the process by one step and return the new state array."""
        self.x = (self.x
                  + self.theta * (self.mean - self.x) * self.dt
                  + self.std_dev * np.sqrt(self.dt) * np.random.normal(size=self.x_shape))

        return self.x


class MemoriesRecorder:
    """Fixed-capacity ring buffer of (state, action, reward, new_state) transitions.

    Storage is allocated on the first :meth:`write`, using the shapes of the
    first state and action seen. Once ``memory_capacity`` transitions have been
    written, further writes overwrite the oldest slots.
    """

    def __init__(self, memory_capacity=50000):
        self.memory_capacity = memory_capacity
        # Total number of writes so far; it is never wrapped.
        self.writes_num = 0
        # Storage arrays are created lazily by init_memory().
        self.state_db = self.action_db = self.reward_db = self.new_state_db = None

    def init_memory(self, state_shape, action_shape):
        """Allocate zero-filled float32 storage for ``memory_capacity`` transitions."""
        capacity = self.memory_capacity
        states_dims = (capacity, *state_shape)
        actions_dims = (capacity, *action_shape)

        self.state_db = np.zeros(states_dims, np.float32)
        self.action_db = np.zeros(actions_dims, np.float32)
        self.reward_db = np.zeros((capacity, 1), np.float32)
        self.new_state_db = np.zeros(states_dims, np.float32)

    def write(self, state, action, reward, new_state):
        """Store one transition in the next slot, wrapping around when full."""
        if self.state_db is None:
            self.init_memory(state.shape, action.shape)

        slot = self.writes_num % self.memory_capacity
        targets = (
            (self.state_db, state),
            (self.action_db, action),
            (self.reward_db, reward),
            (self.new_state_db, new_state),
        )
        for storage, value in targets:
            storage[slot] = value

        self.writes_num += 1

    def sample(self, batch_size=64):
        """Return ``batch_size`` transitions drawn with replacement from the filled slots.

        Returns:
            Tuple ``(states, actions, rewards, new_states)`` of arrays whose
            first axis has length ``batch_size``.
        """
        filled = min(self.memory_capacity, self.writes_num)
        picks = np.random.choice(filled, batch_size)
        storages = (self.state_db, self.action_db, self.reward_db, self.new_state_db)
        return tuple(storage[picks] for storage in storages)


def show_img(img, hide_colorbar=False):
    """Draw ``img`` with matplotlib, optionally followed by a colorbar.

    Arrays with fewer than three dimensions, or with a third axis of length 1,
    are drawn with the 'gray' colormap; anything else uses imshow defaults.

    Args:
        img: Array-like image exposing ``.shape``.
        hide_colorbar: When True, no colorbar is added.
    """
    single_channel = len(img.shape) < 3 or img.shape[2] == 1
    imshow_options = {'cmap': 'gray'} if single_channel else {}
    plt.imshow(img, **imshow_options)

    if hide_colorbar:
        return
    plt.colorbar()


def prepend_tuple(new_dim, some_shape):
    """Return a tuple made of ``new_dim`` followed by the items of ``some_shape``."""
    return (new_dim, *some_shape)


def replace_color(data, original, new_value):
    """Recolour, in place, every pixel of ``data`` whose RGB equals ``original``.

    Args:
        data: Image array indexed as ``[row, col, channel]`` with at least three
            channels; only the first three channels are compared and written.
        original: ``(r, g, b)`` triple to look for.
        new_value: ``(r, g, b)`` triple written to the matching pixels.

    Returns:
        None. ``data`` is modified in place.
    """
    src_r, src_g, src_b = original
    dst_r, dst_g, dst_b = new_value

    rgb = data[:, :, :3]  # view onto data, so writes go through to the caller's array
    hits = (rgb[:, :, 0] == src_r) & (rgb[:, :, 1] == src_g) & (rgb[:, :, 2] == src_b)
    rgb[hits] = [dst_r, dst_g, dst_b]

class BaseSolution:
    """DDPG-style actor-critic agent working on image observations.

    Owns an actor, a critic, their target copies, a replay buffer and an
    exploration-noise generator. The networks are built lazily on the first
    :meth:`get_action` call, or loaded from disk with :meth:`load_solution`.

    Args:
        action_space: Environment action space. ``low``/``high`` are used to
            clip actions and ``shape[0]`` is the default actor output width.
        model_outputs: Optional actor output width. When given, raw actor
            outputs are mapped to environment actions by
            :meth:`decode_model_output`.
        noise_mean: Optional mean array for the exploration noise
            (defaults to zeros of the actor output width).
        noise_std: Optional std-dev array for the exploration noise
            (defaults to 0.2 for every output).
    """

    def __init__(self, action_space, model_outputs=None, noise_mean=None, noise_std=None):

        self.gamma = 0.99          # discount factor used in the critic target
        self.actor_lr = 0.00001
        self.critic_lr = 0.002
        self.tau = 0.005           # soft-update rate of the target networks
        self.memory_capacity = 60000
        self.need_decode_out = model_outputs is not None
        self.model_action_out = model_outputs if model_outputs else action_space.shape[0]
        self.action_space = action_space

        if noise_mean is None:
            noise_mean = np.full(self.model_action_out, 0.0, np.float32)
        if noise_std is None:
            noise_std  = np.full(self.model_action_out, 0.2, np.float32)

        self.noise = NoiseGenerator(noise_mean, noise_std)
        self.r_buffer = MemoriesRecorder(memory_capacity=self.memory_capacity)

        self.actor_opt      = Adam(self.actor_lr)
        self.critic_opt     = Adam(self.critic_lr)
        self.actor          = None
        self.critic         = None
        self.target_actor   = None
        self.target_critic  = None

    def reset(self):
        """Reset the exploration noise (called at the start of an episode)."""
        self.noise.reset()

    def build_actor(self, state_shape, name="Actor"):
        """Build the policy network: three strided convolutions, a dense layer, tanh outputs.

        Args:
            state_shape: Shape of one preprocessed observation (no batch axis).
            name: Keras model name.

        Returns:
            A Keras ``Model`` mapping a state batch to ``model_action_out`` values in [-1, 1].
        """
        inputs = layers.Input(shape=state_shape)
        x = inputs
        x = layers.Conv2D(16, kernel_size=(5, 5), strides=(4, 4), padding='valid', use_bias=False, activation="relu")(x)
        x = layers.Conv2D(32, kernel_size=(3, 3), strides=(3, 3), padding='valid', use_bias=False, activation="relu")(x)
        x = layers.Conv2D(32, kernel_size=(3, 3), strides=(3, 3), padding='valid', use_bias=False, activation="relu")(x)

        x = layers.Flatten()(x)
        x = layers.Dense(64, activation='relu')(x)
        # Small uniform initial weights keep the initial tanh outputs near zero.
        # The initializer was previously created but never passed to the layer.
        last_init = tf.random_uniform_initializer(minval=-0.005, maxval=0.005)
        y = layers.Dense(self.model_action_out, activation='tanh', kernel_initializer=last_init)(x)

        model = Model(inputs=inputs, outputs=y, name=name)
        model.summary()
        return model

    def build_critic(self, state_shape, name="Critic"):
        """Build the Q-network taking ``[state, action]`` and returning one value per sample.

        The state goes through the same convolution stack as the actor; the
        flattened features are concatenated with the action before the dense head.
        """
        state_inputs = layers.Input(shape=state_shape)
        x = state_inputs
        x = layers.Conv2D(16, kernel_size=(5, 5), strides=(4, 4), padding='valid', use_bias=False, activation="relu")(x)
        x = layers.Conv2D(32, kernel_size=(3, 3), strides=(3, 3), padding='valid', use_bias=False, activation="relu")(x)
        x = layers.Conv2D(32, kernel_size=(3, 3), strides=(3, 3), padding='valid', use_bias=False, activation="relu")(x)

        x = layers.Flatten()(x)
        action_inputs = layers.Input(shape=(self.model_action_out,))
        x = layers.concatenate([x, action_inputs])

        x = layers.Dense(64, activation='relu')(x)
        x = layers.Dense(32, activation='relu')(x)
        y = layers.Dense(1)(x)

        model = Model(inputs=[state_inputs, action_inputs], outputs=y, name=name)
        model.summary()
        return model

    def init_networks(self, state_shape):
        """Create actor/critic and their targets; targets start as exact weight copies."""
        self.actor  = self.build_actor(state_shape)
        self.critic = self.build_critic(state_shape)
        self.target_actor  = self.build_actor(state_shape, name='TargetActor')
        self.target_critic = self.build_critic(state_shape, name='TargetCritic')
        self.target_actor.set_weights(self.actor.get_weights())
        self.target_critic.set_weights(self.critic.get_weights())

    def get_action(self, state, add_noise=True):
        """Choose an action for a raw observation.

        The observation is preprocessed here, so callers must pass the raw
        environment frame, not an already preprocessed one.

        Args:
            state: Raw observation image from the environment.
            add_noise: When True, exploration noise is added to the actor output.

        Returns:
            Tuple ``(env_action, actor_output)``: the action clipped to the
            action-space bounds, and the (possibly noisy) raw actor output that
            should be stored in the replay buffer.
        """
        prep_state = self.preprocess(state)
        if self.actor is None:
            self.init_networks(prep_state.shape)

        tensor_state = tf.expand_dims(tf.convert_to_tensor(prep_state), 0)
        actor_output = self.actor(tensor_state).numpy()

        if add_noise:
            actor_output = actor_output[0] + self.noise.generate()
        else:
            actor_output = actor_output[0]

        if self.need_decode_out:
            env_action = self.decode_model_output(actor_output)
        else:
            env_action = actor_output

        env_action = np.clip(np.array(env_action), a_min=self.action_space.low, a_max=self.action_space.high)
        return env_action, actor_output

    def decode_model_output(self, model_out):
        """Expand a 2-value actor output into a 3-value environment action.

        Output 0 is passed through; the positive part of output 1 becomes the
        second component and the magnitude of its negative part the third
        (presumably steering, gas and brake for CarRacing - confirm against the env).
        """
        return np.array([model_out[0], model_out[1].clip(0, 1), -model_out[1].clip(-1, 0)])

    def preprocess(self, img, greyscale=False):
        """Return a cleaned, normalised copy of a raw frame.

        The row/column indices below are hard-coded, so the frame must be at
        least 94 rows by 53 columns (presumably the 96x96 CarRacing frame).

        Args:
            img: Raw observation image indexed ``[row, col, channel]``; not modified.
            greyscale: When True, channels are averaged into a single channel.

        Returns:
            Float array scaled by the frame maximum, with a few value
            ranges snapped to fixed levels.
        """
        img = img.copy()
        # Overwrite the first 12 columns of rows 88..93 with the colour found at column 12.
        for i in range(88, 93+1):
            img[i, 0:12, :] = img[i, 12, :]

        # Merge the two green shades into one.
        replace_color(img, original=(102, 229, 102), new_value=(102, 204, 102))

        if greyscale:
            img = img.mean(axis=2)
            img = np.expand_dims(img, 2)

        # Zero out values equal to 68 inside the fixed window rows 67..76, cols 42..52.
        car_color = 68.0
        car_area = img[67:77, 42:53]
        car_area[car_area == car_color] = 0

        # Normalise by the frame maximum; an all-zero frame stays zero instead of
        # turning into NaNs through a division by zero.
        peak = img.max()
        img = img / peak if peak != 0 else img.astype(np.float64)
        img[(img > 0.411) & (img < 0.412)] = 0.4
        img[(img > 0.419) & (img < 0.420)] = 0.4

        # In rows 0..82, replace fully saturated values with 0.80.
        game_screen = img[0:83, :]
        game_screen[game_screen == 1] = 0.80
        return img

    def learn(self, state, train_action, reward, new_state):
        """Record one transition and run a single training step on a sampled batch.

        Args:
            state: Raw observation before the step.
            train_action: Raw actor output returned by :meth:`get_action`.
            reward: Scalar reward received.
            new_state: Raw observation after the step.
        """
        prep_state     = self.preprocess(state)
        prep_new_state = self.preprocess(new_state)
        self.r_buffer.write(prep_state, train_action, reward, prep_new_state)
        state_batch, action_batch, reward_batch, new_state_batch = self.r_buffer.sample()

        state_batch     = tf.convert_to_tensor(state_batch)
        action_batch    = tf.convert_to_tensor(action_batch)
        reward_batch    = tf.convert_to_tensor(reward_batch, dtype=tf.float32)
        new_state_batch = tf.convert_to_tensor(new_state_batch)

        self.update_actor_critic(state_batch, action_batch, reward_batch, new_state_batch)

        # Let the target networks slowly track the online ones. Without these
        # calls the targets stay frozen at their initial weights forever.
        self.update_target_network(self.target_actor.variables, self.actor.variables)
        self.update_target_network(self.target_critic.variables, self.critic.variables)

    @tf.function
    def update_actor_critic(self, state, action, reward, new_state):
        """Apply one gradient step to the critic, then one to the actor."""
        with tf.GradientTape() as tape:
            # Bootstrapped target from the target networks.
            new_action = self.target_actor(new_state, training=True)
            y = reward + self.gamma * self.target_critic([new_state, new_action], training=True)
            critic_loss = tf.math.reduce_mean(tf.square(y - self.critic([state, action], training=True)))

        critic_gradients = tape.gradient(critic_loss, self.critic.trainable_variables)
        self.critic_opt.apply_gradients(zip(critic_gradients, self.critic.trainable_variables))

        with tf.GradientTape() as tape:
            # The actor is trained to maximise the critic's value of its own actions.
            critic_out = self.critic([state, self.actor(state, training=True)], training=True)
            actor_loss = -tf.math.reduce_mean(critic_out)

        actor_gradients = tape.gradient(actor_loss, self.actor.trainable_variables)
        self.actor_opt.apply_gradients(zip(actor_gradients, self.actor.trainable_variables))

    @tf.function
    def update_target_network(self, target_weights, new_weights):
        """Soft update: ``target <- (1 - tau) * target + tau * new`` for every pair."""
        for t, n in zip(target_weights, new_weights):
            t.assign((1 - self.tau) * t + self.tau * n)

    def save_solution(self, path='/content/'):
        """Save the four networks as ``.h5`` files inside directory ``path``.

        The directory is created if it does not exist.
        """
        if path:
            os.makedirs(path, exist_ok=True)
        # os.path.join works with or without a trailing separator on ``path``.
        self.actor.save(os.path.join(path, 'actor.h5'))
        self.critic.save(os.path.join(path, 'critic.h5'))
        self.target_actor.save(os.path.join(path, 'target_actor.h5'))
        self.target_critic.save(os.path.join(path, 'target_critic.h5'))

    def load_solution(self, path='models/'):
        """Load the four networks from the ``.h5`` files inside directory ``path``."""
        self.actor = tf.keras.models.load_model(os.path.join(path, 'actor.h5'))
        self.critic = tf.keras.models.load_model(os.path.join(path, 'critic.h5'))
        self.target_actor = tf.keras.models.load_model(os.path.join(path, 'target_actor.h5'))
        self.target_critic = tf.keras.models.load_model(os.path.join(path, 'target_critic.h5'))


def train_car_racing(n_episodes=5, model_path='/content/drive/MyDrive/DQN_save/',
                     problem='CarRacing-v2', preview=False):
    """Train the agent, resuming from saved models when all of them exist.

    Args:
        n_episodes: Number of training episodes. The default is only a smoke
            test; real training uses a much larger value.
        model_path: Directory holding ``actor.h5``, ``critic.h5``,
            ``target_actor.h5`` and ``target_critic.h5``.
        problem: Gym environment id.
        preview: When True, ``env.render()`` is called on every step.

    An episode is cut short after more than 200 consecutive steps with a
    negative reward. The models are saved whenever an episode beats the best
    episode reward seen in this run (starting from 0).
    """
    model_files = ('actor.h5', 'critic.h5', 'target_actor.h5', 'target_critic.h5')

    gym.logger.set_level(40)
    best_result = 0
    all_episode_reward = []
    env = gym.make(problem)

    # try/finally guarantees the environment is closed even if training raises.
    try:
        env.reset()
        noise_std = np.array([0.1, 4 * 0.2], dtype=np.float32)

        solution = BaseSolution(env.action_space, model_outputs=2, noise_std=noise_std)

        try:
            if all(os.path.exists(os.path.join(model_path, name)) for name in model_files):
                solution.load_solution(model_path)
                print("既存のモデルをロードして、続きから学習を行います。")
            else:
                print("既存のモデルファイルが見つかりません。新規に学習を開始します。")
        except Exception as e:
            print("モデルのロードでエラーが発生しました。新規に学習を開始します。")
            print(e)
            # A partial load would leave a mix of loaded and missing networks;
            # clear them all so fresh ones are built on the first action.
            solution.actor = None
            solution.critic = None
            solution.target_actor = None
            solution.target_critic = None

        print("\n===== 学習開始 =====")
        for ie in range(n_episodes):
            state = env.reset()
            solution.reset()
            done = False
            episode_reward = 0
            no_reward_counter = 0

            while not done:
                if preview:
                    env.render()

                action, train_action = solution.get_action(state)
                # The environment receives a quarter of the decoded action,
                # while the unscaled actor output is what gets stored for learning.
                action /= 4
                new_state, reward, done, info = env.step(action)
                solution.learn(state, train_action, reward, new_state)
                state = new_state
                episode_reward += reward

                if reward < 0:
                    no_reward_counter += 1
                    if no_reward_counter > 200:
                        break
                else:
                    no_reward_counter = 0

            all_episode_reward.append(episode_reward)
            average_result = np.array(all_episode_reward[-10:]).mean()
            print(f"Episode: {ie}, Last result: {episode_reward:.2f}, Average results(10eps): {average_result:.2f}")

            if episode_reward > best_result:
                print('Saving best solution')
                # Make sure the target directory exists before writing the files.
                if model_path:
                    os.makedirs(model_path, exist_ok=True)
                solution.save_solution(path=model_path)
                best_result = episode_reward
    finally:
        env.close()

    print("\n===== 学習終了 =====")


# Start training only when this code runs as the main program (e.g. as a notebook cell).
if __name__ == "__main__":
    train_car_racing()




  from pkg_resources import resource_stream, resource_exists
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)


既存のモデルをロードして、続きから学習を行います。

===== 学習開始 =====


  if not isinstance(terminated, (bool, np.bool8)):


Episode: 0, Last result: 136.49, Average results(10eps): 136.49
Saving best solution
Episode: 1, Last result: -4.72, Average results(10eps): 65.89




Episode: 2, Last result: 354.43, Average results(10eps): 162.07
Saving best solution




Episode: 3, Last result: 172.05, Average results(10eps): 164.56
Episode: 4, Last result: 114.51, Average results(10eps): 154.55

===== 学習終了 =====


In [None]:
import os
import gym
import numpy as np
import tensorflow as tf
from gym.wrappers import RecordVideo
from IPython.display import display, Video


def test_with_pretrained_model(model_path="/content/drive/MyDrive/DQN_save/",
                               video_folder="./mp4", num_episodes=1):
    """Run the saved agent without exploration noise, record a video and display it.

    Args:
        model_path: Directory containing the saved ``.h5`` networks.
        video_folder: Directory where ``RecordVideo`` writes its mp4 files.
        num_episodes: Number of evaluation episodes to run.
    """
    test_env = RecordVideo(
        gym.make("CarRacing-v2"),
        video_folder=video_folder
    )

    # try/finally closes the recorder/environment even if loading or stepping fails.
    try:
        solution_eval = BaseSolution(test_env.action_space, model_outputs=2)
        solution_eval.load_solution(path=model_path)

        for e in range(num_episodes):
            state = test_env.reset()
            solution_eval.reset()

            total_reward = 0.0
            done = False
            step_counter = 0

            while not done:
                # get_action() preprocesses the frame itself. Passing an already
                # preprocessed frame (as before) normalised the image twice, so
                # the actor saw different inputs than during training.
                action, _ = solution_eval.get_action(state, add_noise=False)
                action /= 4.0  # same action scaling as in the training loop
                next_state, reward, done, info = test_env.step(action)
                total_reward += reward
                step_counter += 1
                state = next_state

            print(f"Test Episode: {e+1}/{num_episodes}, Steps: {step_counter}, Total Reward: {total_reward:.2f}")
    finally:
        test_env.close()

    video_path = os.path.join(video_folder, "rl-video-episode-0.mp4")
    if os.path.exists(video_path):
        print("Generated video path:", video_path)
        display(Video(video_path, embed=True))
    else:
        print("Video file not found:", video_path)

# Run the evaluation only when this code runs as the main program (e.g. as a notebook cell).
if __name__ == "__main__":
    test_with_pretrained_model()


  if not isinstance(terminated, (bool, np.bool8)):


Test Episode: 1/1, Steps: 1000, Total Reward: 199.66
Generated video path: ./mp4/rl-video-episode-0.mp4
