In [1]:
import gym
import numpy as np

# 환경 생성
env = gym.make("BipedalWalker-v3", render_mode="human")

# 환경 초기화
state = env.reset()[0]
done = False
total_reward = 0

while not done:
    # 랜덤 행동 선택 (초기 테스트)
    action = env.action_space.sample()
    state, reward, done, truncated, info = env.step(action)
    total_reward += reward

print(f"Total reward: {total_reward:.2f}")
env.close()


Total reward: -124.51


In [2]:
import gym
import collections
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

import matplotlib.pyplot as plt
import cv2
import numpy as np
import os

In [3]:
class PPO(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PPO, self).__init__()
        self.data = []

        # 네트워크 구조
        self.fc1 = nn.Linear(state_dim, 256)
        self.fc_mu = nn.Linear(256, action_dim)
        self.fc_std = nn.Linear(256, action_dim)
        self.fc_v = nn.Linear(256, 1)

        # 옵티마이저
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def pi(self, x):
        x = F.relu(self.fc1(x))
        mu = self.fc_mu(x)
        std = torch.exp(self.fc_std(x))  # 표준편차는 항상 양수
        return mu, std

    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v

    def put_data(self, transition):
        self.data.append(transition)

    def make_batch(self):
        s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, done_lst = [], [], [], [], [], []
        for transition in self.data:
            s, a, r, s_prime, prob_a, done = transition
            s_lst.append(s)
            a_lst.append(a)
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            prob_a_lst.append(prob_a)
            done_mask = 0 if done else 1
            done_lst.append([done_mask])

        s = torch.tensor(s_lst, dtype=torch.float)
        a = torch.tensor(a_lst, dtype=torch.float)
        r = torch.tensor(r_lst, dtype=torch.float)
        s_prime = torch.tensor(s_prime_lst, dtype=torch.float)
        prob_a = torch.tensor(prob_a_lst, dtype=torch.float)
        done_mask = torch.tensor(done_lst, dtype=torch.float)
        self.data = []
        return s, a, r, s_prime, done_mask, prob_a

    def train_net(self):
        s, a, r, s_prime, done_mask, prob_a = self.make_batch()

        for i in range(K_epoch):
            td_target = r + gamma * self.v(s_prime) * done_mask
            delta = td_target - self.v(s)
            delta = delta.detach().numpy()

            advantage_lst = []
            advantage = 0.0
            for delta_t in delta[::-1]:
                advantage = gamma * lmbda * advantage + delta_t[0]
                advantage_lst.append([advantage])
            advantage_lst.reverse()
            advantage = torch.tensor(advantage_lst, dtype=torch.float)

            mu, std = self.pi(s)
            dist = torch.distributions.Normal(mu, std)
            log_prob = dist.log_prob(a).sum(dim=1, keepdim=True)
            ratio = torch.exp(log_prob - prob_a)

            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - eps_clip, 1 + eps_clip) * advantage
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.v(s), td_target.detach())

            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()


In [4]:
#Hyperparameters
learning_rate = 0.0003
gamma         = 0.99
lmbda         = 0.95
eps_clip      = 0.2
K_epoch       = 3
T_horizon     = 2048

In [6]:
# 환경 생성 (렌더 모드: human)
env = gym.make("BipedalWalker-v3", render_mode="human")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]

# 모델 초기화 및 가중치 로드
model = PPO(state_dim, action_dim)
model.load_state_dict(torch.load("./bipedal_pth/ppo_bipedal473.pth"))
model.eval()  # 학습 중이 아니므로 eval()로 전환

# 환경 초기화 및 실행
state = env.reset()[0]
done = False
truncated = False
total_reward = 0

while not done or truncated:
    with torch.no_grad():
        # 상태에서 행동 선택
        mu, std = model.pi(torch.from_numpy(state).float())
        dist = torch.distributions.Normal(mu, std)
        action = dist.sample().numpy()

    # 환경에서 행동 수행
    next_state, reward, done, truncated, _ = env.step(action)
    state = next_state
    total_reward += reward
    

print(f"Total Reward: {total_reward}")
env.close()


  model.load_state_dict(torch.load("./bipedal_pth/ppo_bipedal473.pth"))


Total Reward: 302.83394552813695


In [8]:
env = gym.make('BipedalWalker-v3', render_mode='rgb_array')

model = PPO(state_dim, action_dim)
model.load_state_dict(torch.load("./bipedal_pth/ppo_bipedal473.pth"))
model.eval()  # 학습 중이 아니므로 eval()로 전환

state = env.reset()[0]
done = False
truncated = False
total_reward = 0

output_folder = "./videos"  # 저장할 폴더 경로
os.makedirs(output_folder, exist_ok=True)  # 폴더가 없으면 생성
video_filename = os.path.join(output_folder, "ppo_bipedal.avi")  # 파일 경로 설정

frame_width, frame_height = env.render().shape[1], env.render().shape[0]
fourcc = cv2.VideoWriter_fourcc(*'XVID')  # 코덱 설정 (XVID, MP4V 등)
fps = 30  # 초당 프레임 수
video_writer = cv2.VideoWriter(video_filename, fourcc, fps, (frame_width, frame_height))

