In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import csv
import os

class Grid:
    """Square grid world holding one agent position and one exit position.

    Positions are ``(x, y)`` tuples. "Up" decreases ``y`` and "right"
    increases ``x``; moves that would leave the ``size`` x ``size`` board
    are ignored.
    """

    def __init__(self, size=5, start_pos=(0, 0), exit_pos=(4, 4)):
        """Create a grid of side ``size`` with the agent placed on ``start_pos``."""
        self.size = size
        self.exit_pos = exit_pos
        # Remember the start so the same grid can be reused for several episodes.
        self.start_pos = start_pos
        self.figure_pos = start_pos

    def reset(self):
        """Put the agent back on its starting cell."""
        self.figure_pos = self.start_pos

    def move(self, direction):
        """Move the agent one cell.

        Args:
            direction: 0 = up, 1 = down, 2 = left, 3 = right. Any other value,
                or a move that would cross the border, leaves the agent in place.
        """
        x, y = self.figure_pos
        if direction == 0 and y > 0:  # up
            self.figure_pos = (x, y-1)
        elif direction == 1 and y < self.size-1:  # down
            self.figure_pos = (x, y+1)
        elif direction == 2 and x > 0:  # left
            self.figure_pos = (x-1, y)
        elif direction == 3 and x < self.size-1:  # right
            self.figure_pos = (x+1, y)

    def is_at_exit(self):
        """Return True when the agent stands on the exit cell."""
        return self.figure_pos == self.exit_pos

    def get_state(self, device='cpu'):
        """Return the agent position as a float tensor with a leading batch axis, on ``device``."""
        return torch.FloatTensor(self.figure_pos).unsqueeze(0).to(device)

