<a href="https://colab.research.google.com/github/imsunbow/PythonProject/blob/main/20231222_%EC%9D%B8%EA%B3%B5%EC%A7%80%EB%8A%A5%EA%B3%BC%EC%A0%9C6_2018271056%EC%9D%B4%EC%9E%AC%EB%AF%BC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [32]:
import math
import random
import time
from collections import deque
from datetime import datetime

import numpy as np
import tensorflow as tf

# Hyperparameters of the tabular Q-learning run.
# Number of training episodes executed by the learning loop.
total_episodes = 100000
# Upper bound on the number of moves in one episode (training and test).
max_steps = 99
# Discount factor applied to the best Q-value of the next state.
gamma = 0.9
# Step size of the tabular Q update; 1.0 fully overwrites the old estimate.
alpha = 1.0
# Exploration probability used at episode 0.
original_epsilon = 0.4
# Per-episode exponential decay constant:
# epsilon = original_epsilon * exp(-decay_rate * episode).
decay_rate = 0.000016
# Seed Python's RNG from the wall clock, so separate runs explore differently.
random.seed(datetime.now().timestamp())

# Grid dimensions and the number of moves available from a cell.
max_row = 9
max_col = 9
max_num_actions = 4

# Grid map, one string per row, expanded into a list of single-character cells.
# Per the reward function in this file: 'H' gives -9 and ends the episode,
# 'G' gives +9 and ends the episode, 'F' and 'S' give 0.
env_state_space = [
    list(row_text)
    for row_text in (
        "HHHHHHHHH",
        "HSFFFFFFH",
        "HFFHHFHFH",
        "HFFFFFFFH",
        "HFHFHFFHH",
        "HFFFFGFFH",
        "HFHHFHFFH",
        "HFFFFFFFH",
        "HHHHHHHHH",
    )
]

# Tabular action values, indexed as Q[row, col, action]; starts at zero.
Q = np.zeros(shape=(max_row, max_col, max_num_actions))

# (row delta, col delta) for each action index, and the matching labels.
move_offset = [
    [-1, 0],  # action 0: up
    [0, 1],   # action 1: right
    [1, 0],   # action 2: down
    [0, -1],  # action 3: left
]
move_str = [
    'up   ',
    'right',
    'down ',
    'left ',
]

class ExperienceReplayBuffer:
    """Fixed-capacity FIFO store of experiences for replay sampling.

    Once ``size`` experiences are held, adding another one discards the
    oldest. ``buffer`` is kept as a public attribute because callers query
    its length directly.
    """

    def __init__(self, size):
        """Create an empty buffer that keeps at most ``size`` experiences.

        A non-positive ``size`` yields a buffer that never retains anything.
        """
        # deque(maxlen=...) evicts the oldest entry in O(1); the previous
        # list.pop(0) shifted every remaining element on each eviction.
        self.buffer = deque(maxlen=max(size, 0))
        self.size = size

    def add(self, experience):
        """Append ``experience``, dropping the oldest entry when full."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored experiences chosen at random.

        Returns an empty list when fewer than ``batch_size`` experiences are
        stored, so callers can skip the update instead of handling an error.
        """
        if len(self.buffer) < batch_size:
            return []

        return random.sample(self.buffer, batch_size)

def choose_action_with_epsilon_greedy(s, epsilon):
    """Pick an action for state ``s`` with an epsilon-greedy rule.

    Args:
        s: Sequence whose first two items are the row and column of the state.
        epsilon: Probability threshold for exploring instead of exploiting.

    Returns:
        The arg-max action of ``Q[row, col, :]`` when the first random draw
        is at least ``epsilon``; otherwise one of the actions 0-3 selected by
        a second random draw split into four equal bands.
    """
    row, col = s[0], s[1]
    greedy_action = np.argmax(Q[row, col, :])

    # Exploit: the draw landed outside the exploration band.
    if random.random() >= epsilon:
        return greedy_action

    # Explore: map a fresh draw onto an action, highest band first.
    roll = random.random()
    for lower_bound, explore_action in ((0.75, 0), (0.5, 1), (0.25, 2)):
        if roll >= lower_bound:
            return explore_action
    return 3

def choose_action_with_greedy(s):
    """Return the action with the highest Q-value for state ``s``.

    ``s`` is a sequence whose first two items are the row and column used to
    index the module-level ``Q`` table. Ties resolve to the lowest action
    index, as ``np.argmax`` does.
    """
    return np.argmax(Q[s[0], s[1], :])