# 에피소드 실행

while not done or truncated:
    with torch.no_grad():
        # 상태에서 행동 선택
        mu, std = model.pi(torch.from_numpy(state).float())
        dist = torch.distributions.Normal(mu, std)
        action = dist.sample().numpy()

    # 환경에서 행동 수행
    next_state, reward, done, truncated, _ = env.step(action)
    state = next_state
    total_reward += reward
    frame = env.render()
    video_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)  # OpenCV는 BGR 포맷 사용
    video_writer.write(video_frame)


# 리소스 정리
env.close()
video_writer.release()
print("Total reward achieved: {:.1f}".format(total_reward))
print(f"Video saved as {video_filename}")

  model.load_state_dict(torch.load("./bipedal_pth/ppo_bipedal473.pth"))


Total reward achieved: 302.5
Video saved as ./videos\ppo_bipedal.avi


## Actor Critic

In [9]:
class ActorCritic(nn.Module):
    def __init__(self, state_dim=24, action_dim=4, hidden_dim=256, learning_rate=0.0005):
        super(ActorCritic, self).__init__()
        self.data = []

        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc_pi = nn.Linear(hidden_dim, action_dim)
        self.fc_sigma = nn.Linear(hidden_dim, action_dim)
        self.fc_v = nn.Linear(hidden_dim, 1)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def pi(self, x):
        x = F.relu(self.fc1(x))
        mu = torch.tanh(self.fc_pi(x)) 
        sigma = F.softplus(self.fc_sigma(x)) + 0.1
        return mu, sigma

    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v

    def put_data(self, transition):
        self.data.append(transition)

    def make_batch(self):
        s_lst, a_lst, r_lst, s_prime_lst, done_lst = [], [], [], [], []
        for transition in self.data:
            s, a, r, s_prime, done = transition
            s_lst.append(s)
            a_lst.append(a)
            r_lst.append([np.clip(r / 100.0, -1.0, 1.0)])
            s_prime_lst.append(s_prime)
            done_mask = 0.0 if done else 1.0
            done_lst.append([done_mask])

        s_batch = torch.tensor(s_lst, dtype=torch.float)
        a_batch = torch.tensor(a_lst, dtype=torch.float)
        r_batch = torch.tensor(r_lst, dtype=torch.float)
        s_prime_batch = torch.tensor(s_prime_lst, dtype=torch.float)
        done_batch = torch.tensor(done_lst, dtype=torch.float)
        self.data = []
        return s_batch, a_batch, r_batch, s_prime_batch, done_batch

    def train_net(self, gamma=0.99):
        s, a, r, s_prime, done = self.make_batch()
        td_target = r + gamma * self.v(s_prime) * done
        delta = td_target - self.v(s)

        mu, sigma = self.pi(s)
        dist = torch.distributions.Normal(mu, sigma)
        log_prob = dist.log_prob(a)
        entropy = dist.entropy().mean()

        loss_pi = -(log_prob * delta.detach()).mean()
        loss_v = F.smooth_l1_loss(self.v(s), td_target.detach())
        loss = loss_pi + loss_v - 0.02 * entropy

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

In [14]:
from torch.distributions import Normal
#Hyperparameters
learning_rate = 0.0001
gamma         = 0.98
n_rollout     = 64

In [19]:
env = gym.make('BipedalWalker-v3', render_mode='rgb_array')

model = ActorCritic()
model.load_state_dict(torch.load("./bipedal_pth/actor_bipedal_1498.pth"))  # 저장된 모델 불러오기

s, _ = env.reset()
done = False
total_reward = 0

output_folder = "./videos"  # 저장할 폴더 경로
os.makedirs(output_folder, exist_ok=True)  # 폴더가 없으면 생성
video_filename = os.path.join(output_folder, "actor_bipedal1.avi")  # 파일 경로 설정

frame_width, frame_height = env.render().shape[1], env.render().shape[0]
fourcc = cv2.VideoWriter_fourcc(*'XVID')  # 코덱 설정 (XVID, MP4V 등)
fps = 30  # 초당 프레임 수
video_writer = cv2.VideoWriter(video_filename, fourcc, fps, (frame_width, frame_height))

# 에피소드 실행
while not done:
    s_tensor = torch.tensor(s, dtype=torch.float)
    mu, sigma = model.pi(s_tensor)
    dist = Normal(mu, sigma)
    a = dist.sample().numpy()
    a = torch.clamp(torch.tensor(a), -1.0, 1.0).numpy()
    s, r, done, truncated, info = env.step(a)
    total_reward += r

    # 현재 프레임을 저장
    frame = env.render()
    video_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)  # OpenCV는 BGR 포맷 사용
    video_writer.write(video_frame)

    if done or truncated:
        break

# 리소스 정리
env.close()
video_writer.release()
print("Total reward achieved: {:.1f}".format(total_reward))
print(f"Video saved as {video_filename}")

  model.load_state_dict(torch.load("./bipedal_pth/actor_bipedal_1498.pth"))  # 저장된 모델 불러오기


Total reward achieved: -177.0
Video saved as ./videos\actor_bipedal1.avi
