**Part 3**

PPO Loss

* Terrrible loss, very one sided predictions
* Needs Actor to provide rewards

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import random

In [4]:
# Define the neural network
class LogicNet(nn.Module):
    def __init__(self):
        super(LogicNet, self).__init__()
        self.fc1 = nn.Linear(2, 4)  # Input layer -> Hidden Layer
        self.fc2 = nn.Linear(4, 1)  # Hidden Layer -> Output Layer

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        logits = self.fc2(x)
        return logits

    def get_action_and_or_log_prob(self, state, action=None):
        """Helper method to get action and its log_prob from logits"""
        logits = self.forward(state)    # Get the logits from a forward pass of the Policy Network
        # For a binary output (0 or 1), Bernoulli distribution is appropriate
        probs = torch.distributions.Bernoulli(logits=logits)

        if action is None:
            sampled_action = probs.sample() # Sample action based on current probabilities (returns 0 or 1)
            log_prob = probs.log_prob(sampled_action)   # Calculate the log of the probability the sampled action is chosen
            return sampled_action, log_prob
        else:
            log_prob = probs.log_prob(action)       # returns the log of the probability the action is chosen
            return log_prob


In [5]:
# Define the environment
class LogicGateEnv:
    def __init__(self, gate="AND"):
        self.gate = gate
        self.data = torch.tensor([[0, 0], [0, 1], [1, 0], [1, 1]], dtype=torch.float32)
        self.targets = self.get_targets(gate)

    def get_targets(self, gate):
        if gate == "AND":
            return torch.tensor([[0], [0], [0], [1]], dtype=torch.float32)
        elif gate == "OR":
            return torch.tensor([[0], [1], [1], [1]], dtype=torch.float32)
        elif gate == "XOR":
            return torch.tensor([[0], [1], [1], [0]], dtype=torch.float32)
        elif gate == "XNOR":
            return torch.tensor([[1], [0], [0], [1]], dtype=torch.float32)

    def step(self, input_idx, prediction):
        target = self.targets[input_idx]
        # Take the mean squared error
        # print(f"prediction: {prediction} || target: {target}")
        error = (prediction - target).pow(2).mean().item()
        reward = 1.0 - error
        return reward

