In [None]:
'''This repository contains a detailed implementation of the Reinforcement Learning Agent class'''
import matplotlib.pyplot as plt
import numpy as np
from dataclasses import *
import torch 
from typing import Any, Callable, Dict, List, Tuple, Union, Optional
from functools import wraps
import random

In [None]:
@dataclass
class EnforceClassTyping:
    """Dataclass base that type-checks annotated fields right after construction.

    Each name found in the instance's ``__annotations__`` is looked up in the
    instance ``__dict__`` and tested with ``isinstance`` against its annotation.

    Raises:
        TypeError: if a stored value is not an instance of its annotated type.
    """
    def __post_init__(self):
        stored_values = vars(self)
        for name, field_type in self.__annotations__.items():
            value = stored_values[name]
            if isinstance(value, field_type):
                continue
            # First mismatch aborts construction.
            current_type = type(value)
            raise TypeError(f"The field `{name}` was assigned by `{current_type}` instead of `{field_type}`")
def EnforceMethodTyping(func: Callable) -> Callable:
    """Enforce the type annotations of a method's arguments at call time.

    Positional arguments are matched to parameters by name (using the method's
    signature), so parameters without an annotation are skipped instead of
    shifting the annotations of the parameters that follow them. Keyword
    arguments are checked against the annotation registered under their name.

    Args:
        func: the method to wrap; its first parameter is the instance slot.

    Returns:
        ``func`` itself when it has no annotations, otherwise a wrapper that
        validates the arguments and then delegates to ``func``.

    Raises:
        TypeError: (from the wrapper) when an annotated argument is not an
            instance of its annotation.
    """
    import inspect  # function-scope import keeps this block self-contained

    arg_annotations = func.__annotations__
    if not arg_annotations:
        return func

    # Parameters that can be filled positionally, in declaration order, without
    # the leading instance slot that the wrapper receives as `self`.
    positional_kinds = (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    positional_names = [
        name
        for name, parameter in inspect.signature(func).parameters.items()
        if parameter.kind in positional_kinds
    ][1:]

    @wraps(func)
    def wrapper(self, *args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
        for arg_name, arg in zip(positional_names, args):
            if arg_name not in arg_annotations:
                continue  # unannotated parameter: nothing to enforce
            annotation = arg_annotations[arg_name]
            if not isinstance(arg, annotation):
                raise TypeError(f"Expected {annotation} for argument {arg}, got {type(arg)}.")

        for arg_name, arg_value in kwargs.items():
            if arg_name in arg_annotations:
                annotation = arg_annotations[arg_name]
                if not isinstance(arg_value, annotation):
                    raise TypeError(f"Expected {annotation} for keyword argument {arg_name}, got {type(arg_value)}.")

        return func(self, *args, **kwargs)

    return wrapper
def EnforceFunctionTyping(func: Callable) -> Callable:
    """Enforce the type annotations of a plain function's arguments at call time.

    Positional arguments are matched to parameters by name (using the
    function's signature), so parameters without an annotation are skipped
    instead of shifting the annotations of the parameters that follow them.
    Keyword arguments are checked against the annotation registered under
    their name.

    Args:
        func: the function to wrap.

    Returns:
        A wrapper that validates the arguments and then delegates to ``func``.

    Raises:
        TypeError: (from the wrapper) when an annotated argument is not an
            instance of its annotation.
    """
    import inspect  # function-scope import keeps this block self-contained

    # Parameters that can be filled positionally, in declaration order.
    positional_kinds = (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    positional_names = [
        name
        for name, parameter in inspect.signature(func).parameters.items()
        if parameter.kind in positional_kinds
    ]

    @wraps(func)
    def wrapper(*args, **kwargs):
        annotations = func.__annotations__

        # Check positional arguments
        for arg_name, arg in zip(positional_names, args):
            if arg_name not in annotations:
                continue  # unannotated parameter: nothing to enforce
            annotation = annotations[arg_name]
            if not isinstance(arg, annotation):
                raise TypeError(f"Expected {annotation} for {arg}, got {type(arg)}.")

        # Check keyword arguments
        for arg_name, arg_value in kwargs.items():
            if arg_name in annotations:
                annotation = annotations[arg_name]
                if not isinstance(arg_value, annotation):
                    raise TypeError(f"Expected {annotation} for {arg_name}, got {type(arg_value)}.")

        return func(*args, **kwargs)

    return wrapper


In [None]:
@dataclass
class Source(EnforceClassTyping):
    '''An electric field source, described by where it sits in the field (Position) and the magnitude of the source (Charge).

    Field values are type-checked on construction through the inherited
    EnforceClassTyping.__post_init__, which raises TypeError on a mismatch
    (e.g. an int passed for Charge, since the check is a plain isinstance).'''
    Position: torch.Tensor # m; ElectricField reads components [0] and [1]
    Charge: float #C


In [None]:
@EnforceFunctionTyping
def ElectricField(FieldSources: list, ObservationPosition: torch.Tensor)->torch.Tensor:
    '''Return the field vector produced by a list of sources at the given observation point(s).

    The contribution of every source follows Coulomb's law, k * q * r / |r|^3,
    where r is the displacement from the source to the observation point; the
    contributions are summed. This function fixes the physics of the field
    (an electric field in this case).

    Args:
        FieldSources: list of Source objects.
        ObservationPosition: tensor indexed as [0] and [1] for the two
            coordinate components of the observation point(s).

    Returns:
        Tensor shaped like ObservationPosition holding the summed field. #N/C or V/m

    Raises:
        TypeError: if an element of FieldSources is not exactly a Source, or
            if the two coordinate components are inconsistent.
    '''
    CoulombConstant = 8.9875e9 #N*m^2/C^2

    # Validate the sources up front, before any computation.
    for Candidate in FieldSources:
        if type(Candidate) != Source:
            raise TypeError("The input is not valid")

    # Validate the two coordinate components of the observation point(s).
    XComponent, YComponent = ObservationPosition[0], ObservationPosition[1]
    if type(XComponent) != type(YComponent):
         raise TypeError("Incompatible Reference point data types")
    if type(XComponent) != torch.Tensor:
        raise TypeError("Invalid Reference point data type")
    if XComponent.size() != YComponent.size():
        raise TypeError("Incompatible Reference point dimensions")

    TotalField = torch.zeros_like(ObservationPosition)
    for Candidate in FieldSources:
        # Grid filled with the source coordinates, one slice per axis.
        SourceGrid = torch.stack([
            torch.ones_like(ObservationPosition[Axis]) * Candidate.Position[Axis].item()
            for Axis in (0, 1)
        ])
        Displacement = ObservationPosition - SourceGrid
        Distance = torch.sqrt(Displacement[0]**2 + Displacement[1]**2)  # Magnitude of the displacement vector
        Scale = (CoulombConstant * Candidate.Charge) / Distance**3
        TotalField += Scale * Displacement
    return TotalField #N/C or V/m


In [None]:
@EnforceFunctionTyping
def PlotField(Sources: list, ObservationPosition: torch.Tensor):
    '''Plot the 2D electric vector field as a quiver plot.

    Arrows show the field direction (unit vectors); the arrow colour encodes
    the field magnitude at that point.

    Args:
        Sources: list of sources passed through to ElectricField.
        ObservationPosition: tensor whose [0] and [1] entries hold the x and y
            coordinates of the points at which the field is drawn.
    '''
    xd, yd = ElectricField(Sources, ObservationPosition)
    # Compute the magnitude once, from the raw components. Normalising yd with
    # an already-normalised xd would not yield unit vectors.
    magnitude = torch.sqrt(xd**2 + yd**2)
    xd = xd / magnitude
    yd = yd / magnitude
    fig, ax = plt.subplots(1,1)
    # Colour by the field magnitude; the normalised components all have norm 1
    # and would carry no information.
    cp = ax.quiver(ObservationPosition[0],ObservationPosition[1],xd,yd,magnitude)
    fig.colorbar(cp)
    # NOTE(review): set after the figure is created, so this presumably only
    # affects figures created later — confirm that is intended.
    plt.rcParams['figure.dpi'] = 150
    plt.show()


In [None]:
@dataclass 
class State(EnforceClassTyping):
    '''Observable state of the Agent: its Position, its Momentum and the Field Strength it experiences at its Position.
       These are the parameters the agent is able to observe; together they uniquely define the state of the agent.'''
    Position: torch.Tensor # m
    FieldStrength: torch.Tensor #N/C or V/m
    Momentum: torch.Tensor #kg*m/s
    def Unwrap(self)->torch.Tensor:
        '''Concatenate Position, FieldStrength and Momentum (in that order) into a single tensor for processing.'''
        Components = [self.Position, self.FieldStrength, self.Momentum]
        return torch.cat(Components)


In [None]:
class CriticNetwork(EnforceClassTyping):
    '''This object represents the Value Function (Critic) used to estimate the expected value of a state-action pair.
    The value function is a fully connected neural network, trained with hand-written backpropagation, that learns to
    predict the expected value of a state-action pair more accurately.

    Attributes:
        layer_sizes: number of units per layer, input layer first.
        weights: one (layer_sizes[i], layer_sizes[i+1]) tensor per connection.
        bias: one (1, layer_sizes[i+1]) tensor per connection.
        layer_activations: one callable per connection, applied to its pre-activation.
        layer_activations_derivative: derivatives of layer_activations, in the same order.'''
    def __init__(self, layer_sizes: list, layer_activations: list, layer_activations_derivative: list):
        self.layer_sizes= layer_sizes
        # torch.rand samples [0, 1), so parameters start uniformly in [-1, 1).
        self.weights= [2 * torch.rand(layer_sizes[x], layer_sizes[x+1])- 1 for x in range(len(layer_sizes)-1)]
        self.bias= [2 * torch.rand(1, layer_sizes[x+1])- 1 for x in range(len(layer_sizes)-1)]
        self.layer_activations= layer_activations
        self.layer_activations_derivative= layer_activations_derivative
    def forward(self, StateInput: torch.Tensor, ActionInput: torch.Tensor, full: bool= False)-> Union[torch.Tensor, Tuple[list, list]]:
        '''Takes State Parameters and Action Parameters and outputs the expected return of the state-action pair.

        The state and action are concatenated along the last axis of StateInput and fed through the network.

        Returns:
            The output-layer activation when `full` is False; otherwise the tuple
            (pre-activations per layer, activations per layer with the network input first).'''
        InputData = torch.cat([StateInput, ActionInput], dim=StateInput.ndim-1)
        LayerConnections= []
        ActivatedNeuronLayer= [InputData]
        for i, (Weight, Bias) in enumerate(zip(self.weights, self.bias)):
            LayerConnections.append(torch.matmul(ActivatedNeuronLayer[i], Weight) + Bias)
            ActivatedNeuronLayer.append(self.layer_activations[i](LayerConnections[i]))
        if full is False:
            return ActivatedNeuronLayer[-1]
        else:
            return LayerConnections, ActivatedNeuronLayer
    def compute_gradients(self, StateInput: torch.Tensor, ActionInput: torch.Tensor, OptimalReturn: torch.Tensor, loss_derivative: Callable):
        '''Computes the gradients of the weights and biases by backpropagating the given loss derivative.

        Args:
            StateInput, ActionInput: network inputs, as accepted by `forward`.
            OptimalReturn: target value handed to `loss_derivative`.
            loss_derivative: callable(prediction, target) returning the derivative of the loss
                with respect to the prediction.

        Returns:
            (WeightGradient, BiasGradient): two lists ordered like `self.weights` / `self.bias`.'''
        BiasGradient = [torch.zeros_like(b) for b in self.bias]
        WeightGradient = [torch.zeros_like(w) for w in self.weights]
        zs , ActivatedNeuronLayer  = self.forward(StateInput, ActionInput, full= True)
        # An unbatched input is 1-D; make it a row vector BEFORE any `.t()` is taken of it,
        # otherwise a single-layer network fails on the output-layer weight gradient below.
        if ActivatedNeuronLayer[0].ndim < 2:
            ActivatedNeuronLayer[0]= ActivatedNeuronLayer[0].unsqueeze(dim=0)
        # Output layer error: dLoss/dOutput * activation'(z).
        LayerError = loss_derivative(ActivatedNeuronLayer[-1], OptimalReturn) * self.layer_activations_derivative[-1](zs[-1])
        BiasGradient[-1] = LayerError
        WeightGradient[-1] = torch.matmul(ActivatedNeuronLayer[-2].t(), LayerError)
        # Walk backwards through the remaining layers; `l` counts layers from the end.
        for l in range(2, len(self.layer_sizes)):
            z = zs[-l]
            LayerError = torch.matmul(LayerError, self.weights[-l+1].t()) * self.layer_activations_derivative[-l](z)
            BiasGradient[-l] = LayerError
            WeightGradient[-l] = torch.matmul(ActivatedNeuronLayer[-l-1].t(), LayerError)
        return  WeightGradient, BiasGradient
    def update_model(self, weight_grad: list, bias_grad: list, learning_rate: float):
        '''Applies one gradient-descent step, in place, to every weight and bias tensor.'''
        for i in range(len(self.weights)):
            self.weights[i] -=  learning_rate * weight_grad[i]
            self.bias[i] -=  learning_rate * bias_grad[i]


In [None]:
class ActorNetwork(EnforceClassTyping):
    '''This object represents the Policy Function (Actor) used to predict the best action to take in any given state.
       The policy function is a fully connected neural network, trained with hand-written backpropagation, that learns
       to predict actions that lead to better rewards.

    Attributes:
        layer_sizes: number of units per layer, input layer first.
        weights: one (layer_sizes[i], layer_sizes[i+1]) tensor per connection.
        bias: one (1, layer_sizes[i+1]) tensor per connection.
        layer_activations: one callable per connection, applied to its pre-activation.
        layer_activations_derivative: derivatives of layer_activations, in the same order.'''
    def __init__(self, layer_sizes: list, layer_activations: list, layer_activations_derivative: list):
        self.layer_sizes= layer_sizes
        # torch.rand samples [0, 1), so parameters start uniformly in [-1, 1).
        self.weights= [2 * torch.rand(layer_sizes[x], layer_sizes[x+1])- 1 for x in range(len(layer_sizes)-1)]
        self.bias= [2 * torch.rand(1, layer_sizes[x+1])- 1 for x in range(len(layer_sizes)-1)]
        self.layer_activations= layer_activations
        self.layer_activations_derivative= layer_activations_derivative
    def forward(self, StateInput: torch.Tensor, full: bool= False)-> Union[torch.Tensor, Tuple[list, list]]:
        '''Takes State Parameters and outputs Action parameters (force applied on the x and y axis by the agent/controller).

        Returns:
            The squeezed output-layer activation when `full` is False; otherwise the tuple
            (pre-activations per layer, activations per layer with the network input first).'''
        LayerConnections= []
        ActivatedNeuronLayer= [StateInput]
        for i, (Weight, Bias) in enumerate(zip(self.weights, self.bias)):
            LayerConnections.append(torch.matmul(ActivatedNeuronLayer[i], Weight) + Bias)
            ActivatedNeuronLayer.append(self.layer_activations[i](LayerConnections[i]))
        if full is False:
            return torch.squeeze(ActivatedNeuronLayer[-1])
        else:
            return LayerConnections, ActivatedNeuronLayer
    def compute_gradients(self, StateBatch: torch.Tensor, CriticModel: CriticNetwork)-> Tuple[list, list]:
        '''Computes the gradients of the weights and biases, using the critic's value estimate as the training signal.

        The actor loss is the negated mean of the critic's output for the state and the action the actor proposes.

        Args:
            StateBatch: state input(s), as accepted by `forward`.
            CriticModel: critic used to score the proposed action.

        Returns:
            (WeightGradient, BiasGradient): two lists ordered like `self.weights` / `self.bias`.'''
        BiasGradient = [torch.zeros_like(b) for b in self.bias]
        WeightGradient = [torch.zeros_like(w) for w in self.weights]
        zs , ActivatedNeuronLayer  = self.forward(StateBatch, full= True)
        ActorLoss = -torch.mean(CriticModel.forward(StateBatch, ActivatedNeuronLayer[-1].squeeze()))
        # An unbatched input is 1-D; make it a row vector so every layer input can be
        # transposed uniformly below (same convention as CriticNetwork.compute_gradients).
        if ActivatedNeuronLayer[0].ndim < 2:
            ActivatedNeuronLayer[0]= ActivatedNeuronLayer[0].unsqueeze(dim=0)
        # NOTE(review): the scalar loss value itself is used as the output-layer error
        # (not the derivative of the critic output w.r.t. the action) — confirm this is intended.
        LayerError = ActorLoss * self.layer_activations_derivative[-1](zs[-1])
        BiasGradient[-1] = LayerError
        WeightGradient[-1] = torch.matmul(ActivatedNeuronLayer[-2].t(), LayerError)
        # Walk backwards through the remaining layers; `l` counts layers from the end.
        for l in range(2, len(self.layer_sizes)):
            z = zs[-l]
            sp = self.layer_activations_derivative[-l](z)
            LayerError = torch.matmul(LayerError, self.weights[-l+1].t()) * sp
            BiasGradient[-l] = LayerError
            # `.t()` works for every layer input (all are 2-D here); `unsqueeze(dim=1)` only
            # worked when this layer's input was the raw 1-D state, i.e. for two weight layers.
            WeightGradient[-l] = torch.matmul(ActivatedNeuronLayer[-l-1].t(), LayerError)
        return  WeightGradient, BiasGradient
    def update_model(self, weight_grad: list, bias_grad: list, learning_rate: float):
        '''Applies one gradient-descent step, in place, to every weight and bias tensor.'''
        for i in range(len(self.weights)):
            self.weights[i] -=  learning_rate * weight_grad[i]
            self.bias[i] -=  learning_rate * bias_grad[i]


In [None]:
@dataclass 
class ReplayBuffer(EnforceClassTyping):
    '''Fixed-capacity store of state transitions (State, Action, NextState, Reward, Terminal Signal)
    that are later sampled to train the Actor and Critic Networks.

    BufferSize is the maximum number of transitions kept; once it is reached, the oldest transition
    is discarded for every new one added.'''
    BufferSize: int
    Buffer: list = None
    def __post_init__(self):
        # A None default stands in for "fresh empty list" so instances never share a list.
        if self.Buffer is None:
            self.Buffer = []
    @EnforceMethodTyping
    def AddExperience(self, State: State, Action: torch.Tensor, NextState: State, Reward: float, TerminalState: bool):
        '''Append one state transition, evicting the oldest entry first when the buffer is full.'''
        BufferIsFull = not (len(self.Buffer) < self.BufferSize)
        if BufferIsFull:
            self.Buffer.pop(0)
        self.Buffer.append([State, Action, NextState, Reward, TerminalState])
    @EnforceMethodTyping
    def SampleBuffer(self, BatchSize: int):
        '''Randomly sample BatchSize transitions and return them as stacked tensors.

        Returns:
            (StateBatch, ActionBatch, NextStateBatch, RewardsBatch, TerminalSignalsBatch)

        Raises:
            ValueError: if the buffer holds fewer than BatchSize transitions.'''
        if len(self.Buffer) < BatchSize:
            raise ValueError('BatchSize too big')
        Transitions = random.sample(self.Buffer, BatchSize)
        # Each transition is [State, Action, NextState, Reward, TerminalState].
        States = [Transition[0].Unwrap() for Transition in Transitions]
        Actions = [Transition[1] for Transition in Transitions]
        NextStates = [Transition[2].Unwrap() for Transition in Transitions]
        Rewards = [torch.Tensor([Transition[3]]) for Transition in Transitions]
        TerminalSignals = [torch.Tensor([Transition[4]]) for Transition in Transitions]
        return (
            torch.stack(States),
            torch.stack(Actions),
            torch.stack(NextStates),
            torch.stack(Rewards),
            torch.stack(TerminalSignals),
        )


In [None]:
@dataclass
class Agent(EnforceClassTyping):
    '''This class represents the agent which interacts with the environment to create state transitions, which it uses
    to learn a good policy and value function.

    The Mass and Charge parameters determine how the agent interacts with its environment.
    The LearningRate, MemorySize and the layer-size/activation parameters determine its learning behaviour.

    The main and target networks are independent objects: the targets start as copies of the main networks and
    only move towards them through UpdateTargetActor / UpdateTargetCritic.'''
    Charge: float
    Mass: float
    LearningRate: float
    MemorySize: int  # capacity handed to ReplayBuffer
    ActorHiddenLayerSize: list  # passed as `layer_sizes` to ActorNetwork
    ActorLayerActivations: list
    ActorLayerActivationDerivatives: list
    CriticHiddenLayerSize: list  # passed as `layer_sizes` to CriticNetwork
    CriticLayerActivations: list
    CriticLayerActivationDerivatives: list
    CurrentState: State 
    Memory: ReplayBuffer = field(init=False) 
    ActorModel: ActorNetwork = field(init=False)
    CriticModel: CriticNetwork = field(init=False)
    ActorTargetModel: ActorNetwork = field(init=False)
    CriticTargetModel: CriticNetwork = field(init=False)
    def __post_init__(self):
        from copy import deepcopy  # function-scope import; only needed here

        self.Memory= ReplayBuffer(self.MemorySize)
        self.ActorModel= ActorNetwork(self.ActorHiddenLayerSize, self.ActorLayerActivations, self.ActorLayerActivationDerivatives)
        self.CriticModel= CriticNetwork(self.CriticHiddenLayerSize, self.CriticLayerActivations, self.CriticLayerActivationDerivatives)
        # Targets must be separate objects. Assigning the main network itself would make
        # every update of the main network instantly visible in the "target", turning the
        # soft updates into no-ops.
        self.ActorTargetModel= deepcopy(self.ActorModel)
        self.CriticTargetModel= deepcopy(self.CriticModel)
    def ForceGenerator(self, Action: torch.Tensor)-> torch.Tensor:
        'Scales an action by a fixed factor of 20 to obtain the force vector'
        ForceVector= Action* 20
        return ForceVector
    @staticmethod
    def _SoftUpdate(TargetModel, MainModel, SoftUpdateRate: float):
        'Moves every weight and bias of TargetModel towards MainModel: target = rate * main + (1 - rate) * target'
        for i, (MainWeight, TargetWeight) in enumerate(zip(MainModel.weights, TargetModel.weights)):
            TargetModel.weights[i]= MainWeight * SoftUpdateRate + TargetWeight * (1.0 - SoftUpdateRate)
        for i, (MainBias, TargetBias) in enumerate(zip(MainModel.bias, TargetModel.bias)):
            TargetModel.bias[i]= MainBias * SoftUpdateRate + TargetBias * (1.0 - SoftUpdateRate)
    @EnforceMethodTyping
    def UpdateCritic(self, StateBatch: torch.Tensor, ActionBatch: torch.Tensor, NextStateBatch: torch.Tensor, RewardBatch: torch.Tensor, TerminalSignalsbatch: torch.Tensor, DiscountRate: float):
        '''Updates the main critic network parameters by minimizing the difference between the bellman optimal expected
        return and the expected return predicted by the main critic network.

        The target return is computed once from the target networks; the main critic is then updated one sample
        at a time. Relies on a module-level `mse_grad` loss derivative.'''
        NextAction= self.ActorTargetModel.forward(NextStateBatch)
        # Terminal transitions (signal == 1) contribute only their immediate reward.
        BellmanOptimalReturn= RewardBatch+ (1-TerminalSignalsbatch)*DiscountRate*self.CriticTargetModel.forward(NextStateBatch, NextAction)
        for i in range(len(StateBatch)):
            WeightGradient, BiasGradient= self.CriticModel.compute_gradients(StateBatch[i], ActionBatch[i], BellmanOptimalReturn[i], mse_grad)
            self.CriticModel.update_model(WeightGradient, BiasGradient, self.LearningRate)
    @EnforceMethodTyping
    def UpdateActor(self, StateBatch: torch.Tensor):
        'Updates the main actor network parameters, one sample at a time, by maximizing the Expected Q-value predicted by the main critic network'
        for i in range(len(StateBatch)):
            WeightGradient, BiasGradient= self.ActorModel.compute_gradients(StateBatch[i], self.CriticModel)
            self.ActorModel.update_model(WeightGradient, BiasGradient, self.LearningRate)
    @EnforceMethodTyping
    def UpdateTargetCritic(self, SoftUpdateRate: float):
        'Updates the target critic network parameters (weights and biases) by making them lag behind the main critic network updates'
        self._SoftUpdate(self.CriticTargetModel, self.CriticModel, SoftUpdateRate)
    @EnforceMethodTyping 
    def UpdateTargetActor(self, SoftUpdateRate: float):
        'Updates the target actor network parameters (weights and biases) by making them lag behind the main actor network updates'
        self._SoftUpdate(self.ActorTargetModel, self.ActorModel, SoftUpdateRate)