class PolicyNet(nn.Module):
    """Small MLP policy: 2 input features -> 16 hidden units -> 4 action probabilities."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(2, 16)
        self.fc2 = nn.Linear(16, 4)

    def forward(self, x):
        """Map a batch of states (one row per state) to per-row action probabilities."""
        hidden = F.relu(self.fc1(x))
        logits = self.fc2(hidden)
        # Softmax over dim 1, so the input must carry a batch axis.
        return F.softmax(logits, dim=1)

def generate_episode(grid, policy_net, device="cpu", max_episode_len=100):
    """Roll out one episode by sampling actions from ``policy_net``.

    The grid is advanced in place, starting from its current position, until
    the exit is reached or ``max_episode_len`` steps have been taken.

    Args:
        grid: environment exposing ``get_state(device)``, ``move(action)`` and
            ``is_at_exit()``.
        policy_net: callable returning action probabilities for a state.
        device: device handed to ``grid.get_state``.
        max_episode_len: upper bound on the number of steps.

    Returns:
        A list of ``(state, action, reward, log_probs)`` tuples, one per step.
        ``state`` and ``log_probs`` are plain Python lists; ``reward`` is 1.0
        on the step that reaches the exit and -0.1 otherwise. The list is
        empty when the grid already sits on the exit.
    """
    episode = []
    state = grid.get_state(device)

    # Only sampled values are kept (as lists), so no autograd graph is needed.
    with torch.no_grad():
        for _ in range(max_episode_len):
            if grid.is_at_exit():
                break

            action_probs = policy_net(state).squeeze()
            log_probs = torch.log(action_probs)
            cpu_action_probs = action_probs.detach().cpu().numpy()
            action = np.random.choice(np.arange(cpu_action_probs.shape[0]), p=cpu_action_probs)

            grid.move(action)
            reached_exit = grid.is_at_exit()
            reward = 1.0 if reached_exit else -0.1  # +1 on arrival, small step penalty otherwise

            # Store the log-probabilities of every action for this step.
            episode.append((state.tolist(), action, reward, log_probs.tolist()))

            if reached_exit:  # goal reached: the episode is over
                break

            state = grid.get_state(device)

    return episode

def compute_discounted_rewards(rewards, gamma=0.99):
    """Return the discounted return for every step of an episode.

    Args:
        rewards: sequence of per-step rewards in chronological order.
        gamma: discount factor applied to each later reward.

    Returns:
        A list the same length as ``rewards`` where entry ``t`` equals
        ``rewards[t] + gamma * rewards[t+1] + gamma**2 * rewards[t+2] + ...``.
    """
    discounted_rewards = []
    R = 0
    # Accumulate from the last step backwards, then flip once at the end;
    # this avoids the quadratic cost of inserting at the front of a list.
    for reward in reversed(rewards):
        R = reward + gamma * R
        discounted_rewards.append(R)
    discounted_rewards.reverse()
    return discounted_rewards

# Initialise the environment and the policy network.
grid = Grid(size=5, start_pos=(0, 0), exit_pos=(4, 4))
policy_net = PolicyNet()


model_path = 'policy_net.pth'

if os.path.exists(model_path):
    # A checkpoint already exists: resume from it.
    policy_net.load_state_dict(torch.load(model_path))
    print("학습된 모델을 로드했습니다.")

# Generate one episode with the current policy.
episode = generate_episode(grid, policy_net)

# Pull out just the per-step rewards.
rewards = [step[2] for step in episode]

# Compute the discounted return for every step.
discounted_rewards = compute_discounted_rewards(rewards)

# Write the episode to CSV, one row per step, with the discounted return appended.
# The file is opened with mode='w', so any previous episode file is overwritten.
with open('episode_data_with_discounted_rewards.csv', mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['State', 'Action', 'Reward', 'Log Probs', 'Discounted Reward'])
    for i, step in enumerate(episode):
        state, action, reward, log_probs = step
        writer.writerow([state, action, reward, log_probs, discounted_rewards[i]])

# Confirm where the data was saved.
print("Episode data with discounted rewards saved to 'episode_data_with_discounted_rewards.csv'")


Episode data with discounted rewards saved to 'episode_data_with_discounted_rewards.csv'


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import csv

def load_episode_data(file_path):
    """Read an episode CSV back into five parallel lists.

    The file must have the header columns 'State', 'Action', 'Reward',
    'Log Probs' and 'Discounted Reward'. List-valued cells are stored as
    bracketed, comma-separated text and are flattened to lists of floats.

    Returns:
        ``(states, actions, rewards, log_probs, discounted_rewards)``.
    """
    def _to_floats(cell):
        # Drop every leading/trailing bracket, then split on ", ".
        return [float(token) for token in cell.strip('][').split(', ')]

    columns = ([], [], [], [], [])

    with open(file_path, mode='r') as handle:
        for record in csv.DictReader(handle):
            parsed = (
                _to_floats(record['State']),
                int(record['Action']),
                float(record['Reward']),
                _to_floats(record['Log Probs']),
                float(record['Discounted Reward']),
            )
            for column, value in zip(columns, parsed):
                column.append(value)

    states, actions, rewards, log_probs, discounted_rewards = columns
    return states, actions, rewards, log_probs, discounted_rewards

def train_policy_net(policy_net, optimizer, states, actions, log_probs, discounted_rewards, device='cpu'):
    """Apply one policy-gradient update from a single recorded episode.

    The loss is the mean over steps of ``-log pi(action | state) * return``.

    Args:
        policy_net: module mapping a batch of states to action probabilities.
        optimizer: optimizer over ``policy_net``'s parameters.
        states: list of per-step state feature lists.
        actions: list of chosen action indices, one per step.
        log_probs: recorded log-probabilities; accepted for interface
            compatibility but not used (they are recomputed with gradients).
        discounted_rewards: list of discounted returns, one per step.
        device: device the tensors are moved to.

    An empty episode is a no-op: no gradients are computed and the optimizer
    is not stepped.
    """
    # An empty episode has nothing to learn from and would otherwise fail
    # inside the first linear layer.
    if len(states) == 0:
        return

    policy_net.train()
    optimizer.zero_grad()

    states = torch.tensor(states, dtype=torch.float32).to(device)
    actions = torch.tensor(actions, dtype=torch.int64).to(device)
    discounted_rewards = torch.tensor(discounted_rewards, dtype=torch.float32).to(device)

    action_probs = policy_net(states)

    # Select the taken action's probability BEFORE taking the log. Logging the
    # whole row first yields a 0/0 = NaN gradient for any unselected action
    # whose probability has underflowed to exactly 0.
    selected_probs = action_probs.gather(1, actions.unsqueeze(1)).squeeze(1)
    # Floor at the smallest normal float so log() stays finite.
    floor = torch.finfo(selected_probs.dtype).tiny
    selected_log_probs = torch.log(torch.clamp(selected_probs, min=floor))

    loss = -selected_log_probs * discounted_rewards
    loss = loss.mean()

    loss.backward()
    optimizer.step()

# Initialise the environment and the policy network.
# NOTE: `os`, `Grid` and `PolicyNet` are not imported/defined in this cell;
# they must already exist in the kernel namespace from an earlier cell.
grid = Grid(size=5, start_pos=(0, 0), exit_pos=(4, 4))
policy_net = PolicyNet()

model_path = 'policy_net.pth'

if os.path.exists(model_path):
    # A checkpoint already exists: resume from it.
    policy_net.load_state_dict(torch.load(model_path))
    print("학습된 모델을 로드했습니다.")


optimizer = optim.Adam(policy_net.parameters(), lr=0.01)

# Load the recorded episode.
file_path = 'episode_data_with_discounted_rewards.csv'
states, actions, rewards, log_probs, discounted_rewards = load_episode_data(file_path)

# Train PolicyNet on that episode (a single gradient step).
train_policy_net(policy_net, optimizer, states, actions, log_probs, discounted_rewards)

# Save the updated weights.
torch.save(policy_net.state_dict(), 'policy_net.pth')

print("PolicyNet 학습 및 저장 완료")


PolicyNet 학습 및 저장 완료


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import imageio



def visualize_episode(grid, policy_net, device='cpu', max_episode_len=100):
    """Play one episode with ``policy_net`` and save it as an animation.

    For each step the board is drawn before the action is taken, so the
    position reached by the final move is not rendered. The grid is advanced
    in place from its current position.

    Side effects:
        - overwrites 'frame.png' on every step (scratch file for the frame),
        - writes 'game_progress.gif' and 'game_progress.mp4',
        - prints one "State / Action" line per step.

    Args:
        grid: environment exposing ``size``, ``exit_pos``, ``figure_pos``,
            ``get_state(device)``, ``move(action)`` and ``is_at_exit()``.
        policy_net: module returning action probabilities for a state.
        device: device handed to ``grid.get_state``.
        max_episode_len: upper bound on the number of steps.
    """
    frames = []  # Store each frame here to make a video
    episode_info = []  # (state, action) for every step taken

    policy_net.eval()
    with torch.no_grad():
        for step in range(max_episode_len):
            plt.figure(figsize=(5, 5))
            plt.xticks([])
            plt.yticks([])
            plt.imshow(np.zeros((grid.size, grid.size)), cmap='gray', vmin=0, vmax=1)
            plt.text(grid.exit_pos[0], grid.exit_pos[1], 'Exit', ha='center', va='center', color='green', fontsize=12)
            plt.text(grid.figure_pos[0], grid.figure_pos[1], 'Agent', ha='center', va='center', color='blue', fontsize=12)
            plt.grid(True)
            plt.title(f"Step: {step + 1}")
            plt.savefig('frame.png')
            plt.close()
            frames.append(imageio.imread('frame.png'))

            state = grid.get_state(device)
            action_probs = policy_net(state).squeeze()
            action = np.random.choice(np.arange(4), p=action_probs.detach().cpu().numpy())
            episode_info.append((state.cpu().numpy().tolist(), action))  # Store state and action

            grid.move(action)
            if grid.is_at_exit():
                break

    # Output episode information: print the action recorded for each step,
    # not the probabilities left over from the last loop iteration.
    for recorded_state, recorded_action in episode_info:
        print(f"State: {recorded_state}, Action: {recorded_action}")

    # Save the visualized episode
    imageio.mimsave('game_progress.gif', frames, fps=1)
    imageio.mimsave('game_progress.mp4', frames, fps=1)

# Load the trained policy network.
# The policy must have been trained and saved to 'policy_net.pth' before this cell runs.
policy_net = PolicyNet()
policy_net.load_state_dict(torch.load('policy_net.pth'))  # load the trained weights

# Initialise the environment and visualise one episode.
grid = Grid(size=5, start_pos=(0, 0), exit_pos=(4, 4))
visualize_episode(grid, policy_net)

  frames.append(imageio.imread('frame.png'))


State: [[0.0, 0.0]], Action: tensor([0.1160, 0.2961, 0.0172, 0.5708])
State: [[1.0, 0.0]], Action: tensor([0.1160, 0.2961, 0.0172, 0.5708])
State: [[2.0, 0.0]], Action: tensor([0.1160, 0.2961, 0.0172, 0.5708])
State: [[3.0, 0.0]], Action: tensor([0.1160, 0.2961, 0.0172, 0.5708])
State: [[4.0, 0.0]], Action: tensor([0.1160, 0.2961, 0.0172, 0.5708])
State: [[4.0, 0.0]], Action: tensor([0.1160, 0.2961, 0.0172, 0.5708])
State: [[4.0, 1.0]], Action: tensor([0.1160, 0.2961, 0.0172, 0.5708])
State: [[4.0, 1.0]], Action: tensor([0.1160, 0.2961, 0.0172, 0.5708])
State: [[4.0, 1.0]], Action: tensor([0.1160, 0.2961, 0.0172, 0.5708])
State: [[4.0, 2.0]], Action: tensor([0.1160, 0.2961, 0.0172, 0.5708])
State: [[4.0, 3.0]], Action: tensor([0.1160, 0.2961, 0.0172, 0.5708])




In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import csv
import os

class Grid:
    """Resettable square grid world with one agent and one exit cell.

    Positions are ``(x, y)`` tuples; "up" lowers ``y`` and "right" raises ``x``.
    """

    def __init__(self, size=5, start_pos=(0, 0), exit_pos=(4, 4)):
        self.size, self.exit_pos = size, exit_pos
        self.start_pos = self.figure_pos = start_pos

    def reset(self):
        """Return the agent to the starting cell."""
        self.figure_pos = self.start_pos

    def move(self, direction):
        """Step the agent: 0 up, 1 down, 2 left, 3 right; blocked or unknown moves do nothing."""
        col, row = self.figure_pos
        last = self.size - 1
        if direction == 0:  # up
            if row > 0:
                self.figure_pos = (col, row - 1)
        elif direction == 1:  # down
            if row < last:
                self.figure_pos = (col, row + 1)
        elif direction == 2:  # left
            if col > 0:
                self.figure_pos = (col - 1, row)
        elif direction == 3:  # right
            if col < last:
                self.figure_pos = (col + 1, row)

    def is_at_exit(self):
        """Tell whether the agent currently occupies the exit cell."""
        return self.exit_pos == self.figure_pos

    def get_state(self, device='cpu'):
        """Return the agent position as a float tensor with a leading batch axis, on ``device``."""
        position = torch.FloatTensor(self.figure_pos)
        batched = position.unsqueeze(0)
        return batched.to(device)

class PolicyNet(nn.Module):
    """Small MLP policy: 2 input features -> 16 hidden units -> 4 action probabilities."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(2, 16)
        self.fc2 = nn.Linear(16, 4)

    def forward(self, x):
        """Map a batch of states (one row per state) to per-row action probabilities."""
        # ReLU hidden layer, then softmax over dim 1 (input must carry a batch axis).
        return F.softmax(self.fc2(F.relu(self.fc1(x))), dim=1)

