In [1]:
import random
import gymnasium as gym # pip install gymnasium[classic-control]

import numpy as np
import tensorflow as tf 

from keras import Model
from keras.layers import Dense
from keras.optimizers import Adam
from keras.losses import mean_squared_error

In [2]:
class DQN(Model):
    """Small fully connected Q-network that also owns its replay memory.

    The network takes a 4-component observation and returns 2 Q-values, one
    per discrete action. Besides the layers, each instance carries:

    * ``optimizer`` -- the Adam optimizer used by the training step.
    * ``memory``    -- a plain list of ``(state, action, reward, next_state,
      done)`` transitions appended by the training loop.

    NOTE: earlier revisions assigned both the second 32-unit hidden layer and
    the output head to ``self.d3``, so the hidden layer was silently dropped.
    The output head now lives in ``self.d4``; weights saved from the old
    layout are therefore not loadable into this class.
    """

    def __init__(self):
        super().__init__()
        self.d1 = Dense(64, input_dim=4, activation='relu')
        self.d2 = Dense(32, activation='relu')
        self.d3 = Dense(32, activation='relu')
        # Linear output head: raw (unbounded) Q-value per action.
        self.d4 = Dense(2, activation='linear')
        self.optimizer = Adam(0.001)

        # Replay buffer; bounded by the training step, not here.
        self.memory = []

    def call(self, x):
        """Forward pass: observation batch -> Q-values of shape (batch, 2)."""
        x = self.d1(x)
        x = self.d2(x)
        x = self.d3(x)
        y_hat = self.d4(x)
        return y_hat

In [3]:
def update_model(
    model: "DQN",
    update_last: bool = False,
    *,
    batch_size: int = 32,
    gamma: float = 0.95,
    min_memory: int = 1000,
    max_memory: int = 10000,
) -> None:
    """Run one Q-learning gradient step on ``model`` using its replay memory.

    Args:
        model: Network exposing ``memory`` (list of ``(state, action, reward,
            next_state, done)`` tuples), ``optimizer`` and
            ``trainable_variables``.
        update_last: When true, the most recent transition replaces the last
            sampled one so that it is guaranteed to be trained on.
        batch_size: Number of transitions sampled per step.
        gamma: Discount factor applied to the bootstrapped next-state value.
        min_memory: No training happens until the memory holds at least this
            many transitions.
        max_memory: Oldest transitions are dropped so the memory never
            exceeds this size.

    Returns:
        None. The function returns early, without training, while the memory
        is smaller than ``max(min_memory, batch_size)``.
    """
    # Drop the oldest transitions first so the buffer stays bounded even if
    # several transitions were appended since the previous call.
    overflow = len(model.memory) - max_memory
    if overflow > 0:
        del model.memory[:overflow]

    # Also guards random.sample, which raises if asked for more items than exist.
    if len(model.memory) < max(min_memory, batch_size):
        return

    batch = random.sample(model.memory, batch_size)
    if update_last:
        batch[-1] = model.memory[-1]

    states, actions, rewards, next_states, dones = zip(*batch)

    states = np.array(states)
    actions = np.array(actions)
    rewards = np.array(rewards)
    next_states = np.array(next_states)
    # 1.0 while the episode continues, 0.0 on terminal transitions, so terminal
    # targets do not bootstrap from the next state.
    not_done = 1.0 - np.array(dones, dtype=np.float64)

    next_q_value = model(next_states).numpy()

    # Start from the current predictions so only the taken action's entry
    # contributes a non-zero error.
    target_y = model(states).numpy()
    target_y[np.arange(batch_size), actions] = (
        rewards + not_done * gamma * np.max(next_q_value, axis=1)
    )

    with tf.GradientTape() as tape:
        per_sample_loss = mean_squared_error(target_y, model(states))
        # Differentiating a vector target sums its elements; make that explicit.
        loss = tf.reduce_sum(per_sample_loss)
    grads = tape.gradient(loss, model.trainable_variables)
    model.optimizer.apply_gradients(zip(grads, model.trainable_variables))

In [4]:
# One Q-network per travel direction; the training loop selects the active
# network by key.
model_left = DQN()
model_right = DQN()

model = {
    'left': model_left,
    'right': model_right
}

# Create the CartPole environment.
# The training loop runs up to 3000 steps per episode and applies its own
# failure limits (|pole angle| < 0.3 rad, |cart position| < 4), which are wider
# than CartPole-v1's built-in ones. Align the environment with those limits so
# it is never stepped after it has already reported termination (Gymnasium
# warns that results are undefined in that case).
env = gym.make('CartPole-v1', max_episode_steps=3000)
env.unwrapped.theta_threshold_radians = 0.3
env.unwrapped.x_threshold = 4
# env = gym.make('CartPole-v1', render_mode="human", max_episode_steps=3000)

