In [6]:
!pip install gym



In [7]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
import gym

class PolicyNetwork(tf.keras.Model):
    def __init__(self, state_size, action_size):
        super(PolicyNetwork, self).__init__()
        self.fc1 = Dense(24, activation='relu')
        self.fc2 = Dense(24, activation='relu')
        self.fc3 = Dense(action_size, activation='softmax')

    def call(self, state):
        x = self.fc1(state)
        x = self.fc2(x)
        return self.fc3(x)

# def select_action(model, state):
#     state = np.expand_dims(state, axis=0)
#     action_probs = model.predict(state).flatten()
#     action = np.random.choice(len(action_probs), p=action_probs)
#     return action
###########
def select_action(model, state):
    print("State shape:", state.shape)  # Print the shape of the state
    state = np.expand_dims(state, axis=0)
    print("Expanded state shape:", state.shape)  # Print the shape of the expanded state
    action_probs = model.predict(state).flatten()
    action = np.random.choice(len(action_probs), p=action_probs)
    return action

def compute_discounted_rewards(rewards, gamma=0.99):
    discounted_rewards = np.zeros_like(rewards, dtype=np.float32)
    cumulative_reward = 0
    for i in reversed(range(len(rewards))):
        cumulative_reward = cumulative_reward * gamma + rewards[i]
        discounted_rewards[i] = cumulative_reward
    return discounted_rewards

def train_policy_network(policy_network, states, actions, rewards, optimizer):
    with tf.GradientTape() as tape:
        action_mask = tf.one_hot(actions, depth=2)
        policy_predictions = policy_network(states)
        log_prob = tf.reduce_sum(action_mask * tf.math.log(policy_predictions), axis=1)
        loss = -tf.reduce_mean(log_prob * rewards)
    gradients = tape.gradient(loss, policy_network.trainable_variables)
    optimizer.apply_gradients(zip(gradients, policy_network.trainable_variables))
    return loss

def main():
    # Create the CartPole environment
    env = gym.make('CartPole-v1')

    # Define the policy network
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    policy_network = PolicyNetwork(state_size, action_size)

    # Set hyperparameters
    learning_rate = 0.01
    optimizer = Adam(learning_rate)

    # Training loop
    num_episodes = 1000
    for episode in range(1, num_episodes + 1):
        episode_states, episode_actions, episode_rewards = [], [], []
        state = env.reset()
        done = False
        while not done:
            action = select_action(policy_network, state)
            next_state, reward, done, _ = env.step(action)
            episode_states.append(state)
            episode_actions.append(action)
            episode_rewards.append(reward)
            state = next_state

        # Compute discounted rewards
        discounted_rewards = compute_discounted_rewards(episode_rewards)

        # Normalize rewards
        discounted_rewards -= np.mean(discounted_rewards)
        discounted_rewards /= np.std(discounted_rewards)

        # Train the policy network
        loss = train_policy_network(policy_network,
                                    np.array(episode_states),
                                    np.array(episode_actions),
                                    discounted_rewards,
                                    optimizer)

        # Print episode information
        print(f'Episode: {episode}, Loss: {loss}')

    # Close the environment
    env.close()

if __name__ == "__main__":
    main()


AttributeError: 'tuple' object has no attribute 'shape'

In [8]:
########################
import numpy as np

GRID_SIZE = (4, 4)
def reward_function(state):
    if state == (3, 2):
        return 5  # Pear
    elif state == (4, 4):
        return 10  # Apple
    else:
        return -1  # No fruit

# Define the transition probability matrix
def transition_probability(state, action):
    x, y = state
    if action == 'up':
        return (x, max(y - 1, 1))
    elif action == 'down':
        return (x, min(y + 1, GRID_SIZE[1]))
    elif action == 'left':
        return (max(x - 1, 1), y)
    elif action == 'right':
        return (min(x + 1, GRID_SIZE[0]), y)

# Define the policies
policy_1 = ['down', 'right', 'right']
policy_2 = ['right', 'right', 'right', 'down', 'down', 'down']