def generate_episode(grid, policy_net, device="cpu", max_episode_len=100):
    """Roll out one episode by sampling actions from ``policy_net``.

    The grid is advanced in place, starting from its current position, until
    the exit is reached or ``max_episode_len`` steps have been taken.

    Args:
        grid: environment exposing ``get_state(device)``, ``move(action)`` and
            ``is_at_exit()``.
        policy_net: callable returning action probabilities for a state.
        device: device handed to ``grid.get_state``.
        max_episode_len: upper bound on the number of steps.

    Returns:
        A list of ``(state, action, reward, log_probs)`` tuples, one per step.
        ``state`` and ``log_probs`` are plain Python lists; ``reward`` is 1.0
        on the step that reaches the exit and -0.1 otherwise. The list is
        empty when the grid already sits on the exit.
    """
    episode = []
    state = grid.get_state(device)

    # Only sampled values are kept (as lists), so no autograd graph is needed.
    with torch.no_grad():
        for _ in range(max_episode_len):
            if grid.is_at_exit():
                break

            action_probs = policy_net(state).squeeze()
            log_probs = torch.log(action_probs)
            cpu_action_probs = action_probs.detach().cpu().numpy()
            action = np.random.choice(np.arange(cpu_action_probs.shape[0]), p=cpu_action_probs)

            grid.move(action)
            reached_exit = grid.is_at_exit()
            reward = 1.0 if reached_exit else -0.1  # +1 on arrival, small step penalty otherwise

            # Store the log-probabilities of every action for this step.
            episode.append((state.tolist(), action, reward, log_probs.tolist()))

            if reached_exit:  # goal reached: the episode is over
                break

            state = grid.get_state(device)

    return episode

