In [None]:
!pip install numpy==1.23.5

In [None]:
!pip install gym==0.26.2


In [1]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import time

# ActorCritic

In [2]:


class ActorCritic(nn.Module):
    """Actor-critic network with a shared MLP trunk and two linear heads.

    The trunk is two 64-unit ReLU layers. ``policy_head`` maps the trunk
    features to one logit per action and ``value_head`` maps them to a
    single scalar.
    """

    def __init__(self, input_dim, action_dim):
        """Build the trunk and both heads.

        Args:
            input_dim: Size of the last dimension of the state tensor.
            action_dim: Number of discrete actions (policy output width).
        """
        super().__init__()
        hidden = 64
        # Layers are created in the same order as before (trunk, policy head,
        # value head) so a seeded initialisation yields identical weights.
        trunk_layers = [
            nn.Linear(input_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
        ]
        self.shared = nn.Sequential(*trunk_layers)
        self.policy_head = nn.Linear(hidden, action_dim)
        self.value_head = nn.Linear(hidden, 1)

    def forward(self, state):
        """Return ``(action_probs, state_value)`` for ``state``.

        ``action_probs`` is a softmax over the last dimension of the policy
        logits; ``state_value`` is the value head output with its trailing
        size-1 dimension squeezed away.
        """
        features = self.shared(state)
        action_probs = torch.softmax(self.policy_head(features), dim=-1)
        state_value = self.value_head(features).squeeze(-1)
        return action_probs, state_value

    def act(self, state):
        """Sample one action from the current policy.

        Returns:
            ``(action, log_prob, value)`` where ``action`` is a Python int
            (obtained via ``.item()``, so ``state`` must describe exactly one
            sample), ``log_prob`` is the log-probability tensor of that
            action and ``value`` is the value estimate tensor.
        """
        action_probs, state_value = self.forward(state)
        policy = torch.distributions.Categorical(probs=action_probs)
        sampled = policy.sample()
        return sampled.item(), policy.log_prob(sampled), state_value

# PPOAgent

In [3]:


class PPOAgent:
    """PPO agent using the clipped surrogate objective on a shared actor-critic.

    Owns an ``ActorCritic`` model, an Adam optimizer over its parameters and a
    ``RolloutBuffer`` that accumulates transitions between calls to
    :meth:`update`.
    """

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99, lam=0.95, clip_eps=0.2, epochs=10, batch_size=64,
                 value_coef=0.5, entropy_coef=0.01):
        """Create the model, optimizer and empty rollout buffer.

        Args:
            state_dim: Observation size passed to ``ActorCritic``.
            action_dim: Number of discrete actions passed to ``ActorCritic``.
            lr: Adam learning rate.
            gamma: Discount factor forwarded to the buffer's GAE computation.
            lam: GAE lambda forwarded to the buffer's GAE computation.
            clip_eps: Half-width of the probability-ratio clipping interval.
            epochs: Number of passes over the buffer per :meth:`update`.
            batch_size: Minibatch size requested from the buffer.
            value_coef: Weight of the value (MSE) loss in the total loss.
            entropy_coef: Weight of the entropy bonus in the total loss.
        """
        self.model = ActorCritic(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.buffer = RolloutBuffer()
        self.gamma = gamma
        self.lam = lam
        self.clip_eps = clip_eps
        self.epochs = epochs
        self.batch_size = batch_size
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef

    def select_action(self, state):
        """Sample an action for a single observation.

        Args:
            state: One observation (array-like of floats).

        Returns:
            ``(action, log_prob, value)`` as a Python int and two Python
            floats.
        """
        state = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
        # Only detached scalars leave this method, so skip building an
        # autograd graph on every environment step.
        with torch.no_grad():
            action, log_prob, value = self.model.act(state)
        return action, log_prob.item(), value.item()

    def store_transition(self, *args):
        """Forward one transition's fields unchanged to ``self.buffer.add``."""
        self.buffer.add(*args)

    def update(self):
        """Run ``epochs`` passes of minibatch PPO updates, then clear the buffer."""
        self.buffer.compute_returns_and_advantages(self.gamma, self.lam)

        for _ in range(self.epochs):
            for states, actions, old_log_probs, returns, advantages in self.buffer.get_batches(self.batch_size):
                probs, values = self.model(states)
                dist = torch.distributions.Categorical(probs)
                entropy = dist.entropy().mean()
                new_log_probs = dist.log_prob(actions)

                # Importance ratio between the current policy and the policy
                # that collected the data.
                ratio = torch.exp(new_log_probs - old_log_probs)
                surr1 = ratio * advantages
                surr2 = torch.clamp(ratio, 1 - self.clip_eps, 1 + self.clip_eps) * advantages
                # Take the pessimistic (element-wise minimum) surrogate and
                # negate it because the optimizer minimises.
                policy_loss = -torch.min(surr1, surr2).mean()

                value_loss = F.mse_loss(values, returns)

                # The entropy term is subtracted so higher entropy lowers the loss.
                loss = policy_loss + self.value_coef * value_loss - self.entropy_coef * entropy

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

        self.buffer.clear()





# RolloutBuffer

In [4]:


class RolloutBuffer:
    """In-memory store for one rollout plus its GAE returns and advantages.

    Transitions are appended with :meth:`add`; :meth:`compute_returns_and_advantages`
    fills ``returns`` / ``advantages``; :meth:`get_batches` yields shuffled
    minibatches as tensors; :meth:`clear` empties everything.
    """

    def __init__(self):
        self.states = []
        self.actions = []
        self.log_probs = []
        self.rewards = []
        self.dones = []
        self.values = []
        # Created here (not only in compute_returns_and_advantages) so that
        # clear() also drops targets from the previous rollout.
        self.returns = []
        self.advantages = []

    def add(self, state, action, log_prob, reward, done, value):
        """Append one transition; each field goes to its own parallel list."""
        self.states.append(state)
        self.actions.append(action)
        self.log_probs.append(log_prob)
        self.rewards.append(reward)
        self.dones.append(done)
        self.values.append(value)

    def compute_returns_and_advantages(self, gamma=0.99, lam=0.95, last_value=0.0):
        """Compute GAE advantages and the matching returns for the stored steps.

        Args:
            gamma: Discount factor.
            lam: GAE lambda.
            last_value: Value estimate used as the successor value of the
                final stored step. It only has an effect when that step's
                ``done`` flag is false; the default ``0.0`` reproduces the
                previous behaviour.

        Results are written to ``self.advantages`` and ``self.returns``
        (``return = advantage + value``), in the same order as the stored steps.
        """
        returns = []
        advantages = []
        gae = 0.0
        next_value = last_value

        # Walk backwards, appending, then reverse once at the end; this avoids
        # the quadratic cost of inserting at the front of a list on every step.
        steps = zip(reversed(self.rewards), reversed(self.dones), reversed(self.values))
        for reward, done, value in steps:
            not_done = 1.0 - float(done)
            delta = reward + gamma * next_value * not_done - value
            gae = delta + gamma * lam * not_done * gae
            next_value = value
            advantages.append(gae)
            returns.append(gae + value)

        returns.reverse()
        advantages.reverse()
        self.returns = returns
        self.advantages = advantages

    def get_batches(self, batch_size):
        """Yield shuffled minibatches of the stored rollout.

        Each item is ``(states, actions, log_probs, returns, advantages)``
        with dtypes float32, int64, float32, float32, float32. The last batch
        may be smaller than ``batch_size``. Nothing is yielded for an empty
        buffer.

        Raises:
            RuntimeError: If ``returns`` / ``advantages`` do not cover every
                stored transition, i.e. ``compute_returns_and_advantages``
                was not called after the last ``add``.
        """
        total = len(self.states)
        if total == 0:
            return
        if len(self.returns) != total or len(self.advantages) != total:
            raise RuntimeError(
                "returns/advantages are out of date; call "
                "compute_returns_and_advantages() before get_batches()"
            )

        # Convert each field once instead of once per minibatch.
        states = torch.as_tensor(np.asarray(self.states), dtype=torch.float32)
        actions = torch.as_tensor(np.asarray(self.actions), dtype=torch.int64)
        log_probs = torch.as_tensor(np.asarray(self.log_probs), dtype=torch.float32)
        returns = torch.as_tensor(np.asarray(self.returns), dtype=torch.float32)
        advantages = torch.as_tensor(np.asarray(self.advantages), dtype=torch.float32)

        indices = np.arange(total, dtype=np.int64)
        np.random.shuffle(indices)

        for start in range(0, total, batch_size):
            batch_indices = torch.from_numpy(indices[start:start + batch_size])
            yield (
                states[batch_indices],
                actions[batch_indices],
                log_probs[batch_indices],
                returns[batch_indices],
                advantages[batch_indices],
            )

    def clear(self):
        """Drop all stored transitions and computed targets."""
        self.__init__()



# Train

In [5]:
import numpy as np
# --- Training ---
# Seed every RNG this run touches (numpy shuffling, torch init/sampling, env
# resets) so that "Restart Kernel -> Run All" reproduces the same curve.
SEED = 0
np.random.seed(SEED)
torch.manual_seed(SEED)

env = gym.make("CartPole-v1", render_mode="rgb_array")
obs, _ = env.reset(seed=SEED)
state_dim = obs.shape[0]
action_dim = env.action_space.n

agent = PPOAgent(state_dim, action_dim)


max_episodes = 1000
max_steps = 500
reward_history = []

for episode in range(max_episodes):
    # The 5-value unpacking of env.step() below already requires the
    # (observation, info) reset API, so unpack it directly.
    state, _ = env.reset()
    total_reward = 0

    for step in range(max_steps):
        action, log_prob, value = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

        # If the rollout stops here without a true terminal state (time-limit
        # truncation or the max_steps cap), the remaining return is not zero.
        # The buffer bootstraps with 0 after the last step, so fold the
        # discounted value of next_state into the stored reward instead.
        # total_reward above keeps the raw environment reward.
        learn_reward = reward
        rollout_ends = done or step == max_steps - 1
        if rollout_ends and not terminated:
            with torch.no_grad():
                _, next_value = agent.model(
                    torch.as_tensor(next_state, dtype=torch.float32).unsqueeze(0)
                )
            learn_reward = reward + agent.gamma * next_value.item()

        agent.store_transition(state, action, log_prob, learn_reward, done, value)
        state = next_state
        if done:
            break

    agent.update()
    reward_history.append(total_reward)

    # Moving average over the most recent 100 episodes (slicing a shorter
    # list simply returns all of it).
    moving_avg = np.mean(reward_history[-100:])

    print(f"Episode {episode+1}, Reward: {total_reward:.1f}, Moving Avg (100): {moving_avg:.2f}")

    # Stop once the moving average reaches the success threshold.
    if moving_avg >= 475.0:
        print(f"[+] Environment solved in {episode+1} episodes!")
        break

env.close()


Episode 1, Reward: 23.0, Moving Avg (100): 23.00
Episode 2, Reward: 24.0, Moving Avg (100): 23.50
Episode 3, Reward: 22.0, Moving Avg (100): 23.00
Episode 4, Reward: 18.0, Moving Avg (100): 21.75
Episode 5, Reward: 12.0, Moving Avg (100): 19.80
Episode 6, Reward: 21.0, Moving Avg (100): 20.00
Episode 7, Reward: 9.0, Moving Avg (100): 18.43
Episode 8, Reward: 21.0, Moving Avg (100): 18.75
Episode 9, Reward: 21.0, Moving Avg (100): 19.00
Episode 10, Reward: 33.0, Moving Avg (100): 20.40
Episode 11, Reward: 44.0, Moving Avg (100): 22.55
Episode 12, Reward: 38.0, Moving Avg (100): 23.83
Episode 13, Reward: 19.0, Moving Avg (100): 23.46
Episode 14, Reward: 28.0, Moving Avg (100): 23.79
Episode 15, Reward: 27.0, Moving Avg (100): 24.00
Episode 16, Reward: 26.0, Moving Avg (100): 24.12
Episode 17, Reward: 42.0, Moving Avg (100): 25.18
Episode 18, Reward: 20.0, Moving Avg (100): 24.89
Episode 19, Reward: 59.0, Moving Avg (100): 26.68
Episode 20, Reward: 46.0, Moving Avg (100): 27.65
Episode 21

# Render

In [6]:
import imageio
from IPython.display import HTML
from base64 import b64encode

# Roll out one episode with the trained agent and collect rendered frames.
env = gym.make("CartPole-v1", render_mode="rgb_array")
frames = []

try:
    obs, _ = env.reset()
    done = False

    while not done:
        action, _, _ = agent.select_action(obs)
        obs, _, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        frame = env.render()
        frames.append(frame)
finally:
    # Release the environment even if the rollout raises.
    env.close()

# Save the video. A relative path resolves to /content/ in Colab's default
# working directory and also works outside Colab, where /content may not exist.
video_path = "cartpole_render.mp4"
imageio.mimsave(video_path, frames, fps=30)

# Display the video inline by embedding it as a base64 data URL.
with open(video_path, 'rb') as video_file:
    mp4 = video_file.read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""<video width=400 controls><source src="{data_url}" type="video/mp4"></video>""")


