In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np


# LSTM-based actor-critic network for an Advantage Actor-Critic (A2C) agent
class A2CNetwork(nn.Module):
    """Recurrent actor-critic with separate LSTMs for the policy and the value head.

    Args:
        input_size: Feature width fed to both LSTMs. ``forward`` concatenates
            observation, previous reward (one scalar) and previous action, so
            this must equal ``observation_width + 1 + action_width``.
        hidden_size_policy: Hidden units of the policy (actor) LSTM.
        hidden_size_value: Hidden units of the value (critic) LSTM.
        num_actions: Number of outputs of the policy head.
        entropy_weight: Coefficient applied to the entropy term in
            ``compute_loss``.
    """

    def __init__(
        self,
        input_size,
        hidden_size_policy=48,
        hidden_size_value=48,
        num_actions=4,
        entropy_weight=0.01,
    ):
        super().__init__()

        # Independent recurrent cores so actor and critic do not share features.
        self.lstm_policy = nn.LSTM(input_size, hidden_size_policy, batch_first=True)
        self.lstm_value = nn.LSTM(input_size, hidden_size_value, batch_first=True)

        # Actor head: logits over actions.
        self.fc_policy = nn.Linear(hidden_size_policy, num_actions)

        # Critic head: scalar state value.
        self.fc_value = nn.Linear(hidden_size_value, 1)

        # Weight of the entropy regularization term.
        self.entropy_weight = entropy_weight

    def forward(
        self,
        observation,
        prev_reward,
        prev_action_onehot,
        hidden_state_policy,
        hidden_state_value,
    ):
        """Run both LSTMs and return the outputs for the last time step.

        ``observation`` and ``prev_action_onehot`` must have the same number of
        dimensions and ``prev_reward`` one fewer (it gets a trailing feature
        axis here). Inputs may be ``(batch, seq, feat)`` or, for a single
        step, ``(batch, feat)``.

        Returns:
            ``(action_probs, state_value, hidden_state_policy,
            hidden_state_value)`` where ``action_probs`` is
            ``(batch, num_actions)``, ``state_value`` is ``(batch, 1)`` and the
            hidden states are the updated ``(h, c)`` tuples of each LSTM.
        """
        x = torch.cat(
            [observation, prev_reward.unsqueeze(-1), prev_action_onehot], dim=-1
        )

        # nn.LSTM would read a 2-D tensor as one unbatched sequence, and the
        # [:, -1, :] indexing below needs a time axis, so treat 2-D input as a
        # batch of single-step sequences.
        if x.dim() == 2:
            x = x.unsqueeze(1)

        policy_out, hidden_state_policy = self.lstm_policy(x, hidden_state_policy)
        value_out, hidden_state_value = self.lstm_value(x, hidden_state_value)

        # Actor: action probabilities from the last time step.
        action_probs = F.softmax(self.fc_policy(policy_out[:, -1, :]), dim=-1)

        # Critic: value estimate from the last time step.
        state_value = self.fc_value(value_out[:, -1, :])

        return action_probs, state_value, hidden_state_policy, hidden_state_value

    def compute_loss(
        self,
        action_probs,
        state_value,
        reward,
        done,
        prev_state_value,
        entropy,
        action=None,
    ):
        """Combine actor, critic and entropy terms into one scalar loss.

        Args:
            action_probs: ``(batch, num_actions)`` probabilities from ``forward``.
            state_value: Critic output that is regressed onto ``reward``.
            reward: Return/reward target, one value per batch element.
            done: Accepted for interface compatibility; not used.
            prev_state_value: Baseline subtracted from ``reward`` to form the
                advantage.
            entropy: Per-sample policy entropy.
            action: Optional indices of the actions actually taken. When given,
                only their log-probabilities enter the actor term; when
                omitted, the log-probabilities of all actions are averaged.

        Returns:
            Scalar tensor ``actor + critic - entropy_weight * mean(entropy)``.
        """
        # Flatten to (batch,) so a (batch,) reward and a (batch, 1) value do not
        # silently broadcast into a (batch, batch) matrix.
        values = state_value.reshape(-1)
        baseline = prev_state_value.reshape(-1)
        returns = reward.to(values.dtype).reshape(-1)

        # The advantage only scales the policy gradient; detaching it keeps the
        # actor term from pushing gradients into the critic.
        advantage = (returns - baseline).detach()

        # Clamp so a probability that underflows to 0 cannot produce -inf.
        log_probs = torch.log(action_probs.clamp(min=1e-8))
        if action is None:
            actor_loss = -torch.mean(log_probs * advantage.unsqueeze(-1))
        else:
            taken = log_probs.gather(-1, action.reshape(-1, 1).long()).squeeze(-1)
            actor_loss = -torch.mean(taken * advantage)

        # Critic regression onto the reward target.
        critic_loss = F.mse_loss(values, returns)

        # Negative entropy: minimizing it encourages exploration.
        entropy_loss = -torch.mean(entropy)

        return actor_loss + critic_loss + self.entropy_weight * entropy_loss