def compute_utility(policy):
    state = (1, 1)  
    total_reward = 0
    for action in policy:
        next_state = transition_probability(state, action)
        reward = reward_function(next_state)
        total_reward += reward
        state = next_state
        if reward > 0: 
            break
    return total_reward

utility_policy_1 = compute_utility(policy_1)
utility_policy_2 = compute_utility(policy_2)

print("Utility of policy 1:", utility_policy_1)
print("Utility of policy 2:", utility_policy_2)

# Choose the policy with maximum utility
if utility_policy_1 < utility_policy_2:
    chosen_policy = policy_2
    max_utility = utility_policy_2
else:
    chosen_policy = policy_1
    max_utility = utility_policy_1

print("Chosen policy:", chosen_policy)
print("Maximum utility:", max_utility)


Utility of policy 1: 3
Utility of policy 2: 5
Chosen policy: ['right', 'right', 'right', 'down', 'down', 'down']
Maximum utility: 5


In [9]:
import argparse
import gym
import numpy as np
from itertools import count
from collections import deque
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical



In [10]:

parser = argparse.ArgumentParser(description='PyTorch REINFORCE example')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
                    help='discount factor (default: 0.99)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--render', action='store_true',
                    help='render the environment')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                    help='interval between training status logs (default: 10)')





_StoreAction(option_strings=['--log-interval'], dest='log_interval', nargs=None, const=None, default=10, type=<class 'int'>, choices=None, required=False, help='interval between training status logs (default: 10)', metavar='N')

In [11]:
args = parser.parse_args()
env = gym.make('CartPole-v1')
env.reset(seed=args.seed)
torch.manual_seed(args.seed)

class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)

        self.saved_log_probs = []
        self.rewards = []

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=1)


policy = Policy()
optimizer = optim.Adam(policy.parameters(), lr=1e-2)
eps = np.finfo(np.float32).eps.item()


def select_action(state):
    state = torch.from_numpy(state).float().unsqueeze(0)
    probs = policy(state)
    m = Categorical(probs)
    action = m.sample()
    policy.saved_log_probs.append(m.log_prob(action))
    return action.item()


def finish_episode():
    R = 0
    policy_loss = []
    returns = deque()
    for r in policy.rewards[::-1]:
        R = r + args.gamma * R
        returns.appendleft(R)
    returns = torch.tensor(returns)
    returns = (returns - returns.mean()) / (returns.std() + eps)
    for log_prob, R in zip(policy.saved_log_probs, returns):
        policy_loss.append(-log_prob * R)
    optimizer.zero_grad()
    policy_loss = torch.cat(policy_loss).sum()
    policy_loss.backward()
    optimizer.step()
    del policy.rewards[:]
    del policy.saved_log_probs[:]


def main():
    running_reward = 10
    for i_episode in count(1):
        state, _ = env.reset()
        ep_reward = 0
        for t in range(1, 10000):  # Don't infinite loop while learning
            action = select_action(state)
            state, reward, done, _, _ = env.step(action)
            if args.render:
                env.render()
            policy.rewards.append(reward)
            ep_reward += reward
            if done:
                break

        running_reward = 0.05 * ep_reward + (1 - 0.05) * running_reward
        finish_episode()
        if i_episode % args.log_interval == 0:
            print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                  i_episode, ep_reward, running_reward))
        if running_reward > env.spec.reward_threshold:
            print("Solved! Running reward is now {} and "
                  "the last episode runs to {} time steps!".format(running_reward, t))
            break


if __name__ == '__main__':
    main()

usage: ipykernel_launcher.py [-h] [--gamma G] [--seed N] [--render] [--log-interval N]
ipykernel_launcher.py: error: unrecognized arguments: -f C:\Users\shubh\AppData\Roaming\jupyter\runtime\kernel-4816bf84-619a-4703-8c57-5df5e79375f4.json


SystemExit: 2

In [5]:
import argparse
import gym
import numpy as np
from itertools import count
from collections import namedtuple

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Cart Pole

parser = argparse.ArgumentParser(description='PyTorch actor-critic example')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
                    help='discount factor (default: 0.99)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--render', action='store_true',
                    help='render the environment')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                    help='interval between training status logs (default: 10)')
args = parser.parse_args()