def compute_discounted_rewards(rewards, gamma=0.99):
    """Return the discounted return for every step of an episode.

    Args:
        rewards: sequence of per-step rewards in chronological order.
        gamma: discount factor applied to each later reward.

    Returns:
        A list the same length as ``rewards`` where entry ``t`` equals
        ``rewards[t] + gamma * rewards[t+1] + gamma**2 * rewards[t+2] + ...``.
    """
    discounted_rewards = []
    R = 0
    # Accumulate from the last step backwards, then flip once at the end;
    # this avoids the quadratic cost of inserting at the front of a list.
    for reward in reversed(rewards):
        R = reward + gamma * R
        discounted_rewards.append(R)
    discounted_rewards.reverse()
    return discounted_rewards

def load_episode_data(file_path):
    """Read an episode CSV back into five parallel lists.

    The file must have the header columns 'State', 'Action', 'Reward',
    'Log Probs' and 'Discounted Reward'. List-valued cells are stored as
    bracketed, comma-separated text and are flattened to lists of floats.

    Returns:
        ``(states, actions, rewards, log_probs, discounted_rewards)``.
    """
    def _to_floats(cell):
        # Drop every leading/trailing bracket, then split on ", ".
        return [float(token) for token in cell.strip('][').split(', ')]

    columns = ([], [], [], [], [])

    with open(file_path, mode='r') as handle:
        for record in csv.DictReader(handle):
            parsed = (
                _to_floats(record['State']),
                int(record['Action']),
                float(record['Reward']),
                _to_floats(record['Log Probs']),
                float(record['Discounted Reward']),
            )
            for column, value in zip(columns, parsed):
                column.append(value)

    states, actions, rewards, log_probs, discounted_rewards = columns
    return states, actions, rewards, log_probs, discounted_rewards