# Hyperparameters: LSTM input width (observation + reward + action features),
# number of possible actions, and LSTM hidden units.
input_size, num_actions, hidden_size = 10, 2, 48

# Build the A2C network; the policy and value LSTMs use the same hidden width.
net = A2CNetwork(input_size, hidden_size, hidden_size, num_actions)

# Adam over every parameter of the network.
optimizer = optim.Adam(params=net.parameters(), lr=1e-4)


# Function to train the agent
def train_agent(num_episodes=1000):
    """Run a toy training loop for the module-level ``net`` / ``optimizer``.

    Every iteration feeds one synthetic single-step observation through the
    network, samples an action, uses a constant reward of 1.0 and performs one
    optimizer step. The recurrent state, previous reward and previous action
    are carried from one iteration to the next.

    Args:
        num_episodes: Number of single-step iterations to simulate.
    """
    batch = 1

    # Read the dimensions from the network itself rather than from notebook
    # globals, which later cells reassign (e.g. ``hidden_size = 20``).
    lstm_input = net.lstm_policy.input_size
    n_actions = net.fc_policy.out_features
    # The LSTM input is [observation, prev_reward, prev_action], so the
    # observation only gets the width that remains.
    obs_size = lstm_input - 1 - n_actions

    # (h, c) tuples shaped (num_layers, batch, hidden) as nn.LSTM expects.
    hidden_state_policy = (
        torch.zeros(1, batch, net.lstm_policy.hidden_size),
        torch.zeros(1, batch, net.lstm_policy.hidden_size),
    )
    hidden_state_value = (
        torch.zeros(1, batch, net.lstm_value.hidden_size),
        torch.zeros(1, batch, net.lstm_value.hidden_size),
    )

    # No previous step exists yet: zero reward and an all-zero action vector.
    prev_reward = torch.zeros(batch, 1)
    prev_action_onehot = torch.zeros(batch, 1, n_actions)

    for episode in range(num_episodes):
        # (batch, seq=1, features): the LSTMs are batch_first and forward()
        # indexes the last time step, so a sequence axis is required.
        observation = torch.randn(batch, 1, obs_size)  # Example observation

        # Forward pass; the (h, c) tuples are passed through unchanged.
        action_probs, state_value, hidden_state_policy, hidden_state_value = net(
            observation,
            prev_reward,
            prev_action_onehot,
            hidden_state_policy,
            hidden_state_value,
        )

        # Policy entropy (for regularization); clamp avoids log(0).
        entropy = -torch.sum(
            action_probs * torch.log(action_probs.clamp(min=1e-8)), dim=-1
        )

        # Sample the action that becomes next step's "previous action".
        with torch.no_grad():
            action = torch.multinomial(action_probs, num_samples=1)  # (batch, 1)

        # Get the reward (just for illustration purposes)
        reward = torch.ones(batch, 1)  # Example reward from environment
        done = torch.tensor([False])  # Example terminal state (for simplicity)

        # Compute the loss; the current value estimate doubles as the baseline.
        total_loss = net.compute_loss(
            action_probs, state_value, reward, done, state_value, entropy
        )

        # Backpropagation
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        # Cut the graph at the step boundary; otherwise the next backward()
        # would try to traverse this step's already-freed graph.
        hidden_state_policy = tuple(s.detach() for s in hidden_state_policy)
        hidden_state_value = tuple(s.detach() for s in hidden_state_value)

        # Carry reward and a genuine one-hot action into the next step.
        prev_reward = reward.reshape(batch, 1)
        prev_action_onehot = F.one_hot(action, num_classes=n_actions).float()

        print(f"Episode {episode+1}, Loss: {total_loss.item()}")