def get_new_state_and_reward(s, a):
    """Apply action ``a`` in state ``s`` and report the outcome.

    Args:
        s: Current state as ``[row, col]``.
        a: Action index into ``move_offset``.

    Returns:
        ``(new_state, reward)`` where ``new_state`` is a new ``[row, col]``
        list and ``reward`` depends on the cell entered: 0 for 'F' and 'S',
        -9 for 'H', 9 for 'G'. For any other cell symbol a message is printed
        and ``([0, 0], -20000)`` is returned.
    """
    d_row, d_col = move_offset[a]
    new_state = [s[0] + d_row, s[1] + d_col]
    cell = env_state_space[new_state[0]][new_state[1]]

    reward_by_cell = {'F': 0, 'H': -9, 'G': 9, 'S': 0}
    if cell not in reward_by_cell:
        # Unknown map symbol: report it and return an obviously bad sentinel.
        print("Logic error in get_new_state_and_reward. This cannot happen!")
        return [0, 0], -20000

    return new_state, reward_by_cell[cell]

def display_Q_table(Q):
    """Print a 3-D Q table, one text line per (row, action) pair.

    Args:
        Q: Array indexed as ``Q[row, col, action]``; its ``shape`` supplies
            the number of rows, columns and actions to print.

    Output layout: a blank line, a header labelling every column, then for
    each row a ``row: r`` line followed by one line per action containing
    that action's value in every column.
    """
    n_rows, n_cols, n_actions = Q.shape

    # Each value is 5 characters wide and separated by ",   " (4 characters),
    # so labels after the first are right-aligned in 9-character fields to
    # sit above the last digit of their column. The old fixed header stopped
    # at column 5 even though the table is wider.
    header = "col=0" + "".join("{:>9}".format(c) for c in range(1, n_cols))
    print("\n" + header)

    for r in range(n_rows):
        print("row:", r)
        for a in range(n_actions):
            line = ",   ".join("{:5.2f}".format(Q[r, c, a]) for c in range(n_cols))
            print(line)

def env_rendering(s):
    """Print the grid map with a '*' drawn at the agent's position.

    ``s`` is the agent state as ``[row, col]``. Every map row is printed as
    one string of cell symbols; on the agent's row the character at the
    agent's column is replaced by '*'.
    """
    for i in range(max_row):
        rendered = "".join(env_state_space[i][j] for j in range(max_col))
        if s[0] == i:
            col = s[1]
            rendered = rendered[:col] + '*' + rendered[col + 1:]
        print(rendered)

def update_target_network(source_model, target_model):
    """Copy the weights of ``source_model`` into ``target_model``.

    Both arguments only need ``get_weights()`` / ``set_weights()``; nothing
    is returned.
    """
    source_weights = source_model.get_weights()
    target_model.set_weights(source_weights)

# Neural network that maps a state batch to one Q-value per action.
class QNetwork(tf.keras.Model):
    """Feed-forward Q-value estimator: flatten, two ReLU layers, linear head.

    The head has ``max_num_actions`` units, so each input sample yields one
    value per action.
    """

    def __init__(self):
        super().__init__()
        # NOTE(review): the declared input_shape is (max_row, max_col), but
        # the replay code in this file stores states as [row, col] pairs -
        # confirm which input layout is intended.
        self.flatten = tf.keras.layers.Flatten(input_shape=(max_row, max_col))
        self.dense1 = tf.keras.layers.Dense(128, activation='relu')
        self.dense2 = tf.keras.layers.Dense(64, activation='relu')
        self.output_layer = tf.keras.layers.Dense(max_num_actions, activation='linear')

    def call(self, state, training=False):
        """Run ``state`` through the layers; ``training`` is accepted but unused."""
        x = state
        for layer in (self.flatten, self.dense1, self.dense2, self.output_layer):
            x = layer(x)
        return x

# Replay buffer holding at most the 1000 most recent transitions.
replay_buffer = ExperienceReplayBuffer(size=1000)

# Online network and its target copy, followed by an initial weight sync.
# NOTE(review): neither network has been called on data at this point, so
# there may be no weights to copy yet - confirm the sync has an effect here.
q_network = QNetwork()
target_q_network = QNetwork()
update_target_network(q_network, target_q_network)

# Mean-squared-error loss and Adam optimizer for the network update.
mse_loss = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# Show the all-zero table before any learning happens.
print("Initial Q table is")
display_Q_table(Q)