def train_policy_net(policy_net, optimizer, states, actions, log_probs, discounted_rewards, device='cpu'):
    """Apply one policy-gradient update from a single recorded episode.

    The loss is the mean over steps of ``-log pi(action | state) * return``.

    Args:
        policy_net: module mapping a batch of states to action probabilities.
        optimizer: optimizer over ``policy_net``'s parameters.
        states: list of per-step state feature lists.
        actions: list of chosen action indices, one per step.
        log_probs: recorded log-probabilities; accepted for interface
            compatibility but not used (they are recomputed with gradients).
        discounted_rewards: list of discounted returns, one per step.
        device: device the tensors are moved to.

    An empty episode is a no-op: no gradients are computed and the optimizer
    is not stepped.
    """
    # An empty episode has nothing to learn from and would otherwise fail
    # inside the first linear layer.
    if len(states) == 0:
        return

    policy_net.train()
    optimizer.zero_grad()

    states = torch.tensor(states, dtype=torch.float32).to(device)
    actions = torch.tensor(actions, dtype=torch.int64).to(device)
    discounted_rewards = torch.tensor(discounted_rewards, dtype=torch.float32).to(device)

    action_probs = policy_net(states)

    # Select the taken action's probability BEFORE taking the log. Logging the
    # whole row first yields a 0/0 = NaN gradient for any unselected action
    # whose probability has underflowed to exactly 0.
    selected_probs = action_probs.gather(1, actions.unsqueeze(1)).squeeze(1)
    # Floor at the smallest normal float so log() stays finite.
    floor = torch.finfo(selected_probs.dtype).tiny
    selected_log_probs = torch.log(torch.clamp(selected_probs, min=floor))

    loss = -selected_log_probs * discounted_rewards
    loss = loss.mean()

    loss.backward()
    optimizer.step()

