In [1]:
import sys, pprint, json, numpy as np
sys.path.append('../ppgsi_mdp_risk')
from ppgsi_mdp_risk.env.SimpleMDP import SimpleMDP

Define One-State Many-Actions Env (OSMA)

In [2]:
def convert_to_array(dictionary):
    '''Converts lists of values in a dictionary to numpy arrays'''
    return np.array([v for v in dictionary.values()])

In [3]:
# Parameters
states, actions = 1, 1    # States and Actions
p = 0.5                   # Probability
c = 1                     # Cost
s0 = 0                    # Initial State
alpha = 0.99              # Alpha
discount_factor = 1       # Discount Factor
max_iterations = 5        # Max of Iterations

In [4]:
s_mdp = SimpleMDP(
    num_states = states, 
    num_actions = actions, 
    _fixed_probability = p, 
    _float_probability = 0.1
)

In [5]:
# Define Y


In [6]:
def maximum_value_policy_cvar(V, PI, CVAR):
    res = []
    for s in V.keys():
        a = PI[s]
        Tp = T[s][a]

        for sl in Tp.keys():
            if Tp[sl] > 0:
                res.append(CVAR[sl])

    return max(res)

In [7]:
# Define Transition Function (T)
T = s_mdp.build_transition_probabilities()

# Define Value Function (V) and CVAR
V = s_mdp._build_V0(0)
CVAR0 = s_mdp._build_V0(0)
CVARy = s_mdp._build_V0(0)

# Define Policy (PI)
PI = s_mdp._build_PI0(0)

#############
# Algoritmo #
#############

i, _quiet = 0, False

# Etapa 01
while i < max_iterations:
    if not _quiet: print(f'[{i}/{max_iterations}] Executando Iteração')
    for s in V.keys():
        if not _quiet: print(f'[{i}/{max_iterations} - {s}] Calculando Estado.')
        
        a = PI[s] # Define Action
        V[s] = c + discount_factor * sum(convert_to_array(V) * convert_to_array(T[s][a])) # Calculate Risk Neutral Value Function
        CVAR0[s] = c + discount_factor * maximum_value_policy_cvar(V, PI, CVAR0)

    if not _quiet:
        print(f"""
            Value Function: {V} ||
            CVAR (0): {CVAR0}
        """)
    if not _quiet: print('-- ~~ >>')
    
    i += 1

[0/5] Executando Iteração
[0/5 - 0] Calculando Estado.
[0/5 - sG] Calculando Estado.

            Value Function: {0: 1.0, 'sG': 1.0} ||
            CVAR (0): {0: 1, 'sG': 2}
        
-- ~~ >>
[1/5] Executando Iteração
[1/5 - 0] Calculando Estado.
[1/5 - sG] Calculando Estado.

            Value Function: {0: 2.0, 'sG': 2.0} ||
            CVAR (0): {0: 3, 'sG': 4}
        
-- ~~ >>
[2/5] Executando Iteração
[2/5 - 0] Calculando Estado.
[2/5 - sG] Calculando Estado.

            Value Function: {0: 3.0, 'sG': 3.0} ||
            CVAR (0): {0: 5, 'sG': 6}
        
-- ~~ >>
[3/5] Executando Iteração
[3/5 - 0] Calculando Estado.
[3/5 - sG] Calculando Estado.

            Value Function: {0: 4.0, 'sG': 4.0} ||
            CVAR (0): {0: 7, 'sG': 8}
        
-- ~~ >>
[4/5] Executando Iteração
[4/5 - 0] Calculando Estado.
[4/5 - sG] Calculando Estado.

            Value Function: {0: 5.0, 'sG': 5.0} ||
            CVAR (0): {0: 9, 'sG': 10}
        
-- ~~ >>


In [8]:
# Etapa 2
CVAR = {}
for s in V.keys():
    if s == 'sG': continue
    
    a = PI[s]
    
    CVAR[s] = {}
    if not _quiet: print(f'[{s}] Calculando Estado.')
    t, PG, C, Vc, Ps = 0, {}, {}, {}, {}
    first_it = True
    
    PG[0] = 0
    C[0] = 0
    Vc[0] = 0
    Ps[0] = s_mdp._build_V0(0)
    Ps[0][s] = 1

    _max_it = 100
    _ = 0
    while first_it or ((1 - PG[t]) < alpha and (_ < _max_it)):
        first_it = False
        CVAR[s][(1 - PG[t])] = (V[s] - Vc[t] * PG[t]) / (1 - PG[t])
        t += 1

        Ps[t] = s_mdp._build_V0(0)
        
        
        for sl in V.keys():
            Ps[t][sl] = sum(convert_to_array(T[sl][a]) * convert_to_array(Ps[t-1]))

        PG[t] = sum([Ps[t][sl] for sl in Ps[t] if sl == s])
        C[t] = C[t-1] + discount_factor**(t-1)
        Vc[t] = (Vc[t-1] * PG[t-1] + C[t] * (PG[t]- PG[t-1])) / PG[t]

        print(f"""
            Vc[t-1]: {Vc[t-1]: }
            PG[t-1]: {PG[t-1]:}
            PG[t]: {PG[t]:}
            C[t]: {C[t]:}

            (PG[t]- PG[t-1]): {(PG[t]- PG[t-1])}
            Vc[t]: {Vc[t]}

            ---

            Ps: {Ps}
            PG: {PG}
            C: {C}
            Vc: {Vc}
            CVAR: {CVAR}
        """)

        if not _quiet: print('-- ~~ >>')
        
        _ += 1
    

    if not _quiet: print('Max Iterations: ', _)

[0] Calculando Estado.

            Vc[t-1]:  0
            PG[t-1]: 0
            PG[t]: 0.5
            C[t]: 1

            (PG[t]- PG[t-1]): 0.5
            Vc[t]: 1.0

            ---

            Ps: {0: {0: 1, 'sG': 0}, 1: {0: 0.5, 'sG': 0}}
            PG: {0: 0, 1: 0.5}
            C: {0: 0, 1: 1}
            Vc: {0: 0, 1: 1.0}
            CVAR: {0: {1: 5.0}}
        
-- ~~ >>

            Vc[t-1]:  1.0
            PG[t-1]: 0.5
            PG[t]: 0.25
            C[t]: 2

            (PG[t]- PG[t-1]): -0.25
            Vc[t]: 0.0

            ---

            Ps: {0: {0: 1, 'sG': 0}, 1: {0: 0.5, 'sG': 0}, 2: {0: 0.25, 'sG': 0.0}}
            PG: {0: 0, 1: 0.5, 2: 0.25}
            C: {0: 0, 1: 1, 2: 2}
            Vc: {0: 0, 1: 1.0, 2: 0.0}
            CVAR: {0: {1: 5.0, 0.5: 9.0}}
        
-- ~~ >>

            Vc[t-1]:  0.0
            PG[t-1]: 0.25
            PG[t]: 0.125
            C[t]: 3

            (PG[t]- PG[t-1]): -0.125
            Vc[t]: -3.0

            ---

  