In [1]:
import tensorflow as tf
import keras

input_shape = [4]
n_outputs = 2

model = keras.Sequential([
    keras.layers.Dense(128, activation="elu", input_shape=input_shape),
    keras.layers.Dense(56, activation="elu"),
    keras.layers.Dense(n_outputs, activation="linear")
])

In [2]:
import numpy as np

def epsilon_greedy_policy(state, epsilon, index, total_episodes):
    if index < total_episodes / 10:
        return np.random.randint(n_outputs)
    random_num = np.random.rand()
    if random_num < epsilon:
        return np.random.randint(n_outputs)
    else:
        Q_values = model.predict([state])
        return Q_values.argmax()

In [3]:
from collections import deque

replay_buffer = deque(maxlen=2000)

In [4]:
def sample_experiences(batch_size):
    indices = np.random.randint(len(replay_buffer), size=batch_size)
    batch = [replay_buffer[index] for index in indices]
    return [
        [experience[0] for experience in batch],
        [experience[1] for experience in batch],
        [experience[2] for experience in batch],
        [experience[3] for experience in batch],
        [experience[4] for experience in batch]
    ]

In [5]:
def play_one_step(env, state, epsilon, episode, total_episodes):
    action = epsilon_greedy_policy(state, epsilon, episode, total_episodes)
    next_state, reward, done = env.simulate_step(action)
    replay_buffer.append((state, action, reward, next_state, done))
    return next_state, reward, done

In [6]:
batch_size = 32
discount_factor = 0.95
optimizer = keras.optimizers.Nadam(learning_rate=1e-2)
loss_fn = keras.losses.mean_squared_error

def training_step(batch_size):
    experiences = sample_experiences(batch_size)
    states = experiences[0]
    actions = experiences[1]
    rewards = experiences[2]
    next_states = experiences[3]
    dones = experiences[4]
    next_Q_values = model.predict(next_states, verbose=0) # type: ignore
    max_next_Q_values = next_Q_values.max(axis=1)
    runs = 1.0 - (1 if dones else 0)
    target_Q_values = rewards + runs * discount_factor * max_next_Q_values
    target_Q_values = target_Q_values.reshape(-1, 1)
    mask = tf.one_hot(actions, n_outputs)
    
    with tf.GradientTape() as tape:
        stacked_states = np.stack(states, axis=0)
        all_Q_values = model(stacked_states)
        Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables)) # type: ignore



In [7]:
import numpy as np
from typing import List
from scipy.integrate import solve_ivp