env = gym.make('CartPole-v1')
env.reset(seed=args.seed)
torch.manual_seed(args.seed)


SavedAction = namedtuple('SavedAction', ['log_prob', 'value'])


class Policy(nn.Module):
    """
    implements both actor and critic in one model
    """
    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)

        # actor's layer
        self.action_head = nn.Linear(128, 2)

        # critic's layer
        self.value_head = nn.Linear(128, 1)

        # action & reward buffer
        self.saved_actions = []
        self.rewards = []

    def forward(self, x):
        """
        forward of both actor and critic
        """
        x = F.relu(self.affine1(x))

        # actor: choses action to take from state s_t
        # by returning probability of each action
        action_prob = F.softmax(self.action_head(x), dim=-1)

        # critic: evaluates being in the state s_t
        state_values = self.value_head(x)

        # return values for both actor and critic as a tuple of 2 values:
        # 1. a list with the probability of each action over the action space
        # 2. the value from state s_t
        return action_prob, state_values


model = Policy()
optimizer = optim.Adam(model.parameters(), lr=3e-2)
eps = np.finfo(np.float32).eps.item()


def select_action(state):
    state = torch.from_numpy(state).float()
    probs, state_value = model(state)

    # create a categorical distribution over the list of probabilities of actions
    m = Categorical(probs)

    # and sample an action using the distribution
    action = m.sample()

    # save to action buffer
    model.saved_actions.append(SavedAction(m.log_prob(action), state_value))

    # the action to take (left or right)
    return action.item()


def finish_episode():
    """
    Training code. Calculates actor and critic loss and performs backprop.
    """
    R = 0
    saved_actions = model.saved_actions
    policy_losses = [] # list to save actor (policy) loss
    value_losses = [] # list to save critic (value) loss
    returns = [] # list to save the true values

    # calculate the true value using rewards returned from the environment
    for r in model.rewards[::-1]:
        # calculate the discounted value
        R = r + args.gamma * R
        returns.insert(0, R)

    returns = torch.tensor(returns)
    returns = (returns - returns.mean()) / (returns.std() + eps)

    for (log_prob, value), R in zip(saved_actions, returns):
        advantage = R - value.item()

        # calculate actor (policy) loss
        policy_losses.append(-log_prob * advantage)

        # calculate critic (value) loss using L1 smooth loss
        value_losses.append(F.smooth_l1_loss(value, torch.tensor([R])))

    # reset gradients
    optimizer.zero_grad()

    # sum up all the values of policy_losses and value_losses
    loss = torch.stack(policy_losses).sum() + torch.stack(value_losses).sum()

    # perform backprop
    loss.backward()
    optimizer.step()

    # reset rewards and action buffer
    del model.rewards[:]
    del model.saved_actions[:]


def main():
    running_reward = 10

    # run infinitely many episodes
    for i_episode in count(1):

        # reset environment and episode reward
        state, _ = env.reset()
        ep_reward = 0

        # for each episode, only run 9999 steps so that we don't
        # infinite loop while learning
        for t in range(1, 10000):

            # select action from policy
            action = select_action(state)

            # take the action
            state, reward, done, _, _ = env.step(action)

            if args.render:
                env.render()

            model.rewards.append(reward)
            ep_reward += reward
            if done:
                break

        # update cumulative reward
        running_reward = 0.05 * ep_reward + (1 - 0.05) * running_reward

        # perform backprop
        finish_episode()

        # log results
        if i_episode % args.log_interval == 0:
            print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                  i_episode, ep_reward, running_reward))

        # check if we have "solved" the cart pole problem
        if running_reward > env.spec.reward_threshold:
            print("Solved! Running reward is now {} and "
                  "the last episode runs to {} time steps!".format(running_reward, t))
            break


if __name__ == '__main__':
    main()

usage: ipykernel_launcher.py [-h] [--gamma G] [--seed N] [--render] [--log-interval N]
ipykernel_launcher.py: error: unrecognized arguments: -f C:\Users\shubh\AppData\Roaming\jupyter\runtime\kernel-4816bf84-619a-4703-8c57-5df5e79375f4.json


SystemExit: 2