In [12]:
import torch

from maf.model import DQN

# Smoke test: push one random input through DQN and check the output shape.
input_dim = 84  # used as both height and width of the square input
batch_size = 1
num_channels = 1
output_dim = 9

# Build the (batch, channels, height, width) input directly. The previous
# rand([H, W]) + Tensor.resize(...) relied on the deprecated non-in-place
# resize, which exposes uninitialised memory as soon as
# batch_size * num_channels != 1.
x = torch.rand(batch_size, num_channels, input_dim, input_dim)
y = torch.randn(batch_size, output_dim)  # only its shape is used below
model = DQN(num_channels, output_dim)

# Call the module itself rather than .forward() so registered hooks run.
pred = model(x)

assert pred.shape == y.shape, (
    f"expected output shape {tuple(y.shape)}, got {tuple(pred.shape)}"
)



In [15]:
from collections import deque
import torch.optim as optim
import torch.nn as nn
import numpy as np
import random


class DeepQNetwork:
    """Epsilon-greedy Q-learning agent with a replay buffer and a target network.

    Two ``DQN`` instances are kept: ``q_network`` is trained by ``replay`` and
    ``target_network`` supplies the bootstrap values; it is only refreshed when
    ``update_target_network`` is called.

    Args:
        input_dim: Forwarded unchanged as the first argument of ``DQN``.
        output_dim: Forwarded as the second argument of ``DQN``; also the
            number of discrete actions sampled from during exploration.
        learning_rate: Learning rate of the Adam optimizer.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Initial probability of picking a random action.
        epsilon_decay: Factor ``epsilon`` is multiplied by after each
            optimisation step.
        min_epsilon: Lower bound for ``epsilon``.
        batch_size: Number of transitions sampled per optimisation step.
        memory_size: Maximum number of transitions kept in the replay buffer;
            the oldest ones are dropped first.
    """

    def __init__(
        self,
        input_dim,
        output_dim,
        learning_rate=0.001,
        gamma=0.99,
        epsilon=1.0,
        epsilon_decay=0.995,
        min_epsilon=0.01,
        batch_size=64,
        memory_size=10000,
    ):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon
        self.batch_size = batch_size
        self.memory = deque(maxlen=memory_size)

        # Initialize Q-network and target network with identical weights
        self.q_network = DQN(input_dim, output_dim)
        self.target_network = DQN(input_dim, output_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.loss_fn = nn.MSELoss()

    @staticmethod
    def _stack_states(samples):
        """Stack per-transition states into one float32 batch tensor.

        Each sample may be a torch tensor, a NumPy array or a nested sequence.
        ``torch.tensor(list_of_tensors)`` raises for multi-element tensors, so
        every sample is converted on its own and then stacked.
        """
        return torch.stack(
            [torch.as_tensor(sample, dtype=torch.float32) for sample in samples]
        )

    def select_action(self, state):
        """Pick an action index for ``state`` using an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random index in
        ``[0, output_dim)`` is returned; otherwise the index of the largest
        Q-value predicted by ``q_network``.

        Args:
            state: A single observation (tensor, array or nested sequence).
                It is passed to the network as given.
                NOTE(review): no batch dimension is added here; confirm that
                ``DQN`` accepts the shape callers pass.

        Returns:
            int: The selected action index.
        """
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.output_dim)

        with torch.no_grad():
            # as_tensor avoids the copy-construct warning torch.tensor() emits
            # when `state` is already a tensor.
            q_values = self.q_network(torch.as_tensor(state, dtype=torch.float32))
        return torch.argmax(q_values).item()

    def remember(self, state, action, reward, next_state, done=False):
        """Store one transition and immediately run an optimisation step.

        Args:
            state: Observation before the action.
            action: Index of the action taken.
            reward: Scalar reward received.
            next_state: Observation after the action.
            done: True when ``next_state`` is terminal; the bootstrapped value
                of ``next_state`` is then excluded from the target.

        Returns:
            float: The loss reported by ``replay``.
        """
        self.memory.append((state, action, reward, next_state, done))
        return self.replay()  # Return the loss value from replay()

    def replay(self):
        """Sample a mini-batch from memory and take one gradient step.

        Also decays ``epsilon`` (bounded below by ``min_epsilon``) after the
        step.

        Returns:
            float: The MSE loss of the step, or ``0.0`` when the buffer holds
            fewer than ``batch_size`` transitions (nothing is trained and
            ``epsilon`` is left untouched).
        """
        if len(self.memory) < self.batch_size:
            return 0.0  # Not enough transitions yet; report zero loss

        batch = random.sample(self.memory, self.batch_size)

        states, actions, rewards, next_states, dones = [], [], [], [], []
        for transition in batch:
            # Accept legacy 4-tuples (no done flag) alongside 5-tuples.
            state, action, reward, next_state, *extra = transition
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            next_states.append(next_state)
            dones.append(bool(extra[0]) if extra else False)

        states = self._stack_states(states)
        next_states = self._stack_states(next_states)
        # gather() needs int64 indices; explicit float32 keeps the target in
        # the same dtype as the network output even for NumPy float64 rewards.
        actions = torch.as_tensor(actions, dtype=torch.long).unsqueeze(1)
        rewards = torch.as_tensor(rewards, dtype=torch.float32).unsqueeze(1)
        dones = torch.as_tensor(dones, dtype=torch.float32).unsqueeze(1)

        q_values = self.q_network(states).gather(1, actions)

        # The target must not contribute gradients.
        with torch.no_grad():
            next_q_values = self.target_network(next_states).max(1)[0].unsqueeze(1)
        # Terminal transitions have no future value to bootstrap from.
        target_q_values = rewards + self.gamma * next_q_values * (1.0 - dones)

        loss = self.loss_fn(q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.epsilon = max(self.epsilon * self.epsilon_decay, self.min_epsilon)

        return loss.item()  # Return the loss value

    def update_target_network(self):
        """Copy the current ``q_network`` weights into ``target_network``."""
        self.target_network.load_state_dict(self.q_network.state_dict())


# Build the agent. DeepQNetwork forwards `input_dim` as the first argument of
# DQN and uses `output_dim` as the number of selectable actions.
input_dim, output_dim = 1, 9
dqn = DeepQNetwork(input_dim=input_dim, output_dim=output_dim)

# Dummy dataframe, replace with your actual data
# data = {
#     'stateEstimate.x': np.random.randn(10),
#     'stateEstimate.y': np.random.randn(10),
#     'stateEstimate.z': np.random.randn(10),
#     'stateEstimate.vx': np.random.randn(10),
#     'stateEstimate.vy': np.random.randn(10),
#     'stateEstimate.vz': np.random.randn(10)
# }
# df = pd.DataFrame(data)

# Training loop skeleton. The per-step logic further down is commented out, so
# each epoch currently only resets the loss accumulator and draws a random
# 1x84x84 state.
num_epochs = 100  # how many epochs to run
for epoch in range(num_epochs):
    total_loss, state = 0, torch.randn(1, 84, 84)


#     for _, row in df.iterrows():
#         state = row[["stateEstimate.x", "stateEstimate.y", "stateEstimate.z"]].values
#         action = row[
#             ["stateEstimate.vx", "stateEstimate.vy", "stateEstimate.vz"]
#         ].values
#         action_index = dqn.select_action(state)
#         # Placeholder for action execution and next state observation
#         next_state = np.random.randn(
#             input_dim
#         )  # Placeholder for next state observation
#         reward = np.random.randn()  # Placeholder for reward observation
#         loss = dqn.remember(
#             state, action_index, reward, next_state
#         )  # Get the loss value from replay()
#         total_loss += loss  # Accumulate the loss for this batch

#     avg_loss = total_loss / len(df)  # Calculate average loss for this epoch
#     print(f"Epoch [{epoch + 1}/{num_epochs}], Average Loss: {avg_loss:.4f}")
#     dqn.update_target_network()


# torch.save(dqn.q_network.state_dict(), "trained_model.pth")


# Load trained model
# loaded_model = DQN(input_dim, output_dim)
# loaded_model.load_state_dict(torch.load("trained_model.pth"))
# loaded_model.eval()  # Set the model to evaluation mode


# # Assumes the trained model has been loaded as 'loaded_model' by the block above

# # Assuming you have new input data stored in a NumPy array called 'input_data'
# input_data = np.array([[-0.22130723297595978, 0.09820868074893951, 0.359013170003891]])

# # Convert input data to torch tensor and feed it to the model
# with torch.no_grad():
#     input_tensor = torch.tensor(input_data).float()
#     output_tensor = loaded_model(input_tensor)

# # Convert output tensor back to NumPy array
# predicted_output = output_tensor.numpy()

# print("Predicted output:", predicted_output)