In [37]:
# pip install gym

In [38]:
import numpy as np
import gym
from gym import spaces

import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import MultivariateNormal
import matplotlib.pyplot as plt
from collections import deque


In [39]:

class QuadrotorEnv(gym.Env):
    metadata = {'render.modes': ['console']}

    def __init__(self):
        super(QuadrotorEnv, self).__init__()

        # Constants
        self.g = 9.81  # gravity
        self.m = 1.0   # mass of the UAV
        self.mu = 0.05 # damping factor
        self.dt = 0.02 # time step

        # Define action and observation space
        # Actions are thrust T, angle phi, angle theta
        self.action_space = spaces.Box(low=np.array([0, -np.pi, -np.pi]), 
                                       high=np.array([20, np.pi, np.pi]), dtype=np.float32)

        # Observation space: x, y, z, vx, vy, vz
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32)

        # State initialization
        self.state = None
        self.reset()

    def step(self, action):
        T, phi, theta = action
        x, y, z, vx, vy, vz = self.state

        # Calculate accelerations
        ax = (-0.7071 * np.cos(phi) * np.sin(theta) - 0.7071 * np.sin(phi)) * T / self.m
        ay = (-0.7071 * np.cos(phi) * np.sin(theta) - 0.7071 * np.sin(phi)) * T / self.m
        az = self.g - (np.cos(phi) * np.cos(theta)) * T / self.m

        # Update velocities
        vx += (ax - self.mu * vx) * self.dt
        vy += (ay - self.mu * vy) * self.dt
        vz += (az - self.mu * vz) * self.dt

        # Update positions
        x += vx * self.dt
        y += vy * self.dt
        z += vz * self.dt

        # Update state
        self.state = np.array([x, y, z, vx, vy, vz])

        # Calculate reward (placeholder)
        reward = -np.sqrt((x - 5 * np.cos(1.2 * self.current_step * self.dt))**2 + (y - 5 * np.sin(1.2 * self.current_step * self.dt))**2 + (z + 20)**2)

        # Check if UAV is within the reasonable bounds (this is a simple check)
        done = z < -25 or self.current_step > 1000

        self.current_step += 1

        # Optionally we could add more info
        info = {}

        return self.state, reward, done, info

    def reset(self):
        # Reset the state
        self.state = np.array([0.0, 0.0, 0.0, 1.0, -1.0, 0.0], dtype=np.float32)
        self.current_step = 0
        return self.state

    def render(self, mode='console'):
        if mode == 'console':
            print(f'State: {self.state}')


In [40]:

# SAC Agent
class SAC_Agent:
    def __init__(self, state_dim, action_dim, hidden_dim=256, lr=0.0003):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim
        self.lr = lr

        # Actor network
        self.actor = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )
        
        # Q networks
        self.q1 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        
        self.q2 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        
        # Target Q networks
        self.target_q1 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        
        self.target_q2 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

        # Initialize target Q networks with Q networks
        self.target_q1.load_state_dict(self.q1.state_dict())
        self.target_q2.load_state_dict(self.q2.state_dict())

        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.q_optimizer = optim.Adam(list(self.q1.parameters()) + list(self.q2.parameters()), lr=lr)

        # Initialize policy network
        # self.policy = PolicyNetwork(state_dim, action_dim, hidden_dim)

    def choose_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0)
        action_mean = self.actor(state)
        dist = torch.distributions.Normal(action_mean, torch.ones_like(action_mean) * 0.1)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action.detach().numpy()[0], log_prob.detach()

    # def choose_action(self, state):
    #     state = torch.FloatTensor(state).unsqueeze(0)
    #     action_mean, action_log_std = self.policy(state)
    #     std = torch.exp(action_log_std)
    #     normal = torch.distributions.Normal(action_mean, std)
    #     sampled_action = normal.sample()
    #     return sampled_action.detach().numpy()[0]  # Return thrust, roll, and pitch values separately


    def train_agent(self, state_batch, action_batch, reward_batch, next_state_batch, done_batch, gamma=0.99, alpha=0.2, tau=0.005):
        state_batch = torch.FloatTensor(state_batch)
        action_batch = torch.FloatTensor(action_batch)
        reward_batch = torch.FloatTensor(reward_batch)
        next_state_batch = torch.FloatTensor(next_state_batch)
        done_batch = torch.FloatTensor(done_batch)

        # Update Q networks
        next_actions, next_log_probs = self.choose_action(next_state_batch)
        target_q1_next = self.target_q1(torch.cat([next_state_batch, next_actions], dim=-1))
        target_q2_next = self.target_q2(torch.cat([next_state_batch, next_actions], dim=-1))
        target_min_q_next = torch.min(target_q1_next, target_q2_next) - alpha * next_log_probs
        target_q = reward_batch + gamma * (1 - done_batch) * target_min_q_next.detach()

        q1_loss = nn.MSELoss()(self.q1(torch.cat([state_batch, action_batch], dim=-1)), target_q)
        q2_loss = nn.MSELoss()(self.q2(torch.cat([state_batch, action_batch], dim=-1)), target_q)

        self.q_optimizer.zero_grad()
        q1_loss.backward()
        q2_loss.backward()
        self.q_optimizer.step()

        # Update actor network and alpha
        actions, log_probs = self.choose_action(state_batch)
        q1 = self.q1(torch.cat([state_batch, actions.unsqueeze(-1)], dim=-1))
        q2 = self.q2(torch.cat([state_batch, actions.unsqueeze(-1)], dim=-1))
        min_q = torch.min(q1, q2)

        actor_loss = (alpha * log_probs - min_q).mean()
        
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Update target Q networks
        for target_param, param in zip(self.target_q1.parameters(), self.q1.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
        
        for target_param, param in zip(self.target_q2.parameters(), self.q2.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)


In [41]:

# # Main training loop
# env = QuadrotorEnv() 
# state_dim = env.observation_space.shape[0]
# action_dim = env.action_space.shape[0]

# agent = SAC_Agent(state_dim, action_dim)

# EPISODES = 1000
# for episode in range(EPISODES):
#     state = env.reset()
#     done = False
#     total_reward = 0
#     while not done:
#         action, _ = agent.choose_action(state)
#         next_state, reward, done, _ = env.step(action)
#         agent.train_agent(state, action, reward, next_state, done)
#         state = next_state
#         total_reward += reward
#     print(f"Episode: {episode + 1}, Total Reward: {total_reward}")


In [42]:
# # Initialize the environment
# env = QuadrotorEnv() 

# # Initialize SAC agent
# agent = SAC_Agent(state_dim, action_dim)

# # Initialize replay buffer
# replay_buffer = deque(maxlen=10000)

# num_iterations = 10
# environment_steps = 10
# gradient_steps = 10


# # Main training loop
# for iteration in range(num_iterations):
#     # Environment steps
#     for step in range(environment_steps):
#         state = env.reset()
#         done = False
#         while not done:
#             thrust, phi, theta = agent.choose_action(state)
#             action = np.array([thrust, phi, theta])
#             next_state, reward, done, _ = env.step(action)

#             # action = agent.choose_action(state)
#             # next_state, reward, done, _ = env.step(action)
#             replay_buffer.append((state, action, reward, next_state))
#             state = next_state

#     # Gradient steps
#     for gradient_step in range(gradient_steps):
#         # Sample batch from replay buffer
#         batch_size = min(batch_size, len(replay_buffer))
#         batch = random.sample(replay_buffer, batch_size)
#         states, actions, rewards, next_states = zip(*batch)

#         # Update Q-function parameters
#         agent.update_q(states, actions, rewards, next_states)

#         # Update policy weights
#         agent.update_policy(states)

#         # Adjust temperature
#         agent.update_alpha(states)

#         # Update target network weights
#         agent.update_target_networks()

# # Output: θ1, θ2, φ


In [43]:
# # Main training loop
# env = QuadrotorEnv() 
# state_dim = env.observation_space.shape[0]
# action_dim = env.action_space.shape[0]

# agent = SAC_Agent(state_dim, action_dim)

# replay_buffer = deque(maxlen=100000)  # Replay buffer

# NUM_ITERATIONS = 10
# ENVIRONMENT_STEPS = 10
# GRADIENT_STEPS = 10

# for iteration in range(NUM_ITERATIONS):
#     # Environment step
#     for step in range(ENVIRONMENT_STEPS):
#         state = env.reset()
#         done = False
#         while not done:
#             action = agent.choose_action(state)
#             next_state, reward, done, _ = env.step(action)
#             replay_buffer.append((state, action, reward, next_state))
#             state = next_state
    
#     # Gradient step
#     for _ in range(GRADIENT_STEPS):
#         # Sample a mini-batch from replay buffer
#         batch_indices = np.random.choice(len(replay_buffer), BATCH_SIZE, replace=False)
#         batch = [replay_buffer[i] for i in batch_indices]
#         state_batch, action_batch, reward_batch, next_state_batch = zip(*batch)
        
#         # Update Q-function parameters
#         agent.update_q(state_batch, action_batch, reward_batch, next_state_batch)
        
#         # Update policy weights
#         agent.update_policy(state_batch)
        
#         # Adjust temperature
#         agent.adjust_temperature()
        
#         # Update target network weights
#         agent.update_target_networks()

# # Output: θ1, θ2, φ


In [44]:
# # Initialize the environment
# env = QuadrotorEnv() 

# # Initialize replay buffer D
# replay_buffer = deque(maxlen=10000)

# # Initial parameters
# theta1 = agent.q1.state_dict()
# theta2 = agent.q2.state_dict()
# phi = agent.actor.state_dict()

# # Initialize target network weights
# target_theta1 = theta1.copy()
# target_theta2 = theta2.copy()

# # Hyperparameters
# lambda_q = 0.001
# lambda_pi = 0.001
# lambda_alpha = 0.001
# tau = 0.005


# NUM_ITERATIONS = 10
# NUM_ENVIRONMENT_STEPS = 10
# NUM_GRADIENT_STEPS = 10

# # Main training loop
# for iteration in range(NUM_ITERATIONS):
#     # Environment step
#     for step in range(NUM_ENVIRONMENT_STEPS):
#         state = env.reset()
#         done = False
#         while not done:
#             # Sample action from the policy
#             action = agent.choose_action(state)
#             next_state, reward, done, _ = env.step(action)
#             # Store the transition in the replay pool
#             replay_buffer.append((state, action, reward, next_state))
#             state = next_state

#     # Gradient step
#     for gradient_step in range(NUM_GRADIENT_STEPS):
#         # Update Q-function parameters
#         q1_loss = update_q_function(replay_buffer, agent.q1, agent.target_q1, agent.q_optimizer, theta1, theta2, phi, lambda_q, tau)
#         q2_loss = update_q_function(replay_buffer, agent.q2, agent.target_q2, agent.q_optimizer, theta2, theta1, phi, lambda_q, tau)

#         # Update policy weights
#         policy_loss = update_policy(replay_buffer, agent.policy, agent.policy_optimizer, theta1, theta2, phi, lambda_pi)

#         # Adjust temperature
#         alpha_loss = update_temperature(replay_buffer, agent.log_alpha, agent.alpha_optimizer, lambda_alpha)

#         # Update target network weights
#         update_target_network(target_theta1, theta1, tau)
#         update_target_network(target_theta2, theta2, tau)

#     # Output parameters
#     theta1 = target_theta1.copy()
#     theta2 = target_theta2.copy()