In [6]:
# Training loop
def train_logic_gate(gate="XOR", epochs=10, learning_rate=0.001, batch_size=64, k_epochs=4, epsilon=0.2):
    print(f"Training {gate} gate with {epochs} epochs, {learning_rate} learning rate, and {batch_size} batch size.")
    env = LogicGateEnv(gate)

    Policy_New = LogicNet()

    optimizer = optim.Adam(Policy_New.parameters(), lr=learning_rate)

    num_correct = 0.0

    for epoch in range(epochs):

        #START OF ADVANTAGE CALCULATION
        #global stack
        rewards_batch = []
        inputs_batch = []
        targets_batch = []

        # --- 1. Collect a Batch of Experiences ---
        # Loop agent prediction, push reward value:
        for i in range(batch_size):
            # Get model inputs and target
            idx = random.randint(0, 3)
            inputs = env.data[idx]
            target = env.targets[idx]

            # Get model prediction
            # Get logits from current policy to make a prediction for reward calculation
            with torch.no_grad(): # No need to track gradients during data collection
                prediction_logits = Policy_New(inputs)
                pred = torch.round(torch.sigmoid(prediction_logits)).float() # (might need .item())

            # Calculate reward
            reward = env.step(idx, pred)

            # Append to lists
            inputs_batch.append(inputs)
            rewards_batch.append(reward)
            targets_batch.append(target)

        # Convert collected batch lists into PyTorch tensors
        inputs_batch_tensor = torch.stack(inputs_batch)
        targets_batch_tensor = torch.stack(targets_batch)
        rewards_batch_tensor = torch.tensor(rewards_batch, dtype=torch.float32)

        num_correct += (rewards_batch_tensor).sum().item()  ### need to change

        # Unsqueeze to ensure rewards_batch_t has the same shape as targets_batch_t for element-wise ops SHAPE:(1, batch_size)
        rewards_batch_t = rewards_batch_tensor.unsqueeze(1)

        # --- START OF ADVANTAGE CALCULATION ---
        # Calculate the mean of the rewards in the current batch
        mean_reward = rewards_batch_tensor.mean()

        # Calculate the standard deviation of the rewards in the current batch
        # Add a small epsilon (1e-8) to prevent division by zero in case all rewards are identical
        std_reward = rewards_batch_tensor.std() + 1e-8

        # Calculate the advantage for each time step in the batch using your specified formula
        advantages_of_batch = (rewards_batch_t - mean_reward) / (std_reward)

        # --- END OF ADVANTAGE CALCULATION ---

        # --- 2. Store "Old Policy" Parameters ---
        # Transfer the weights to the Old Policy Model
        Policy_Old = LogicNet()
        Policy_Old.load_state_dict(Policy_New.state_dict())
        Policy_Old.eval()       # Tells Pytorch not to calculate gradients for this network

        # Get log_probabilities for the collected 'targets' from the OLD policy
        # Detach these to prevent gradients from flowing back into old_net
        with torch.no_grad():
            old_logits = Policy_Old(inputs_batch_tensor)
            # Use the get_action_and_or_log_prob helper
            log_prob_old = Policy_Old.get_action_and_or_log_prob(inputs_batch_tensor, targets_batch_tensor).detach()
            # The .detach() is critical here to ensure old_net remains fixed.

        # --- 3. Inner Optimization Loop (K_epochs) --- GRPO iteration
        for _ in range(k_epochs):
            new_policy_logits = Policy_New(inputs_batch_tensor)
            log_prob_new = Policy_New.get_action_and_or_log_prob(inputs_batch_tensor, targets_batch_tensor)

            # print(f"log_prob_new: {log_prob_new}")
            # print(f"log_prob_old: {log_prob_old}")

            # Calculate the ratio of each Trajectory in the Group
            # r_t(0) = π_0(a_t|s_t) / π_0_old(a_t|s_t) = exp(log(π_0(a_t|s_t) - log(π_0_old(a_t|s_t)))
            ratio = torch.exp(log_prob_new - log_prob_old)

            surrogate_1 = ratio * advantages_of_batch
            surrogate_2 = torch.clamp(input=ratio, min = 1.0 - epsilon, max = 1.0 + epsilon) * advantages_of_batch

            loss = -torch.min(surrogate_1, surrogate_2).mean()
            # In GRPO, the objective function is typically designed to be maximized (e.g., maximizing the expected return). Since PyTorch optimizers are designed for minimization, the common practice is to minimize the negative of the objective function.

            # Update the New Policy Model
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        #epoch count
        if epoch % 1000 == 0:
            avg_reward = rewards_batch_tensor.mean().item()
            print(f"Epoch {epoch}: Loss = {loss.item()}, Avg Reward = {avg_reward:.4f}, Mean Advantage: {advantages_of_batch.mean().item()}")
            # Validate
            with torch.no_grad():
                for i in range(4):
                    logits = Policy_New(env.data[i])
                    pred = torch.round(torch.sigmoid(logits)).item()
                    print(f"Input: {env.data[i].tolist()}, Logits: {logits}, Prediction: {pred}, Actual: {env.targets[i].item()}")
    print("Training completed.\n")
    print(f"Number of correct predictions: {num_correct}/{epochs * batch_size}")
    print(f"Accuracy: {num_correct/(epochs * batch_size)}",)

    print("\nTesting model:")
    for i in range(4):
        logits = Policy_New(env.data[i])
        pred = torch.round(torch.sigmoid(logits)).item()
        print(f"Input: {env.data[i].tolist()}, Prediction: {pred}, Actual: {env.targets[i].item()}")


In [7]:

# Run training
train_logic_gate("AND")

Training AND gate with 10 epochs, 0.001 learning rate, and 64 batch size.
Epoch 0: Loss = -0.001682821661233902, Avg Reward = 0.6719, Mean Advantage: -3.725290298461914e-09
Input: [0.0, 0.0], Logits: tensor([-0.1018]), Prediction: 0.0, Actual: 0.0
Input: [0.0, 1.0], Logits: tensor([-0.0248]), Prediction: 0.0, Actual: 0.0
Input: [1.0, 0.0], Logits: tensor([-0.0962]), Prediction: 0.0, Actual: 0.0
Input: [1.0, 1.0], Logits: tensor([-0.0962]), Prediction: 0.0, Actual: 1.0
Training completed.

Number of correct predictions: 485.0/640
Accuracy: 0.7578125

Testing model:
Input: [0.0, 0.0], Prediction: 0.0, Actual: 0.0
Input: [0.0, 1.0], Prediction: 0.0, Actual: 0.0
Input: [1.0, 0.0], Prediction: 0.0, Actual: 0.0
Input: [1.0, 1.0], Prediction: 0.0, Actual: 1.0