def main(num_iterations=100):
    """Train the policy for ``num_iterations`` episodes on a 10x10 grid.

    Each iteration resets the grid, samples one episode with the current
    policy, writes it to 'episode_data_with_discounted_rewards.csv'
    (overwriting the previous episode), reloads it, performs one training
    step and saves the weights to 'policy_net.pth'. If that checkpoint
    already exists when the function starts, training resumes from it.

    Args:
        num_iterations: number of episode/update rounds to run.
    """
    # Initialise the environment, the network and the optimizer.
    grid = Grid(size=10, start_pos=(0, 0), exit_pos=(6, 5))
    policy_net = PolicyNet()
    optimizer = optim.Adam(policy_net.parameters(), lr=0.01)
    file_path = 'episode_data_with_discounted_rewards.csv'
    model_path = 'policy_net.pth'

    if os.path.exists(model_path):
        # A checkpoint already exists: resume from it.
        policy_net.load_state_dict(torch.load(model_path))
        print("학습된 모델을 로드했습니다.")

    for i in range(num_iterations):
        print(f"Iteration {i+1}/{num_iterations}")

        # Put the agent back on the start cell.
        grid.reset()

        # Sample one episode with the current policy.
        episode = generate_episode(grid, policy_net)

        # Pull out just the per-step rewards.
        rewards = [step[2] for step in episode]

        # Compute the discounted return for every step.
        discounted_rewards = compute_discounted_rewards(rewards)

        # Write the episode (with discounted returns) to CSV. Rows are paired
        # with zip so the outer iteration counter `i` is not overwritten.
        with open(file_path, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['State', 'Action', 'Reward', 'Log Probs', 'Discounted Reward'])
            for step, discounted in zip(episode, discounted_rewards):
                state, action, reward, log_probs = step
                writer.writerow([state, action, reward, log_probs, discounted])

        # Load the episode back from disk.
        states, actions, rewards, log_probs, discounted_rewards = load_episode_data(file_path)

        # One training step on this episode.
        train_policy_net(policy_net, optimizer, states, actions, log_probs, discounted_rewards)

        # Save the updated weights.
        torch.save(policy_net.state_dict(), model_path)

        print("PolicyNet 학습 및 저장 완료")

# Run training when this cell/script is executed directly.
if __name__ == "__main__":
    main(num_iterations=500)  # set the desired number of iterations


Iteration 1/500
PolicyNet 학습 및 저장 완료
Iteration 2/500
PolicyNet 학습 및 저장 완료
Iteration 3/500
PolicyNet 학습 및 저장 완료
Iteration 4/500
PolicyNet 학습 및 저장 완료
Iteration 5/500
PolicyNet 학습 및 저장 완료
Iteration 6/500
PolicyNet 학습 및 저장 완료
Iteration 7/500
PolicyNet 학습 및 저장 완료
Iteration 8/500
PolicyNet 학습 및 저장 완료
Iteration 9/500
PolicyNet 학습 및 저장 완료
Iteration 10/500
PolicyNet 학습 및 저장 완료
Iteration 11/500
PolicyNet 학습 및 저장 완료
Iteration 12/500
PolicyNet 학습 및 저장 완료
Iteration 13/500
PolicyNet 학습 및 저장 완료
Iteration 14/500
PolicyNet 학습 및 저장 완료
Iteration 15/500
PolicyNet 학습 및 저장 완료
Iteration 16/500
PolicyNet 학습 및 저장 완료
Iteration 17/500
PolicyNet 학습 및 저장 완료
Iteration 18/500
PolicyNet 학습 및 저장 완료
Iteration 19/500
PolicyNet 학습 및 저장 완료
Iteration 20/500
PolicyNet 학습 및 저장 완료
Iteration 21/500
PolicyNet 학습 및 저장 완료
Iteration 22/500
PolicyNet 학습 및 저장 완료
Iteration 23/500
PolicyNet 학습 및 저장 완료
Iteration 24/500
PolicyNet 학습 및 저장 완료
Iteration 25/500
PolicyNet 학습 및 저장 완료
Iteration 26/500
PolicyNet 학습 및 저장 완료
Iteration 27/500
Poli

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import imageio