class InvertedPendulum:
    """
    Class representing an inverted pendulum simulator.

    Attributes:
        g (float): Acceleration due to gravity.
        m (float): Mass of the pendulum.
        l (float): Length of the pendulum.
        I (float): Moment of inertia of the pendulum.
        dt (float): Time step for simulation.
        max_force (float): Maximum force applied to the pendulum.
        voltage_to_force_scale (float): Scaling factor for converting voltage to force.
        max_theta (float): Maximum angle of the pendulum.
        track_length (float): Length of the track on which the pendulum moves.
        cart_position (float): Initial position of the pendulum on the track.
        state (List[float]): Current state of the pendulum [theta, omega, x, x_dot].
    """

    def __init__(self) -> None:
        """
        Initialize the InvertedPendulum object.
        """
        self.g = 9.81
        self.m = 0.17
        self.l = 0.305
        self.dt = 0.02
        self.I = self._calculate_moment_of_inertia()

        self.max_force = 2.3
        self.max_theta = np.radians(25)
        self.track_length = 0.5
        self.cart_position = 0.25

        self.state = [np.pi, 0.1, self.cart_position, 0]

    def _calculate_moment_of_inertia(self) -> float:
        additional_mass = 0.09
        attachment_position = self.l * 7 / 8
        additional_moment_of_inertia = additional_mass * attachment_position**2
        return self.m * self.l**2 + additional_moment_of_inertia

    def _calculate_force(self, action: np.intp) -> float:
        """
        Calculate the force based on the given action.

        Args:
            action (np.intp): Action to be performed.

        Returns:
            float: Force to be applied.

        Raises:
            ValueError: If the direction is invalid.
        """

        if action in {1, -1}:
            return action * self.max_force

        raise ValueError("Invalid direction")

    def equations_of_motion(
        self, state: List[float], applied_force: float
    ) -> List[float]:
        """
        Calculate the equations of motion for the pendulum.

        Args:
            state (List[float]): Current state of the pendulum [theta, omega, x, x_dot].
            applied_force (float): Force applied to the pendulum.

        Returns:
            List[float]: Derivatives of the state variables [dtheta_dt, domega_dt, dx_dt, dv_dt].
        """
        theta, omega, _, x_dot = state
        theta_from_vertical = theta - np.pi

        dL_dtheta = -self.m * self.g * self.l * np.sin(theta_from_vertical)
        dL_domega = self.I * omega

        domega_dt = (
            dL_domega - dL_dtheta
        ) / self.I + applied_force * self.l / self.I * np.cos(theta_from_vertical)
        dtheta_dt = omega

        dv_dt = applied_force / self.m
        dx_dt = x_dot

        return [dtheta_dt, domega_dt, dx_dt, dv_dt]

    def simulate_step(self, action: np.intp) -> tuple[List[float], float, bool]:
        """
        Simulate a single step of the pendulum.

        Args:
            action (np.intp): Action to be performed.

        Returns:
            tuple[List[float], float, bool]: New state, reward, and terminal flag.
        """
        applied_force = self._calculate_force(action)
        solution = solve_ivp(
            lambda _, state: self.equations_of_motion(state, applied_force),
            [0, self.dt],
            self.state,
            method="RK45",
            t_eval=[self.dt],
        )
        new_state = solution.y[:, -1]
        self.state = self.enforce_constraints(new_state)
        return (
            self.state,
            self.calculate_reward(self.state),
            self.terminal_state(self.state),
        )

    def enforce_constraints(self, state: List[float]) -> List[float]:
        """
        Enforce constraints on the pendulum state.

        Args:
            state (List[float]): Current state of the pendulum [theta, omega, x, x_dot].

        Returns:
            List[float]: Updated state of the pendulum.
        """
        theta, omega, x, x_dot = state
        theta_from_vertical = theta - np.pi

        at_boundary = x <= 0 or x >= self.track_length

        if at_boundary:
            x = np.clip(x, 0, self.track_length)
            if x_dot != 0:
                impulse = -x_dot * self.m
                omega += impulse * self.l / self.I
            x_dot = 0

        if abs(theta_from_vertical) > self.max_theta:
            omega *= -0.6

        theta_from_vertical = np.clip(
            theta_from_vertical, -self.max_theta, self.max_theta
        )

        theta = theta_from_vertical + np.pi
        return [theta, omega, x, x_dot]

    def calculate_reward(self, state):
        """
        Calculate the reward based on the current state.

        Args:
            state (List[float]): Current state of the pendulum [theta, omega, x, x_dot].

        Returns:
            float: Reward value.
        """
        theta, _, _, _ = state
        target_angle = np.pi

        angle_difference = np.abs(theta - target_angle)

        reward = 1.0 / (1.0 + angle_difference)

        return reward

    def terminal_state(self, state):
        """
        Check if the current state is a terminal state.

        Args:
            state (List[float]): Current state of the pendulum [theta, omega, x, x_dot].

        Returns:
            bool: True if terminal state, False otherwise.
        """
        theta, _, x, _ = state
        max_angle = 3.58
        min_angle = 2.71
        min_position = 0.0
        max_position = 0.5

        if theta >= max_angle or theta <= min_angle:
            return True
        elif x >= max_position or x <= min_position:
            return True
        return False

    def reset(self) -> List[float]:
        """
        Reset the pendulum to its initial state.

        Returns:
            List[float]: Initial state of the pendulum [theta, omega, x, x_dot].
        """
        random_angular_velocity = np.random.uniform(-0.5, 0.5)
        self.state = [np.pi, random_angular_velocity, 0.25, 0]
        return self.state


In [8]:
env = InvertedPendulum()

total_episodes = 600
for episode in range(total_episodes):
    state = env.reset()
    for step in range(200):
        epsilon = max(1 - episode / 500, 0.01)
        new_state, reward, done = play_one_step(env, state, epsilon, episode, total_episodes)

        if done:
            break

    if episode > 50:
        training_step(batch_size)



In [10]:
model.save("trained_model-dq.h5")