# Every training and test episode begins at [row, col] = [1, 1],
# which is the 'S' cell of the map.
start_state = [1, 1]

print("\nLearning starts.\n")
# Main learning loop: tabular Q-learning on every step, plus a periodic
# gradient step on the Q network from replayed transitions.
for episode in range(total_episodes):
    S = start_state
    # Exploration probability decays exponentially with the episode index.
    epsilon = original_epsilon * math.exp(-decay_rate * episode)

    if episode % 5000 == 0:
        print('episode=', episode, '  epsilon=', epsilon)

    for step in range(max_steps):
        A = choose_action_with_epsilon_greedy(S, epsilon)
        S_, R = get_new_state_and_reward(S, A)

        r = S[0]
        c = S[1]

        # Tabular Q-learning update toward R + gamma * max_a' Q(S', a').
        Q[r][c][A] = Q[r][c][A] + alpha * (R + gamma * np.max(Q[S_[0]][S_[1]][:]) - Q[r][c][A])

        replay_buffer.add((S, A, R, S_))

        S = S_

        # 'G' and 'H' cells end the episode.
        if env_state_space[S[0]][S[1]] == 'G' or env_state_space[S[0]][S[1]] == 'H':
            break

    # Network update, once every 100 episodes after the buffer holds a batch.
    # This block must live inside the episode loop: at module level it was
    # evaluated a single time after training with episode == total_episodes - 1,
    # where the modulo test is false, so the network was never trained.
    if episode % 100 == 0 and len(replay_buffer.buffer) >= 32:
        sampled_batch = replay_buffer.sample(32)

        if sampled_batch:  # sample() returns [] when it cannot fill a batch
            states, actions, rewards, next_states = zip(*sampled_batch)

            # Transitions whose next cell is 'G' or 'H' ended the episode, so
            # their target must not bootstrap from the next state's values.
            terminal = np.array(
                [env_state_space[ns[0]][ns[1]] in ('G', 'H') for ns in next_states],
                dtype=np.float32,
            )

            # The Dense layers need floating-point inputs; states are stored
            # as integer [row, col] pairs.
            states = np.array(states, dtype=np.float32)
            actions = np.array(actions, dtype=np.int32)
            rewards = np.array(rewards, dtype=np.float32)
            next_states = np.array(next_states, dtype=np.float32)

            # Targets are constants with respect to q_network, so they are
            # computed outside the gradient tape.
            next_q_values = target_q_network(next_states, training=False).numpy()
            max_next_q_values = np.max(next_q_values, axis=1)
            target_q_values = (
                rewards + gamma * (1.0 - terminal) * max_next_q_values
            ).astype(np.float32)

            with tf.GradientTape() as tape:
                q_values = q_network(states, training=True)
                # Keep only the Q-value of the action actually taken.
                q_values = tf.reduce_sum(q_values * tf.one_hot(actions, max_num_actions), axis=1)

                loss = mse_loss(target_q_values, q_values)

            gradients = tape.gradient(loss, q_network.trainable_variables)
            optimizer.apply_gradients(zip(gradients, q_network.trainable_variables))

            # Refresh the target network after each gradient step.
            update_target_network(q_network, target_q_network)

# Show the learned table.
print('\nLearning is finished. The Q table is:')
display_Q_table(Q)

# Test stage: run 5 episodes that always take the arg-max action of the
# tabular Q (no exploration; the neural networks are not used here).
print("\nTest starts.\n")

for e in range(5):
    S = start_state
    total_rewards = 0
    print("\nEpisode:", e, " start state: (", S[0], ", ", S[1], ")")

    # At most max_steps moves, so a policy that never reaches a terminal
    # cell still terminates.
    for step in range(max_steps):
        A = choose_action_with_greedy(S)
        S_, R = get_new_state_and_reward(S, A)

        total_rewards += R
        S = S_

        # 'G' and 'H' cells end the episode.
        if env_state_space[S[0]][S[1]] == 'G' or env_state_space[S[0]][S[1]] == 'H':
            break

    print("Episode has ended. Total reward received in episode = ", total_rewards)
    # Pause for one second between test episodes.
    time.sleep(1)

print("Program ends!!!")


Initial Q table is

col=0       1        2        3       4         5
row: 0
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00
row: 1
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00
row: 2
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00,    0.00
 0.00,    0.00,    0.00,    0.00,    0.00,    0.00,