def visualize_episode(grid, policy_net, device='cpu', max_episode_len=100):
    """Play one episode with ``policy_net`` and save it as an animation.

    For each step the board is drawn before the action is taken, so the
    position reached by the final move is not rendered. The grid is advanced
    in place from its current position.

    Side effects:
        - overwrites 'frame.png' on every step (scratch file for the frame),
        - writes 'game_progress.gif' and 'game_progress.mp4',
        - prints one "State / Action" line per step.

    Args:
        grid: environment exposing ``size``, ``exit_pos``, ``figure_pos``,
            ``get_state(device)``, ``move(action)`` and ``is_at_exit()``.
        policy_net: module returning action probabilities for a state.
        device: device handed to ``grid.get_state``.
        max_episode_len: upper bound on the number of steps.
    """
    frames = []  # Store each frame here to make a video
    episode_info = []  # (state, action) for every step taken

    policy_net.eval()
    with torch.no_grad():
        for step in range(max_episode_len):
            plt.figure(figsize=(5, 5))
            plt.xticks([])
            plt.yticks([])
            plt.imshow(np.zeros((grid.size, grid.size)), cmap='gray', vmin=0, vmax=1)
            plt.text(grid.exit_pos[0], grid.exit_pos[1], 'Exit', ha='center', va='center', color='green', fontsize=12)
            plt.text(grid.figure_pos[0], grid.figure_pos[1], 'Agent', ha='center', va='center', color='blue', fontsize=12)
            plt.grid(True)
            plt.title(f"Step: {step + 1}")
            plt.savefig('frame.png')
            plt.close()
            frames.append(imageio.imread('frame.png'))

            state = grid.get_state(device)
            action_probs = policy_net(state).squeeze()
            action = np.random.choice(np.arange(4), p=action_probs.detach().cpu().numpy())
            episode_info.append((state.cpu().numpy().tolist(), action))  # Store state and action

            grid.move(action)
            if grid.is_at_exit():
                break

    # Output episode information: print the action recorded for each step,
    # not the probabilities left over from the last loop iteration.
    for recorded_state, recorded_action in episode_info:
        print(f"State: {recorded_state}, Action: {recorded_action}")

    # Save the visualized episode
    imageio.mimsave('game_progress.gif', frames, fps=1)
    imageio.mimsave('game_progress.mp4', frames, fps=1)

# Load the trained policy network.
# The policy must have been trained and saved to 'policy_net.pth' before this cell runs.
policy_net = PolicyNet()
policy_net.load_state_dict(torch.load('policy_net.pth'))  # load the trained weights

# Initialise the environment (same 10x10 layout used for training) and visualise one episode.
grid = Grid(size=10, start_pos=(0, 0), exit_pos=(6, 5))
visualize_episode(grid, policy_net)

  frames.append(imageio.imread('frame.png'))


State: [[0.0, 0.0]], Action: tensor([5.9431e-05, 1.1891e-02, 2.5704e-04, 9.8779e-01])
State: [[0.0, 0.0]], Action: tensor([5.9431e-05, 1.1891e-02, 2.5704e-04, 9.8779e-01])
State: [[1.0, 0.0]], Action: tensor([5.9431e-05, 1.1891e-02, 2.5704e-04, 9.8779e-01])
State: [[1.0, 1.0]], Action: tensor([5.9431e-05, 1.1891e-02, 2.5704e-04, 9.8779e-01])
State: [[1.0, 0.0]], Action: tensor([5.9431e-05, 1.1891e-02, 2.5704e-04, 9.8779e-01])
State: [[2.0, 0.0]], Action: tensor([5.9431e-05, 1.1891e-02, 2.5704e-04, 9.8779e-01])
State: [[2.0, 1.0]], Action: tensor([5.9431e-05, 1.1891e-02, 2.5704e-04, 9.8779e-01])
State: [[2.0, 0.0]], Action: tensor([5.9431e-05, 1.1891e-02, 2.5704e-04, 9.8779e-01])
State: [[2.0, 0.0]], Action: tensor([5.9431e-05, 1.1891e-02, 2.5704e-04, 9.8779e-01])
State: [[2.0, 1.0]], Action: tensor([5.9431e-05, 1.1891e-02, 2.5704e-04, 9.8779e-01])
State: [[3.0, 1.0]], Action: tensor([5.9431e-05, 1.1891e-02, 2.5704e-04, 9.8779e-01])
State: [[4.0, 1.0]], Action: tensor([5.9431e-05, 1.189

