In [122]:
import numpy as np

In [202]:
#model environment parameters
adjustment_interval_min = 15
charging_duration_min = 120
min_charging_rate_kW = 0
max_charging_rate_kW = 22
battery_capacity_kWh = 40

expected_usage_kWh = 30
sigma_kWh = 5

#optimizer (Reinforcement Learner) parameters
fully_exhousted_penalty = 0.99

#computed parameters
num_charging_adjustments = int(charging_duration_min/adjustment_interval_min)
print(num_charging_adjustments)

#series
Intervals = np.linspace(0, num_charging_adjustments - 1, num_charging_adjustments).astype(int)
Actions = np.linspace(min_charging_rate_kW, max_charging_rate_kW, max_charging_rate_kW + 1).astype(int)

A = np.ones(num_charging_adjustments)

print(Intervals, Actions, A)

8
[0 1 2 3 4 5 6 7] [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22] [0. 0. 0. 0. 0. 0. 0. 0.] [1. 1. 1. 1. 1. 1. 1. 1.]


In [124]:
samples = np.random.normal(expected_usage_kWh, sigma_kWh, 10)

print(samples)

[34.69887272 21.49787134 30.87573638 33.42189521 37.16105302 31.22192948
 32.49166767 31.72393138 35.73044428 30.84258531]


In [216]:
def calcChargingCost(t, p):
    e = np.exp(1)
    return A[t]*(e**p)

def rewardF(totalChargeByInterval, requiredkWh):
    totalCharged = totalChargeByInterval/(int(60/adjustment_interval_min))
    # diviation penalty
    divitation = abs(30 - totalCharged)
    divitationCost = max(divitation, .001)
    # extra penalty for running out
    runOutCost = -1_000_000_000 if requiredkWh > totalCharged else 0
    return  divitationCost + runOutCost

print(rewardF(120, 30))

0.001


In [294]:
#generate env

def convertActionToChargingPower(value):
    if value == 0:
        return 0
    if value == 1:
        return 6
    if value == 2:
        return 12
    if value == 3:
        return 18

class env:
    def __init__(self):
        self.nA = 4
        self.nS = self.totalNodes(num_charging_adjustments)
        self.P = self.create_P()
        """ 
        print(self.nS)
        print(self.nA)
        print(self.P) """

    def totalNodes(self, layer):
        totalNodes = 0
        for lvl in range(0, layer):
            totalNodes = totalNodes + self.nA**lvl
        return totalNodes

    def create_P(self):
        P = []

        numberOfActions = self.nA
        
        def getActionValues(level, levelId):
            actionValue = levelId%numberOfActions
            values = [actionValue]
            ids = [levelId]
            for lvl in range(0, level):
                lastActionId = ids[0]//numberOfActions
                lastActionValue = lastActionId%numberOfActions
                values.insert(0, lastActionValue)
                ids.insert(0, lastActionId)
            return values

        actions = list()

        states = list()

        print("Actions", self.totalNodes(num_charging_adjustments))
        print("States", self.totalNodes(num_charging_adjustments))

        for actionLevel in range(0, num_charging_adjustments + 1):#+1 because the root of the tree has no pretaken action
            print(actionLevel)
            lastLayerActions = self.totalNodes(actionLevel)
            layerActions = self.totalNodes(actionLevel+1)

            states.append([])

            for levelActionId in range(0, numberOfActions**actionLevel):
                # levelActionId:
                #0 0 0 0
                #  1 1 1
                #    2 2
                #      3
                #
                # globalIds:
                #0 1 3 6
                #  2 4 7
                #    5 8
                #      9
                #

                #construct location in state tree
                action = levelActionId%numberOfActions
                globalActionId = (lastLayerActions-1) + levelActionId
                actions.append(0)

                originStateLevelId = levelActionId//numberOfActions
                originStateId = originStateLevelId+self.totalNodes(actionLevel-1)
                destinationStateId = self.totalNodes(actionLevel)+(originStateLevelId*numberOfActions)+action

                if(originStateId == len(P)):
                    P.append([])
                elif(originStateId > len(P)):
                    print("!Skipped state!")


                if(actionLevel != num_charging_adjustments):
                    actionTransition = ( 1, destinationStateId, int(-(calcChargingCost(actionLevel - 1, convertActionToChargingPower(action))/4)), False)
                    actions[globalActionId] = actionTransition
                    P[originStateId].append([actionTransition])
                else:
                    actionValues = getActionValues(actionLevel, levelActionId)
                    #remove root node
                    actionValues = actionValues[1:]
                    charge = 0
                    for i, action in enumerate(actionValues):
                        charge = charge + convertActionToChargingPower(action)
                    rewardValue = rewardF(charge, 30)-(calcChargingCost(actionLevel - 1, convertActionToChargingPower(action))/4)
                    actionTransition = (1, destinationStateId, int(rewardValue), True)
                    actions[globalActionId] = actionTransition
                    P[originStateId].append([actionTransition])
        #remove root node
        P[0] = P[0][1:]
        return P

