# In [4]:
import numpy as np

class Q_Learning:
    """Tabular Q-learning agent for the Cart Pole environment.

    The continuous 4-dimensional state (cart position, cart velocity, pole
    angle, pole angular velocity) is discretized onto a fixed grid so that
    action values can be stored in a 5-dimensional table ``Qmatrix`` indexed
    by ``(positionBin, velocityBin, angleBin, angularVelocityBin, action)``.

    The environment passed in must expose ``action_space.n``, a ``reset()``
    returning ``(observation, info)`` and a ``step(action)`` returning
    ``(observation, reward, terminated, truncated, info)``.
    """

    ##########

    def __init__(self, env, alpha, gamma, epsilon, numberOfEpisodes, numberOfBins, lowerBounds, upperBounds):
        """Store the hyper-parameters and create a randomly initialized Q-table.

        Args:
            env: Cart Pole environment used for learning.
            alpha: step size (learning rate).
            gamma: discount rate.
            epsilon: exploration probability of the epsilon-greedy policy.
            numberOfEpisodes: total number of learning episodes.
            numberOfBins: 4 integers, number of grid points per state variable.
            lowerBounds: 4 floats, lower bounds of the discretization grid.
            upperBounds: 4 floats, upper bounds of the discretization grid.
        """
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.actionNumber = env.action_space.n
        self.numberOfEpisodes = numberOfEpisodes
        self.numberOfBins = numberOfBins
        self.lowerBounds = lowerBounds
        self.upperBounds = upperBounds

        # Sum of rewards collected in each learning episode (convergence trace).
        self.sumRewardsEpisode = []

        # Q-values start as uniform random numbers in [0, 1).
        self.Qmatrix = np.random.uniform(
            low=0,
            high=1,
            size=(numberOfBins[0], numberOfBins[1], numberOfBins[2], numberOfBins[3], self.actionNumber),
        )

    ##########

    def returnIndexState(self, state):
        """Map a continuous state to its index tuple in the Q-table.

        Args:
            state: 4 elements
                [cart position, cart velocity, pole angle, pole angular velocity].

        Returns:
            A 4-tuple of integer indices. Values below the lower bound map to
            index 0 and values at or above the upper bound map to the last
            index (``numberOfBins[i] - 1``), so the result always addresses a
            valid cell of ``Qmatrix``.
        """
        indices = []
        for dim in range(4):
            gridPoints = np.linspace(self.lowerBounds[dim], self.upperBounds[dim], self.numberOfBins[dim])
            # digitize() is 1-based for in-range values; shift to 0-based and
            # clamp the "below the first grid point" case (-1) to 0.
            indices.append(np.maximum(np.digitize(state[dim], gridPoints) - 1, 0))
        return tuple(indices)

    ##########

    def _greedyAction(self, state):
        """Return an action with maximal Q-value in ``state``, ties broken at random."""
        qValues = self.Qmatrix[self.returnIndexState(state)]
        bestActions = np.where(qValues == np.max(qValues))[0]
        return np.random.choice(bestActions)

    ##########

    def selectAction(self, state, index):
        """Select an action with the epsilon-greedy policy.

        Args:
            state: current state for which the action is chosen.
            index: index of the current episode.

        Returns:
            The selected action.

        Notes:
            * For the first 500 episodes the action is purely random.
            * Once ``index`` exceeds 7000, ``self.epsilon`` is multiplied by
              0.999 on every call, i.e. on every time step rather than once
              per episode, so exploration fades quickly after that point.
        """
        # Pure exploration during the first 500 episodes.
        if index < 500:
            return np.random.choice(self.actionNumber)

        # Drawn before the decay so the comparison order matches the schedule
        # the existing experiments were run with.
        randomNumber = np.random.random()

        if index > 7000:
            self.epsilon *= 0.999

        # Exploration: uniformly random action.
        if randomNumber < self.epsilon:
            return np.random.choice(self.actionNumber)

        # Exploitation: best known action for this state.
        return self._greedyAction(state)

    ##########

    def simulateEpisodes(self):
        """Run ``numberOfEpisodes`` learning episodes and update ``Qmatrix``.

        After every episode the sum of its rewards is appended to
        ``self.sumRewardsEpisode``.

        An episode ends when the environment reports ``terminated`` or
        ``truncated``. Only ``terminated`` is treated as a true terminal
        state in the update (no bootstrapping); a truncated transition
        (e.g. a time limit) still bootstraps from the next state's value.
        """
        for indexEpisode in range(self.numberOfEpisodes):

            # Rewards of this episode, kept to track convergence.
            rewardsEpisode = []

            (stateS, _) = self.env.reset()
            stateSIndex = self.returnIndexState(stateS)

            print("Simulating episode {}".format(indexEpisode))

            episodeOver = False
            while not episodeOver:
                actionA = self.selectAction(stateS, indexEpisode)

                # "prime" denotes the successor state.
                (stateSprime, reward, terminated, truncated, _) = self.env.step(actionA)
                rewardsEpisode.append(reward)

                stateSprimeIndex = self.returnIndexState(stateSprime)
                qIndex = stateSIndex + (actionA,)

                if terminated:
                    # The value of a terminal state is 0 by definition.
                    target = reward
                else:
                    target = reward + self.gamma * np.max(self.Qmatrix[stateSprimeIndex])

                error = target - self.Qmatrix[qIndex]
                self.Qmatrix[qIndex] = self.Qmatrix[qIndex] + self.alpha * error

                # Move on; stop once the environment says the episode is over,
                # so it is never stepped after it has asked for a reset.
                stateS = stateSprime
                stateSIndex = stateSprimeIndex
                episodeOver = terminated or truncated

            print("Sum of rewards {}".format(np.sum(rewardsEpisode)))
            self.sumRewardsEpisode.append(np.sum(rewardsEpisode))

    ##########

    def simulateLearnedStrategy(self):
        """Play one rendered episode with the greedy policy from ``Qmatrix``.

        A new 'CartPole-v1' environment is created with ``render_mode='human'``
        and stepped for at most 1000 time steps, or until the environment
        reports ``terminated`` or ``truncated``.

        Returns:
            (obtainedRewards, env1): the list of rewards obtained at each time
            step and the created environment (the caller is responsible for
            closing it).
        """
        import gym
        import time

        env1 = gym.make('CartPole-v1', render_mode='human')

        (currentState, _) = env1.reset()
        env1.render()

        # Upper bound on the number of simulated time steps.
        timeSteps = 1000

        obtainedRewards = []

        for timeIndex in range(timeSteps):
            print(timeIndex)

            actionInStateS = self._greedyAction(currentState)

            currentState, reward, terminated, truncated, _ = env1.step(actionInStateS)
            obtainedRewards.append(reward)

            # Slow the simulation down so a human can follow it.
            time.sleep(0.05)

            if terminated or truncated:
                # Leave the final frame visible for a moment before returning.
                time.sleep(1)
                break

        return obtainedRewards, env1

    ##########

    def simulateRandomStrategy(self):
        """Run 100 episodes with uniformly random actions as a baseline.

        A new 'CartPole-v1' environment is created (without rendering). Each
        episode lasts at most 1000 time steps, or until the environment
        reports ``terminated`` or ``truncated``.

        Returns:
            (sumRewardsEpisodes, env2): a list whose entries are the sum of
            rewards of the corresponding episode, and the created environment.
        """
        import gym

        env2 = gym.make('CartPole-v1')

        # Number of simulation episodes and step limit per episode.
        episodeNumber = 100
        timeSteps = 1000

        sumRewardsEpisodes = []

        for episodeIndex in range(episodeNumber):
            rewardsSingleEpisode = []

            env2.reset()
            print(episodeIndex)

            for _ in range(timeSteps):
                random_action = env2.action_space.sample()

                _, reward, terminated, truncated, _ = env2.step(random_action)
                rewardsSingleEpisode.append(reward)

                if terminated or truncated:
                    break

            sumRewardsEpisodes.append(np.sum(rewardsSingleEpisode))

        return sumRewardsEpisodes, env2