#### Reinforcement Learning for Engineers

Reinforcement Learning (RL) is a paradigm of machine learning and optimal control where an **agent** learns to make decisions by interacting with an **environment** to maximize a cumulative **reward**. Unlike supervised learning, the agent isn’t given correct actions but instead **experiments** with actions, learning through feedback (rewards). The agent observes the current **state** of the environment, takes an **action**, and receives a **reward** (scalar feedback signal), then the environment transitions to a new state. This loop continues over time (see **Figure 1**). The agent seeks a **policy** (mapping states to actions) that maximizes expected cumulative rewards. Key concepts:

*  **States**: Observations of the environment.
*  **Actions**: Decisions made by the agent.
*  **Rewards**: Immediate feedback signals.
*  **Policy**: Strategy to select actions.

Agents face the **exploration vs. exploitation dilemma**: exploring new actions to find higher rewards vs. exploiting known rewarding actions.
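
The trade-off can be illustrated with an epsilon-greedy rule on a multi-armed bandit. The following is a minimal sketch, not part of the original activity: the function name `run_bandit`, the Gaussian reward model, and the arm means are all assumptions chosen for illustration. With probability `epsilon` the agent explores a random arm; otherwise it exploits the arm with the highest estimated reward.

```python
import random

def run_bandit(true_means, epsilon, steps, seed=0):
    """Epsilon-greedy on a hypothetical Gaussian bandit; returns (estimates, counts)."""
    rng = random.Random(seed)
    n_arms = len(true_means)
    estimates = [0.0] * n_arms   # running mean reward per arm
    counts = [0] * n_arms        # number of pulls per arm
    for _ in range(steps):
        if rng.random() < epsilon:
            arm = rng.randrange(n_arms)                            # explore
        else:
            arm = max(range(n_arms), key=lambda i: estimates[i])   # exploit
        reward = rng.gauss(true_means[arm], 1.0)                   # noisy reward (assumed unit std)
        counts[arm] += 1
        estimates[arm] += (reward - estimates[arm]) / counts[arm]  # incremental mean update
    return estimates, counts

estimates, counts = run_bandit([0.2, 0.5, 1.0], epsilon=0.1, steps=5000)
print("pulls per arm:", counts)
print("estimated means:", [round(e, 2) for e in estimates])
```

A larger `epsilon` spends more pulls on exploration; setting it to zero makes the agent purely greedy, so it can lock onto a suboptimal arm.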

**Markov Decision Processes (MDPs):**

RL problems are often formalized as Markov Decision Processes (MDPs), a mathematical framework for sequential decision-making under uncertainty. An MDP is defined by a tuple:

$ \mathcal{M} = (\mathcal{S}, \mathcal{A}, P, R, \gamma) $

where:

*  $\mathcal{S}$ = Set of **states**.
*  $\mathcal{A}$ = Set of **actions**.
*  $P(s'|s,a)$ = **Transition probability** to state $s'$ given state $s$ and action $a$.
*  $R(s,a,s')$ = **Reward** from transitioning state $s$ to $s'$ via action $a$.
*  $\gamma \in [0,1)$ = **Discount factor**, weighing future vs. immediate rewards.
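
To make the role of $\gamma$ concrete, the sketch below computes the discounted return $G_t = r_t + \gamma r_{t+1} + \gamma^2 r_{t+2} + \dots$ for a finite reward sequence. The helper name `discounted_return` and the reward values are illustrative assumptions.

```python
def discounted_return(rewards, gamma):
    """Sum of rewards weighted by gamma**k, accumulated from the last step backwards."""
    g = 0.0
    for r in reversed(rewards):
        g = r + gamma * g
    return g

# Three unit rewards with gamma = 0.5: 1 + 0.5 + 0.25 = 1.75
print(discounted_return([1.0, 1.0, 1.0], gamma=0.5))
# With gamma = 0 only the immediate reward counts
print(discounted_return([2.0, 5.0], gamma=0.0))
```

Values of $\gamma$ close to 1 make the agent far-sighted, while small values emphasize immediate rewards.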

MDPs satisfy the **Markov property**: the next state and reward depend only on the current state and action, not on the earlier history. The **optimal policy** $\pi^*(s)$ maximizes the expected discounted return. Solving an MDP typically involves computing a **value function** $V(s)$ (expected return from state $s$) or an **action-value function** $Q(s,a)$ (expected return after taking action $a$ in state $s$). The **Bellman optimality equation** for the value function is:

$ V^*(s) = \max_{a \in \mathcal{A}} \sum_{s'} P(s'|s,a)\left[ R(s,a,s') + \gamma V^*(s') \right] $

Similarly, the optimal Q-value:

$ Q^*(s,a) = \sum_{s'}P(s'|s,a)\left[R(s,a,s') + \gamma \max_{a'}Q^*(s',a')\right] $
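
When $P$ and $R$ are known, the Bellman optimality equation can be solved by repeatedly applying it as an update (value iteration). The sketch below does this for a hypothetical two-state MDP; the transition table `P`, its probabilities, and its rewards are made up for illustration. Each entry `P[s][a]` is a list of `(probability, next_state, reward)` tuples.

```python
def value_iteration(P, gamma, tol=1e-8, max_iters=10_000):
    """Iterate V(s) <- max_a sum_s' P(s'|s,a) [R + gamma V(s')] until changes fall below tol."""
    def q_value(V, s, a):
        return sum(p * (r + gamma * V[s2]) for p, s2, r in P[s][a])

    V = {s: 0.0 for s in P}
    for _ in range(max_iters):
        delta = 0.0
        for s in P:
            v_new = max(q_value(V, s, a) for a in P[s])
            delta = max(delta, abs(v_new - V[s]))
            V[s] = v_new          # in-place (Gauss-Seidel style) update
        if delta < tol:
            break
    # Greedy policy with respect to the converged values
    policy = {s: max(P[s], key=lambda a: q_value(V, s, a)) for s in P}
    return V, policy

# Hypothetical MDP: action 0 = stay, action 1 = move
P = {
    0: {0: [(1.0, 0, 0.0)],                   # stay in state 0, no reward
        1: [(0.8, 1, 0.0), (0.2, 0, 0.0)]},   # move succeeds with probability 0.8
    1: {0: [(1.0, 1, 1.0)],                   # stay in state 1, reward 1 each step
        1: [(1.0, 0, 0.0)]},                  # move back to state 0
}
V, policy = value_iteration(P, gamma=0.9)
print(V, policy)
```

For this toy problem the fixed point can be checked by hand: $V^*(1) = 1/(1-0.9) = 10$ and $V^*(0) = 7.2/0.82 \approx 8.78$, with the greedy policy moving from state 0 and staying in state 1.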

**Model-Free vs. Model-Based RL:**

A key distinction:

*  **Model-based RL**: Uses or learns a model $P(s'|s,a)$ and reward function, enabling planning and simulation (e.g., AlphaZero).
*  **Model-free RL**: Learns directly from trial-and-error interaction, without an explicit model. This approach is common and simpler to implement, but it typically requires more environment interactions.

Hybrid approaches like Dyna-Q use learned models to simulate additional experiences.
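
The Dyna-Q idea can be sketched in tabular form as follows. This is an illustrative sketch under stated assumptions, not a reference implementation: the five-state chain environment, the `step` function, and all hyperparameter values are hypothetical. Each real transition triggers one model-free Q-learning update, is stored in a learned model, and is followed by several planning updates on transitions replayed from that model.

```python
import random

N_STATES, GOAL = 5, 4      # hypothetical chain: states 0..4, goal at the right end
ACTIONS = (0, 1)           # 0 = left, 1 = right

def step(s, a):
    """Deterministic toy dynamics: returns (next_state, reward, done)."""
    s2 = max(0, s - 1) if a == 0 else min(GOAL, s + 1)
    return s2, (1.0 if s2 == GOAL else 0.0), s2 == GOAL

def dyna_q(episodes=50, planning_steps=10, alpha=0.5, gamma=0.9, epsilon=0.1, seed=0):
    rng = random.Random(seed)
    Q = {(s, a): 0.0 for s in range(N_STATES) for a in ACTIONS}
    model = {}             # learned model: (s, a) -> (reward, next_state, done)

    def update(s, a, r, s2, done):
        # Q-learning target: no bootstrapping from a terminal state
        target = r if done else r + gamma * max(Q[(s2, b)] for b in ACTIONS)
        Q[(s, a)] += alpha * (target - Q[(s, a)])

    for _ in range(episodes):
        s, done = 0, False
        while not done:
            if rng.random() < epsilon:
                a = rng.choice(ACTIONS)                                    # explore
            else:
                best = max(Q[(s, b)] for b in ACTIONS)
                a = rng.choice([b for b in ACTIONS if Q[(s, b)] == best])  # greedy, random ties
            s2, r, done = step(s, a)
            update(s, a, r, s2, done)        # learn from the real transition
            model[(s, a)] = (r, s2, done)    # remember it in the model
            for _ in range(planning_steps):  # planning with simulated experience
                ps, pa = rng.choice(list(model))
                update(ps, pa, *model[(ps, pa)])
            s = s2
    return Q

Q = dyna_q()
print({s: max(ACTIONS, key=lambda a: Q[(s, a)]) for s in range(GOAL)})
```

Setting `planning_steps=0` reduces the sketch to plain model-free Q-learning, which makes it easy to compare how many real environment steps each variant needs.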

**Applications of RL in Engineering Optimization:**

RL applies broadly to engineering tasks involving sequential decision-making or control:

*  **Chemical Engineering**: Process control and reaction optimization (e.g., reactor settings, energy minimization, yield improvement).
*  **Mechanical Engineering**: Robotic control, autonomous systems (e.g., robotic arms, drones, inverted pendulum).
*  **Automotive**: Autonomous driving (lane-keeping, cruise control, collision avoidance).
*  **Industrial Energy Management**: HVAC optimization (e.g., DeepMind reduced Google data center cooling energy by ~40%).

These applications demonstrate RL’s effectiveness in engineering optimization, addressing complex, uncertain, and dynamic conditions.

#### Deep Deterministic Policy Gradient (DDPG)

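**Overview:** DDPG is an off-policy actor-critic method for continuous action spaces. It combines a deterministic policy gradient (the **actor** $\mu_{\theta_\mu}$) with Q-learning (the **critic** $Q_{\theta_Q}$). Slowly updated **target networks**, with parameters $\theta_Q^-$ and $\theta_\mu^-$, stabilize the bootstrapped critic target.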

**Updates:**
*  **Critic Loss:**
$ L(\theta_Q) = \left(Q_{\theta_Q}(s,a) - [r + \gamma Q_{\theta_Q^-}(s',\mu_{\theta_\mu^-}(s'))]\right)^2 $
*  **Actor Update:**
$ \nabla_{\theta_\mu} J \approx \mathbb{E}_{s}\left[\nabla_a Q_{\theta_Q}(s,a)|_{a=\mu(s)}\nabla_{\theta_\mu}\mu_{\theta_\mu}(s)\right] $
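
As a worked example of the critic loss for a single transition, the sketch below evaluates the bracketed target and the squared error with plain numbers. The function names and the numeric values are illustrative assumptions; the `(1 - done)` mask is the one used in the training loop later in this document to drop the bootstrap term at terminal states.

```python
def td_target(reward, gamma, next_q, done):
    """y = r + gamma * (1 - done) * Q_target(s', mu_target(s'))."""
    return reward + gamma * (1.0 - done) * next_q

def critic_loss(q_pred, y):
    """Squared error between the critic's prediction and the target."""
    return (q_pred - y) ** 2

# Hypothetical transition: r = -1, target critic predicts -20 for the next state
y = td_target(reward=-1.0, gamma=0.99, next_q=-20.0, done=0.0)
loss = critic_loss(q_pred=-18.0, y=y)
print(y, loss)   # y is about -20.8, loss is about 7.84

# At a terminal state the target reduces to the immediate reward
y_terminal = td_target(reward=-1.0, gamma=0.99, next_q=-20.0, done=1.0)
print(y_terminal)
```

In practice the loss is averaged over a minibatch sampled from the replay buffer rather than computed on one transition.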

**Applications:** Robotics (pendulum balancing), chemical process control, portfolio management, voltage regulation in power systems.

#### Definition of Symbols

*  $Q(s,a)$: Action-value function.
*  $\alpha$: Learning rate.
*  $\gamma$: Discount factor.
*  $r$: Reward.
*  $\theta$: Parameters of neural network or policy.
*  $\pi_{\theta}(a|s)$: Policy distribution parameterized by $\theta$.
*  $G_t$: Return (cumulative discounted reward from time $t$ onwards).
*  $\hat{A}_t$: Advantage estimate at time $t$.
*  $\epsilon$: PPO clip range hyperparameter.
*  $\mu(s)$: Deterministic policy function.

#### Hand Tracking with MediaPipe

This activity uses MediaPipe Hands to track hand positions from a webcam in real time and maps hand gestures to control inputs for simulated or physical objects. Hand movements steer a cart-pole balancing system, and hand tracking also sets the brightness and blinking rate of an LED. The activity connects computer vision with applications in machine learning, automation, and control systems.

📄 <a href='https://apmonitor.com/pds/index.php/Main/HandTracking'>Hand Tracking Activity</a>

The code below runs the Gymnasium `CartPole-v1` environment for a few steps with random actions.

In [None]:
pip install gymnasium

In [None]:
pip install pygame

In [None]:
import gymnasium as gym
env = gym.make('CartPole-v1', render_mode='rgb_array')
observation, info = env.reset()  # reset returns (observation, info)
for _ in range(10):
  frame = env.render()  # RGB image array (not displayed here)
  # Input:
  #   Force to the cart with actions: 0=left, 1=right
  # Returns:
  #   obs = cart position, cart velocity, pole angle, rot rate
  #   reward = +1 for every timestep
  #   terminated = True when abs(angle)>12 deg or abs(cart pos)>2.4
  #   truncated = True when the 500-step time limit is reached
  action = env.action_space.sample() # random action
  observation, reward, terminated, truncated, info = env.step(action)

  print(observation)

  if terminated or truncated:
    observation, info = env.reset()
env.close()

#### Pendulum RL with DDPG

See additional explanation at <a href='https://apmonitor.com/do/index.php/Main/RLGymnasium'>RL with Gymnasium</a>

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym

# Make Pendulum environment
env = gym.make('Pendulum-v1', g=9.81)  # g is gravitational acceleration, default 10
state_dim = env.observation_space.shape[0]   # dimension of state (should be 3 for Pendulum)
action_dim = env.action_space.shape[0]       # dimension of action (1 for Pendulum)
max_action = float(env.action_space.high[0]) # max torque (=2.0)

In [None]:
# Actor Network: maps state -> action (within [-max_action, max_action])
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()
        self.max_action = max_action
        # Simple MLP with two hidden layers of 128 units
        self.net = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )
    def forward(self, state):
        # Output raw action, then scale to range [-max_action, max_action] using tanh
        raw_action = self.net(state)
        # bound output action between -1 and 1 via tanh, then scale
        action = self.max_action * torch.tanh(raw_action)
        return action

# Critic Network: maps (state, action) -> Q-value
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        # Q-network takes state and action concatenated
        self.net = nn.Sequential(
            nn.Linear(state_dim + action_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )
    def forward(self, state, action):
        # Ensure state and action are concatenated as vectors
        if action.dim() == 1:
            action = action.unsqueeze(1)
        x = torch.cat([state, action], dim=1)
        Q = self.net(x)
        return Q

In [None]:
# Replay Buffer for experience replay
class ReplayBuffer:
    def __init__(self, capacity=100000):
        self.capacity = capacity
        self.buffer = []
        self.pos = 0  # position to insert next entry (for circular buffer)
    def add(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.pos] = (state, action, reward, next_state, done)
        # Move position pointer (overwrite oldest if full)
        self.pos = (self.pos + 1) % self.capacity
    def sample(self, batch_size):
        batch = np.random.choice(len(self.buffer), batch_size, replace=False)
        states, actions, rewards, next_states, dones = zip(*(self.buffer[i] for i in batch))
        # Convert to torch tensors
        return (torch.tensor(np.array(states), dtype=torch.float32),
                torch.tensor(np.array(actions), dtype=torch.float32),
                torch.tensor(np.array(rewards), dtype=torch.float32).unsqueeze(1),
                torch.tensor(np.array(next_states), dtype=torch.float32),
                torch.tensor(np.array(dones), dtype=torch.float32).unsqueeze(1))
    def __len__(self):
        return len(self.buffer)

In [None]:
# Initialize actor, critic, target networks and optimizers
actor = Actor(state_dim, action_dim, max_action)
critic = Critic(state_dim, action_dim)
target_actor = Actor(state_dim, action_dim, max_action)
target_critic = Critic(state_dim, action_dim)
# Copy weights from actor to target_actor, and critic to target_critic
target_actor.load_state_dict(actor.state_dict())
target_critic.load_state_dict(critic.state_dict())
target_actor.eval()
target_critic.eval()

actor_optimizer = optim.Adam(actor.parameters(), lr=1e-3)
critic_optimizer = optim.Adam(critic.parameters(), lr=1e-3)
buffer = ReplayBuffer(capacity=100000)

In [None]:
num_episodes = 200       # number of episodes to train
batch_size = 64          # batch size for sampling from replay
gamma = 0.99             # discount factor
tau = 0.005              # target network update rate (tau)
exploration_noise = 0.1  # stddev for Gaussian exploration noise

for episode in range(num_episodes):
    state, _ = env.reset()
    state = state.astype(np.float32)
    episode_reward = 0.0
    for step in range(500):  # max steps per episode (Pendulum typically truncated at 200)
        # Select action according to current policy + exploration noise
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            action = actor(state_tensor).cpu().numpy()[0]
        # Add exploration noise (Gaussian)
        action = action + np.random.normal(0, exploration_noise * max_action, size=action_dim)
        action = np.clip(action, -max_action, max_action)
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        next_state = next_state.astype(np.float32)
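        # Store transition in replay buffer. Only true termination is stored as the
        # done flag, so a time-limit truncation still bootstraps from next_state.
        buffer.add(state, action, reward, next_state, float(terminated))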
        state = next_state
        episode_reward += reward

        # Train the networks if we have enough samples in replay buffer
        if len(buffer) >= batch_size:
            # Sample a batch
            states, actions, rewards, next_states, dones = buffer.sample(batch_size)
            # Compute target Q values using target networks
            with torch.no_grad():
                # Target actor for next action
                next_actions = target_actor(next_states)
                target_Q = target_critic(next_states, next_actions)
                # If done (terminal), no future reward; use (1-done) mask
                target_Q = rewards + gamma * (1 - dones) * target_Q
            # Critic loss = MSE between current Q and target Q
            current_Q = critic(states, actions)
            critic_loss = nn.MSELoss()(current_Q, target_Q)
            # Update critic
            critic_optimizer.zero_grad()
            critic_loss.backward()
            critic_optimizer.step()

            # Actor loss = -mean(Q) (because we want to maximize Q, so minimize -Q)
            actor_actions = actor(states)
            actor_loss = -critic(states, actor_actions).mean()
            # Update actor
            actor_optimizer.zero_grad()
            actor_loss.backward()
            actor_optimizer.step()

            # Soft update target networks
            for param, target_param in zip(critic.parameters(), target_critic.parameters()):
                target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
            for param, target_param in zip(actor.parameters(), target_actor.parameters()):
                target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

        if done:
            break  # episode ends
    # Logging (print) the cumulative reward of the episode
    print(f"Episode {episode+1}: Reward = {episode_reward:.2f}")
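
The soft update in the training loop moves each target parameter a fraction `tau` of the way toward its online counterpart. The pure-Python sketch below is illustrative only (the list-based `soft_update` helper is hypothetical, not the PyTorch code above). It shows how slowly the target follows with `tau = 0.005`: if the online value is held at 1 and the target starts at 0, the target equals $1 - (1-\tau)^n$ after $n$ updates.

```python
def soft_update(target, online, tau):
    """Polyak averaging: target <- tau * online + (1 - tau) * target, element-wise."""
    return [tau * w + (1.0 - tau) * t for w, t in zip(online, target)]

tau = 0.005
online = [1.0]     # online parameter held fixed for illustration
target = [0.0]     # target parameter starts far from the online value
for n in range(200):   # roughly one Pendulum episode of updates
    target = soft_update(target, online, tau)

print(target[0])       # close to 1 - 0.995**200, roughly 0.63
```

A smaller `tau` gives a more stable but slower-moving critic target, while `tau = 1` copies the online weights on every step.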