env = env()

Actions 21845
States 21845
0
1
2
3
4
5
6
7
8


In [295]:
#take example path always taking action 2
for prob, next_state, reward, done in env.P[0][0]:
    print("1. Action: ", prob, next_state, reward, done)
    for prob, next_state, reward, done in env.P[next_state][3]:
        print("2. Action: ", prob, next_state, reward, done)
        for prob, next_state, reward, done in env.P[next_state][3]:
                print("3. Action: ", prob, next_state, reward, done)
                for prob, next_state, reward, done in env.P[next_state][3]:
                        print("4. Action: ", prob, next_state, reward, done)
                        for prob, next_state, reward, done in env.P[next_state][2]:
                                print("5. Action: ", prob, next_state, reward, done)
                                for prob, next_state, reward, done in env.P[next_state][3]:
                                        print("6. Action: ", prob, next_state, reward, done)
                                        for prob, next_state, reward, done in env.P[next_state][0]:
                                                print("7. Action: ", prob, next_state, reward, done)
                                                for prob, next_state, reward, done in env.P[next_state][3]:
                                                    print("8. Action: ", prob, next_state, reward, done)


convertActionToChargingPower(2)


1. Action:  1 1 0 False
2. Action:  1 8 -16414992 False
3. Action:  1 36 -16414992 False
4. Action:  1 148 -16414992 False
5. Action:  1 595 -40688 False
6. Action:  1 2384 -16414992 False
7. Action:  1 9537 0 False
8. Action:  1 38152 -1016414987 True


12

In [None]:
def value_iteration(env, theta=0.0001, discount_factor=1.0):
    """
    Value Iteration Algorithm.
    
    Args:
        env: OpenAI env. env.P represents the transition probabilities of the environment.
            env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
            env.nS is a number of states in the environment. 
            env.nA is a number of actions in the environment.
        theta: We stop evaluation once our value function change is less than theta for all states.
        discount_factor: Gamma discount factor.
        
    Returns:
        A tuple (policy, V) of the optimal policy and the optimal value function.
    """
    
    def one_step_lookahead(state, V):
        """
        Helper function to calculate the value for all action in a given state.
        
        Args:
            state: The state to consider (int)
            V: The value to use as an estimator, Vector of length env.nS
        
        Returns:
            A vector of length env.nA containing the expected value of each action.
        """
        A = np.zeros(env.nA)
        for a in range(env.nA):
            for prob, next_state, reward, done in env.P[state][a]:
                A[a] += prob * (reward + discount_factor * V[next_state])
        return A
    
    V = np.zeros(env.nS)
    while True:
        # Stopping condition
        delta = 0
        # Update each state...
        for s in range(env.nS):
            # Do a one-step lookahead to find the best action
            A = one_step_lookahead(s, V)
            best_action_value = np.max(A)
            # Calculate delta across all states seen so far
            delta = max(delta, np.abs(best_action_value - V[s]))
            # Update the value function. Ref: Sutton book eq. 4.10. 
            V[s] = best_action_value        
        # Check if we can stop 
        if delta < theta:
            break
    
    # Create a deterministic policy using the optimal value function
    policy = np.zeros([env.nS, env.nA])
    for s in range(env.nS):
        # One step lookahead to find the best action for this state
        A = one_step_lookahead(s, V)
        best_action = np.argmax(A)
        # Always take the best action
        policy[s, best_action] = 1.0
    
    return policy, V