# Run training: executes the whole loop right away and prints the loss for
# every episode.
train_agent()

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# Read the CSV once; columns 0-7 become the inputs and column 8 the target,
# both converted to float32 tensors (target as a column vector for BCELoss).
dataset = np.loadtxt("pima-indians-diabetes.csv", delimiter=",")
X = torch.tensor(dataset[:, 0:8], dtype=torch.float32)
y = torch.tensor(dataset[:, 8], dtype=torch.float32).reshape(-1, 1)

# Small fully connected classifier: 8 -> 12 -> 8 -> 1 with a sigmoid output.
layers = [
    nn.Linear(8, 12),
    nn.ReLU(),
    nn.Linear(12, 8),
    nn.ReLU(),
    nn.Linear(8, 1),
    nn.Sigmoid(),
]
model = nn.Sequential(*layers)
print(model)

# Binary cross entropy on the sigmoid output, optimized with Adam.
loss_fn = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

n_epochs = 100
batch_size = 10

# Mini-batch training over consecutive, unshuffled slices of the data.
for epoch in range(n_epochs):
    for start in range(0, len(X), batch_size):
        rows = slice(start, start + batch_size)
        Xbatch, ybatch = X[rows], y[rows]
        y_pred = model(Xbatch)
        loss = loss_fn(y_pred, ybatch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Reports the loss of the last mini-batch of the epoch only.
    print(f"Finished epoch {epoch}, latest loss {loss}")

# Accuracy on the training data itself: predictions are rounded to 0/1 and
# compared with the targets.
with torch.no_grad():
    y_pred = model(X)
accuracy = torch.eq(y_pred.round(), y).float().mean()
print(f"Accuracy {accuracy}")

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


def lstm_step(x_t, h_t_prev, c_t_prev, W_i, W_f, W_c, W_o, b_i, b_f, b_c, b_o):
    """Compute one LSTM time step.

    Each gate applies its weight matrix to the concatenation
    ``[x_t, h_t_prev]``, so every ``W_*`` must have shape
    ``(hidden_size, input_size + hidden_size)``: the first ``input_size``
    columns act on the input and the remaining ones on the previous hidden
    state.

    Args:
        x_t: Input at this step, ``(input_size,)`` or ``(batch, input_size)``.
        h_t_prev: Previous hidden state, ``(hidden_size,)`` or
            ``(batch, hidden_size)``.
        c_t_prev: Previous cell state, same shape as ``h_t_prev``.
        W_i, W_f, W_c, W_o: Weights of the input gate, forget gate, candidate
            cell state and output gate.
        b_i, b_f, b_c, b_o: Matching biases of shape ``(hidden_size,)``.

    Returns:
        ``(h_t, c_t)``: the new hidden state and cell state.
    """
    # One shared [x, h] vector replaces the separate x- and h-products, which
    # could not both use a single (hidden, input) matrix.
    z_t = torch.cat([x_t, h_t_prev], dim=-1)

    def pre_activation(W, b):
        # z @ W^T works for a single vector as well as for a batch of rows.
        return torch.matmul(z_t, W.t()) + b

    i_t = torch.sigmoid(pre_activation(W_i, b_i))
    f_t = torch.sigmoid(pre_activation(W_f, b_f))
    c_t = f_t * c_t_prev + i_t * torch.tanh(pre_activation(W_c, b_c))
    o_t = torch.sigmoid(pre_activation(W_o, b_o))
    h_t = o_t * torch.tanh(c_t)

    return h_t, c_t


# Example initialization (random weights and biases)
input_size = 10
hidden_size = 20

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Each gate matrix spans both the input and the previous hidden state.
W_i = torch.randn(hidden_size, input_size + hidden_size, device=device)
W_f = torch.randn(hidden_size, input_size + hidden_size, device=device)
W_c = torch.randn(hidden_size, input_size + hidden_size, device=device)
W_o = torch.randn(hidden_size, input_size + hidden_size, device=device)
b_i = torch.randn(hidden_size, device=device)
b_f = torch.randn(hidden_size, device=device)
b_c = torch.randn(hidden_size, device=device)
b_o = torch.randn(hidden_size, device=device)

# Example input and previous state
x_t = torch.randn(input_size, device=device)
h_t_prev = torch.randn(hidden_size, device=device)
c_t_prev = torch.randn(hidden_size, device=device)

# Compute next LSTM state
h_t, c_t = lstm_step(x_t, h_t_prev, c_t_prev, W_i, W_f, W_c, W_o, b_i, b_f, b_c, b_o)
print("Next Hidden State:", h_t)
print("Next Cell State:", c_t)

In [3]:
import torch
import torch.nn as nn


class LSTMCell(nn.Module):
    """Single LSTM cell whose four gates come from one fused linear layer.

    Args:
        input_size: Feature width of the input ``x``.
        hidden_size: Width of the hidden and cell states.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # One projection of [x, h_prev] yields the pre-activations of all four
        # gates side by side.
        self.lstm_gate = nn.Linear(input_size + hidden_size, 4 * hidden_size)

        self.sigmoid = nn.Sigmoid()
        self.tanh = nn.Tanh()

    def forward(self, x, hidden):
        """Advance the cell by one step.

        Args:
            x: Input of shape ``(batch, input_size)``.
            hidden: Tuple ``(h_prev, c_prev)``, each ``(batch, hidden_size)``.

        Returns:
            ``(h_t, c_t)``: the new hidden state and cell state.
        """
        prev_hidden, prev_cell = hidden

        # Fused projection of the concatenated input and previous hidden state.
        fused = self.lstm_gate(torch.cat((x, prev_hidden), dim=1))

        # Column layout of the projection: input | forget | candidate | output.
        raw_in, raw_forget, raw_candidate, raw_out = fused.chunk(4, dim=1)

        input_gate = self.sigmoid(raw_in)
        forget_gate = self.sigmoid(raw_forget)
        candidate = self.tanh(raw_candidate)
        output_gate = self.sigmoid(raw_out)

        # Keep part of the old memory, add the gated candidate, then expose a
        # gated view of the new memory as the hidden state.
        next_cell = forget_gate * prev_cell + input_gate * candidate
        next_hidden = output_gate * self.tanh(next_cell)

        return next_hidden, next_cell


# Example usage: sizes for the demo cell and batch
input_size, hidden_size, batch_size = 10, 20, 5

lstm_cell = LSTMCell(input_size, hidden_size)

# Random input batch
x = torch.randn(batch_size, input_size)

# Previous hidden and cell state both start at zero
h_prev = torch.zeros(batch_size, hidden_size)
c_prev = torch.zeros_like(h_prev)

# One step of the cell
h_next, c_next = lstm_cell(x, (h_prev, c_prev))

# Both states are expected to be (batch_size, hidden_size)
print("Next Hidden State Shape:", h_next.shape)
print("Next Cell State Shape:", c_next.shape)

Next Hidden State Shape: torch.Size([5, 20])
Next Cell State Shape: torch.Size([5, 20])
