In [1]:
import gym
from gym import spaces
import numpy as np

class CustomCartPoleEnv(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self):
        super(CustomCartPoleEnv, self).__init__()
        # Define action and observation space
        # Actions: continuous force applied to the cart
        self.action_space = spaces.Box(low=-10, high=10, shape=(1,), dtype=np.float32)
        # States: position, velocity, angle, angular velocity
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(4,), dtype=np.float32)
        # Constants
        self.cart_mass = 0.5
        self.pend_mass = 0.2
        self.arm_length = 3
        self.g = 9.8
        self.d = 1  # Damping
        self.b = 1  # Friction
        # Time step
        self.dt = 0.02
        # Initial state
        self.state = None
    
    def step(self, action):
            u = action[0]
            x, x_dot, theta, theta_dot = self.state
    
            # Dynamics derived from the equations provided
            Sx = np.sin(theta)
            Cx = np.cos(theta)
            D = self.pend_mass * (self.arm_length ** 2) * (self.pend_mass + self.cart_mass * (1 - Cx ** 2))
    
            x_dot_dot = (1/D) * (-(self.pend_mass**2)*(self.arm_length**2)*self.g*Cx*Sx + self.pend_mass*(self.arm_length**2)*(self.pend_mass*self.arm_length*(theta_dot**2)*Sx-self.d*x_dot)) + self.pend_mass*(self.arm_length**2)*(1/D)*u
            theta_dot_dot = (1/D) * ((self.pend_mass+self.cart_mass)*self.pend_mass*self.g*self.arm_length*Sx - self.pend_mass*self.arm_length*Cx*(self.pend_mass*self.arm_length*(theta_dot**2)*Sx-self.d*x_dot)) - self.pend_mass*self.arm_length*Cx*(1/D)*u
    
            # Update state using Euler integration
            x += x_dot * self.dt
            x_dot += x_dot_dot * self.dt
            theta += theta_dot * self.dt
            theta_dot += theta_dot_dot * self.dt
    
            self.state = np.array([x, x_dot, theta, theta_dot])
    
            # Check if the episode is done
            done = bool(
                x < -2.4
                or x > 2.4
                or theta < -np.pi / 6
                or theta > np.pi / 6
            )
    
            if not done:
                reward = 1.0  # Reward for every step taken
            else:
                reward = 0.0  # No reward if out of bounds
    
            return np.array(self.state), reward, done, {}

    def reset(self):
        self.state = np.random.normal(0, 0.05, size=(4,))
        return np.array(self.state)

    def render(self, mode='human', close=False):
        pass  # This would be your animation or visualization


In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import gym
import numpy as np
import random
from collections import deque

class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 24)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(24, 24)
        self.fc3 = nn.Linear(24, action_size)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        return self.fc3(x)

class DQNAgent:
    def __init__(self, state_size, action_size, action_range):
        self.state_size = state_size
        self.action_size = action_size
        self.action_range = action_range
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95  # discount factor
        self.epsilon = 1.0  # exploration rate
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01
        self.learning_rate = 0.001
        self.model = DQN(state_size, action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if random.random() <= self.epsilon:
            return random.randrange(self.action_range[0], self.action_range[1])
        state = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            action_values = self.model(state)
        return torch.argmax(action_values).item()

    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            state = torch.FloatTensor(state)
            next_state = torch.FloatTensor(next_state)
            reward = torch.tensor(reward)
            action = torch.tensor(action)
            done = torch.tensor(done)

            if not done:
                target = reward + self.gamma * torch.max(self.model(next_state))
            else:
                target = reward

            current_q = self.model(state)[action]
            loss = nn.functional.mse_loss(current_q, target)

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# Create Gym environment
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = 1
action_range = [-10, 10]
agent = DQNAgent(state_size, action_size, action_range)
batch_size = 10

# Train the agent
num_episodes = 1000
for e in range(num_episodes):
    state = env.reset()

    for time in range(500):  # adjust based on the particular environment
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        reward = reward if not done else -10
        next_state = np.reshape(next_state, [1, state_size])
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done:
            print(f"Episode: {e+1}/{num_episodes}, Score: {time}, Epsilon: {agent.epsilon:.2}")
            break

    if len(agent.memory) > batch_size:
        agent.replay(batch_size)


AssertionError: 4 (<class 'int'>) invalid