In [5]:
def _softmax(q_values):
    """Return softmax probabilities for ``q_values`` without overflowing.

    Subtracting the maximum before exponentiating leaves the result unchanged
    mathematically but keeps ``exp`` finite for large Q-values, which would
    otherwise produce NaN probabilities.
    """
    shifted = np.asarray(q_values, dtype=np.float64)
    shifted = shifted - shifted.max()
    weights = np.exp(shifted)
    return weights / weights.sum()


def _shaping_reward(direction, prev_x, next_x):
    """Dense reward for moving the cart toward the current target side.

    Positions are mirrored for 'left' so both directions share one rule:
    before the target zone (mirrored position <= 1.1) moving outward earns 1,
    past it moving back inward earns 1; anything else earns -0.5.
    """
    sign = -1.0 if direction == 'left' else 1.0
    prev_pos = sign * prev_x
    next_pos = sign * next_x
    if next_pos > 1.1:
        improved = next_pos < prev_pos  # overshot: reward coming back
    else:
        improved = next_pos > prev_pos  # reward heading to the target
    return 1 if improved else -0.5


for episode in range(1000):
    state, info = env.reset()

    # Named `direction` rather than `dir` so the builtin is not shadowed.
    direction = np.random.choice(['left', 'right'])
    score = 0
    score_step = []

    for step in range(3000):
        agent = model[direction]
        q_values = agent(np.array([state])).numpy()[0]

        # Sample the action stochastically from the softmax of the Q-values.
        action = int(np.random.choice([0, 1], p=_softmax(q_values)))

        # The environment's reward and terminated/truncated flags are ignored
        # on purpose: this loop defines its own reward and failure limits.
        # NOTE(review): these limits are wider than CartPole-v1's defaults, so
        # the env must be configured to match or it is stepped after termination.
        next_state, _, _, _, _ = env.step(action)
        cart_x, pole_angle = next_state[0], next_state[2]
        done = not (-0.3 < pole_angle < 0.3 and -4 < cart_x < 4)

        # Velocity, angle and angular velocity must all be close to zero.
        stable_condition = bool(np.all(np.abs(np.array(next_state[1:])) < 0.1))
        past_target = cart_x < -1 if direction == 'left' else cart_x > 1
        reached_goal = (not done) and stable_condition and bool(past_target)

        if done:
            # Game over: the pole tilted too far or the cart left the track.
            reward = -20
        elif reached_goal:
            # The cart arrived at the target side while balanced.
            reward = 50
        else:
            reward = _shaping_reward(direction, state[0], cart_x)

        # Terminal and goal transitions are forced into the training batch.
        agent.memory.append((state, action, reward, next_state, done))
        update_model(agent, update_last=done or reached_goal)

        if done:
            print("Episode: {}, steps: {}, scores: {}, dir: {}, log: {}".format(episode, step, score, direction, score_step))
            break

        if reached_goal:
            # Switch sides; the other network takes over from the next step.
            direction = 'right' if direction == 'left' else 'left'
            score += 1
            score_step.append(step)

        state = next_state


  logger.warn(


Episode: 0, steps: 29, scores: 0, dir: left, log: []
Episode: 1, steps: 75, scores: 0, dir: right, log: []
Episode: 2, steps: 52, scores: 0, dir: left, log: []
Episode: 3, steps: 24, scores: 0, dir: right, log: []
Episode: 4, steps: 15, scores: 0, dir: left, log: []
Episode: 5, steps: 37, scores: 0, dir: left, log: []
Episode: 6, steps: 39, scores: 0, dir: left, log: []
Episode: 7, steps: 11, scores: 0, dir: left, log: []
Episode: 8, steps: 15, scores: 0, dir: right, log: []
Episode: 9, steps: 51, scores: 0, dir: left, log: []
Episode: 10, steps: 25, scores: 0, dir: right, log: []
Episode: 11, steps: 29, scores: 0, dir: right, log: []
Episode: 12, steps: 52, scores: 0, dir: left, log: []
Episode: 13, steps: 18, scores: 0, dir: right, log: []
Episode: 14, steps: 14, scores: 0, dir: left, log: []
Episode: 15, steps: 34, scores: 0, dir: left, log: []
Episode: 16, steps: 20, scores: 0, dir: left, log: []
Episode: 17, steps: 16, scores: 0, dir: right, log: []
Episode: 18, steps: 29, scores:

KeyboardInterrupt: 

In [6]:
# Persist each direction's network weights in TensorFlow checkpoint format.
checkpoint_prefixes = {'left': 'model_left2', 'right': 'model_right2'}
for side, prefix in checkpoint_prefixes.items():
    model[side].save_weights(prefix, save_format='tf')