Due Date: December 7th

# Vaccine Development with Dynamic Programming

You are the CEO of a biotech company which is considering the development of a new vaccine. Starting at phase 0 (state 0), the drug development can stay in the same state, advance to "phase 1 with promising results" (state 1), advance to "phase 1 with disappointing results" (state 2), or fail completely (state 4). At phase 1, the drug can stay in the same state, fail, or become a success (state 3), in which case you will sell its patent to a big pharma company for \$10 million.
These state transitions happen from month to month, and at each state, you have the option to make an additional investment of \$100,000, which increases the chances of success.

After careful study, your analysts develop the program below to simulate different scenarios using statistical data from similar projects. 

Use a discount factor of 0.996.

- 1) Write a policy iteration algorithm to compute the value of this project. Please print the full V vector.

- 2) Write a value iteration algorithm to compute the value of this project. Please print the full V vector.

In [1]:
import numpy as np

class MDP():
  """Vaccine-development Markov decision process used for simulation.

  States: 0 = phase 0, 1 = phase 1 with promising results, 2 = phase 1 with
  disappointing results, 3 = success, 4 = failure.  Actions: 0 = do not
  invest, 1 = invest.  Rewards are expressed in millions of dollars.
  """

  def __init__(self):
    self.S = [0, 1, 2, 3, 4]  # state indices
    self.A = [0, 1]           # action indices

    # Row s of a transition matrix is the next-state distribution from s.
    no_invest_P = np.array([
        [0.5, .15, .15, 0, .20],  # from phase 0
        [0, .5, .0, .25, .25],    # from phase 1, promising
        [0, 0, .15, .05, .8],     # from phase 1, disappointing
        [0, 0, 0, 0, 1],          # from success
        [0, 0, 0, 0, 1],          # from failure
    ])
    invest_P = np.array([
        [0.5, .25, .15, 0, .10],  # from phase 0
        [0, .5, .0, .35, .15],    # from phase 1, promising
        [0, 0, .20, .05, .75],    # from phase 1, disappointing
        [0, 0, 0, 0, 1],          # from success
        [0, 0, 0, 0, 1],          # from failure
    ])

    # Per-state rewards; investing costs 0.1 in the three development states.
    no_invest_R = np.array([0, 0, 0, 10, 0])
    invest_R = np.array([-0.1, -0.1, -0.1, 10, 0])

    # Both lists are indexed by action: P[a] is a matrix, R[a] is a vector.
    self.P = [no_invest_P, invest_P]
    self.R = [no_invest_R, invest_R]

  def step(self, s, a):
    """Sample a single transition out of state `s` under action `a`.

    Returns a tuple (s_prime, R, done): the sampled next state, the reward
    attached to (s, a), and whether the sampled next state is state 4.
    """
    s_prime = np.random.choice(len(self.S), p=self.P[a][s])
    reward = self.R[a][s]
    return s_prime, reward, bool(s_prime == 4)

  def simulate(self, s, a, π):
    """Roll out one episode and return its list of (state, action, reward).

    The first step uses the supplied action `a`; every later step uses the
    action that policy `π` assigns to the current state.  The rollout ends
    once `step` reports `done`.  The running step count is printed after
    each step.
    """
    history = []
    t = 0
    while True:
      action = a if t == 0 else π[s]  # π is indexed by state
      s_prime, reward, done = self.step(s, action)
      history.append((s, action, reward))
      s = s_prime
      t += 1
      print(t)
      if done:
        break
    return history

You can access the transition probability matrices and the reward vector as follows:

In [2]:
# Build the model and expose its per-action transition matrices and rewards.
mdp = MDP()
P, R = mdp.P, mdp.R


s, s_prime, a = 2, 4, 1  # current state, next state, chosen action

# Probability of moving from state s (2) to s_prime (4) under action a (1):
print(P[a][s, s_prime])

# Reward attached to state s under action a:
print(R[a][s])

0.75
-0.1


In [3]:
# rough work
π = [0, 1, 0, 1, 0] # policy: action to take in each state, indexed by state

# The first step uses action a; later steps follow π (see MDP.simulate).
res = mdp.simulate(s, a, π)
res # list of (state, action, reward) tuples, one per simulated step

1


[(2, 1, -0.1)]

## 1. Solution for Policy Iteration

In [4]:
import numpy as np

class MDP():
  """Vaccine-development Markov decision process with a policy-iteration solver.

  States: 0 = phase 0, 1 = phase 1 with promising results, 2 = phase 1 with
  disappointing results, 3 = success, 4 = failure.  Actions: 0 = do not
  invest, 1 = invest.  Rewards are expressed in millions of dollars.
  """

  def __init__(self, γ, π, Vπ, π_vec):
    """Store the solver inputs and build the transition/reward tables.

    Args:
      γ: discount factor.
      π: initial policy (one action per state).
      Vπ: initial guess for the state-value vector.
      π_vec: list used to record the policy found at each iteration.
    """
    self.A = [0, 1]          # action indices
    self.S = [0, 1, 2, 3, 4] # state indices
    self.γ = γ
    self.π = π
    self.Vπ = Vπ
    self.π_vec = π_vec

    # Row s of a transition matrix is the next-state distribution from s.
    # Transition matrix when we don't invest
    P0 = np.array([[0.5, .15, .15, 0, .20], # from phase 0
                   [0, .5, .0, .25, .25],   # from phase 1, promising
                   [0, 0, .15, .05, .8],    # from phase 1, disappointing
                   [0, 0, 0, 0, 1],         # from success
                   [0, 0, 0, 0, 1]])        # from failure
    # Reward when we don't invest
    R0 = np.array([0, 0, 0, 10, 0])

    # Transition matrix when we invest
    P1 = np.array([[0.5, .25, .15, 0, .10], # from phase 0
                   [0, .5, .0, .35, .15],   # from phase 1, promising
                   [0, 0, .20, .05, .75],   # from phase 1, disappointing
                   [0, 0, 0, 0, 1],         # from success
                   [0, 0, 0, 0, 1]])        # from failure

    # Reward when we invest (0.1 is paid in the three development states)
    R1 = np.array([-0.1, -0.1, -0.1, 10, 0])

    # Both lists are indexed by action: P[a] is a matrix, R[a] is a vector.
    self.P = [P0, P1]
    self.R = [R0, R1]

  def step(self, s, a):
    """Sample one transition out of state `s` under action `a`.

    Returns:
      (s_prime, R, done): the sampled next state, the reward attached to
      (s, a), and True when the sampled next state is state 4.
    """
    s_prime = np.random.choice(len(self.S), p=self.P[a][s])
    R = self.R[a][s]
    done = bool(s_prime == 4)
    return s_prime, R, done

  def simulate(self, s, a, π):
    """Roll out one episode and return its list of (state, action, reward).

    The first step uses the supplied action `a`; every later step uses the
    action that policy `π` assigns to the current state.  The rollout ends
    once `step` reports `done`.  The running step count is printed after
    each step.
    """
    done = False
    t = 0
    history = []
    while not done:
      if t > 0:
        a = π[s]  # π is indexed by state and yields an action
      s_prime, R, done = self.step(s, a)
      history.append((s, a, R))
      s = s_prime
      t += 1
      print(t)

    return history

  def construct_Rπ(self, R, π, S):
    """Return the reward vector implied by policy `π`: Rπ[s] = R[π[s]][s]."""
    Rπ = np.zeros(len(S))
    for s in S:
      Rπ[s] = R[π[s]][s]
    return Rπ

  def construct_Pπ(self, P, π, S):
    """Return the transition matrix implied by `π`: row s is P[π[s]][s]."""
    Pπ = np.zeros((len(S), len(S)))
    for s in S:
      for s_prime in S:
        Pπ[s, s_prime] = P[π[s]][s, s_prime]
    return Pπ

  def policy_evaluation(self, π, Vπ, S, γ, R=None, P=None, max_sweeps=10000, tol=0.0):
    """Iteratively evaluate policy `π` with the update V <- Rπ + γ Pπ V.

    Args:
      π: policy to evaluate (one action per state).
      Vπ: starting value vector.
      S: state indices.
      γ: discount factor.
      R, P: per-action reward vectors / transition matrices.  They default
        to the model's own tables, so the method no longer depends on
        module-level names being defined.
      max_sweeps: upper bound on the number of update sweeps.
      tol: stop early once no entry changes by more than `tol`.  With the
        default 0.0 the loop stops only at an exact fixed point, where
        further sweeps would return the same vector anyway.

    Returns:
      The value vector after the final sweep.
    """
    R = self.R if R is None else R
    P = self.P if P is None else P
    Rπ = self.construct_Rπ(R, π, S)
    Pπ = self.construct_Pπ(P, π, S)
    discounted_P = γ * Pπ  # loop-invariant, so computed once
    for _ in range(max_sweeps):
      V_next = Rπ + discounted_P @ Vπ
      settled = np.all(np.abs(V_next - Vπ) <= tol)
      Vπ = V_next
      if settled:
        break
    return Vπ

  def policy_improvement(self, Vπ, S, A, γ, R=None, P=None):
    """Return the policy that is greedy with respect to `Vπ`.

    Computes Q[s, a] = R[a][s] + γ P[a][s] · Vπ for every state/action pair
    and picks the maximising action per state (ties go to the lowest action
    index).  `R` and `P` default to the model's own tables.

    Returns:
      An int32 array holding one action per state.
    """
    R = self.R if R is None else R
    P = self.P if P is None else P
    Qπ = np.zeros((len(S), len(A)))  # sized from the inputs, not hard-coded
    for s in S:
      for a in A:
        Qπ[s, a] = R[a][s] + γ * P[a][s] @ Vπ
    return np.argmax(Qπ, axis=1).astype(np.int32)

  def loop(self, π, Vπ, S, A, γ, π_vec, R, P, max_iter=10000, patience=30):
    """Run policy iteration and return (Vπ, π_vec).

    Each iteration evaluates the current policy, replaces it with the greedy
    policy, and appends that policy to `π_vec` (the caller's list is mutated).
    From the fourth iteration on, consecutive identical policies are counted;
    the loop returns once `patience` of them have been seen in a row.

    Args:
      π: initial policy.
      Vπ: initial value vector.
      S, A: state and action indices.
      γ: discount factor.
      π_vec: list that receives the policy produced by each iteration.
      R, P: per-action reward vectors / transition matrices.
      max_iter: maximum number of evaluate/improve iterations.
      patience: number of consecutive unchanged policies required to stop.

    Returns:
      (Vπ, π_vec).  If the stability rule is not met within `max_iter`
      iterations, a RuntimeWarning is issued and the latest estimate is
      returned instead of None.
    """
    import warnings

    stable = 0
    for i in range(max_iter):
      Vπ = self.policy_evaluation(π, Vπ, S, γ, R, P)
      π = self.policy_improvement(Vπ, S, A, γ, R, P)
      π_vec.append(π)

      if i > 2:
        # Compare the two most recent entries; indexing from the end stays
        # correct even when the caller's list was not empty on entry.
        if np.array_equal(π_vec[-1], π_vec[-2]):
          stable += 1
        else:
          stable = 0

      if stable >= patience:
        return Vπ, π_vec

    warnings.warn(
        "policy iteration did not meet the stability rule within max_iter "
        "iterations; returning the latest estimate",
        RuntimeWarning,
    )
    return Vπ, π_vec

In [5]:
# Solver inputs for policy iteration.
γ = .996                 # discount factor
π = [0, 0, 0, 0, 0]      # starting policy: action 0 in every state
Vπ = np.zeros(5)         # starting value estimate
π_vec = []               # receives the policy found at each iteration

mdp = MDP(γ, π, Vπ, π_vec)

# Expose the model's action/state lists and its reward/transition tables as
# notebook-level names; they are handed to loop() below.
A, S, R, P = mdp.A, mdp.S, mdp.R, mdp.P

# x is the resulting value vector, y the recorded sequence of policies.
x, y = mdp.loop(π, Vπ, S, A, γ, π_vec, R, P)

In [6]:
x # Vπ: value vector returned by mdp.loop (first element of its result)

array([ 3.32067538,  6.74501992,  0.58546908, 10.        ,  0.        ])


### Above is the V vector for policy iteration

In [7]:
# y # π_vec

## 2. Solution for Value Iteration

In [8]:
import numpy as np

class MDP():
  """Vaccine-development Markov decision process with a value-iteration solver.

  States: 0 = phase 0, 1 = phase 1 with promising results, 2 = phase 1 with
  disappointing results, 3 = success, 4 = failure.  Actions: 0 = do not
  invest, 1 = invest.  Rewards are expressed in millions of dollars.
  """

  def __init__(self, γ, π, Vπ, π_vec):
    """Store the solver inputs and build the transition/reward tables.

    Args:
      γ: discount factor.
      π: initial policy (one action per state).
      Vπ: initial guess for the state-value vector.
      π_vec: list used to record the policy found at each iteration.
    """
    self.A = [0, 1]          # action indices
    self.S = [0, 1, 2, 3, 4] # state indices
    self.γ = γ
    self.π = π
    self.Vπ = Vπ
    self.π_vec = π_vec

    # Row s of a transition matrix is the next-state distribution from s.
    # Transition matrix when we don't invest
    P0 = np.array([[0.5, .15, .15, 0, .20], # from phase 0
                   [0, .5, .0, .25, .25],   # from phase 1, promising
                   [0, 0, .15, .05, .8],    # from phase 1, disappointing
                   [0, 0, 0, 0, 1],         # from success
                   [0, 0, 0, 0, 1]])        # from failure
    # Reward when we don't invest
    R0 = np.array([0, 0, 0, 10, 0])

    # Transition matrix when we invest
    P1 = np.array([[0.5, .25, .15, 0, .10], # from phase 0
                   [0, .5, .0, .35, .15],   # from phase 1, promising
                   [0, 0, .20, .05, .75],   # from phase 1, disappointing
                   [0, 0, 0, 0, 1],         # from success
                   [0, 0, 0, 0, 1]])        # from failure

    # Reward when we invest (0.1 is paid in the three development states)
    R1 = np.array([-0.1, -0.1, -0.1, 10, 0])

    # Both lists are indexed by action: P[a] is a matrix, R[a] is a vector.
    self.P = [P0, P1]
    self.R = [R0, R1]

  def step(self, s, a):
    """Sample one transition out of state `s` under action `a`.

    Returns:
      (s_prime, R, done): the sampled next state, the reward attached to
      (s, a), and True when the sampled next state is state 4.
    """
    s_prime = np.random.choice(len(self.S), p=self.P[a][s])
    R = self.R[a][s]
    done = bool(s_prime == 4)
    return s_prime, R, done

  def simulate(self, s, a, π):
    """Roll out one episode and return its list of (state, action, reward).

    The first step uses the supplied action `a`; every later step uses the
    action that policy `π` assigns to the current state.  The rollout ends
    once `step` reports `done`.  The running step count is printed after
    each step.
    """
    done = False
    t = 0
    history = []
    while not done:
      if t > 0:
        a = π[s]  # π is indexed by state and yields an action
      s_prime, R, done = self.step(s, a)
      history.append((s, a, R))
      s = s_prime
      t += 1
      print(t)

    return history

  def construct_Rπ(self, R, π, S):
    """Return the reward vector implied by policy `π`: Rπ[s] = R[π[s]][s]."""
    Rπ = np.zeros(len(S))
    for s in S:
      Rπ[s] = R[π[s]][s]
    return Rπ

  def construct_Pπ(self, P, π, S):
    """Return the transition matrix implied by `π`: row s is P[π[s]][s]."""
    Pπ = np.zeros((len(S), len(S)))
    for s in S:
      for s_prime in S:
        Pπ[s, s_prime] = P[π[s]][s, s_prime]
    return Pπ

  def policy_evaluation(self, π, Vπ, S, γ, R=None, P=None, n_sweeps=1):
    """Apply the backup V <- Rπ + γ Pπ V `n_sweeps` times (once by default).

    A single sweep under the greedy policy is what turns the
    evaluate/improve cycle in `loop` into value iteration.

    Args:
      π: policy whose backup is applied (one action per state).
      Vπ: starting value vector.
      S: state indices.
      γ: discount factor.
      R, P: per-action reward vectors / transition matrices.  They default
        to the model's own tables, so the method no longer depends on
        module-level names being defined.
      n_sweeps: number of backups to apply.

    Returns:
      The value vector after the final sweep.
    """
    R = self.R if R is None else R
    P = self.P if P is None else P
    Rπ = self.construct_Rπ(R, π, S)
    Pπ = self.construct_Pπ(P, π, S)
    discounted_P = γ * Pπ  # loop-invariant, so computed once
    for _ in range(n_sweeps):
      Vπ = Rπ + discounted_P @ Vπ
    return Vπ

  def policy_improvement(self, Vπ, S, A, γ, R=None, P=None):
    """Return the policy that is greedy with respect to `Vπ`.

    Computes Q[s, a] = R[a][s] + γ P[a][s] · Vπ for every state/action pair
    and picks the maximising action per state (ties go to the lowest action
    index).  `R` and `P` default to the model's own tables.

    Returns:
      An int32 array holding one action per state.
    """
    R = self.R if R is None else R
    P = self.P if P is None else P
    Qπ = np.zeros((len(S), len(A)))  # sized from the inputs, not hard-coded
    for s in S:
      for a in A:
        Qπ[s, a] = R[a][s] + γ * P[a][s] @ Vπ
    return np.argmax(Qπ, axis=1).astype(np.int32)

  def loop(self, π, Vπ, S, A, γ, π_vec, R, P, max_iter=10000, patience=30, tol=1e-10):
    """Run value iteration and return (Vπ, π_vec).

    Each iteration applies one backup under the current policy, replaces the
    policy with the greedy one, and appends it to `π_vec` (the caller's list
    is mutated).  The loop returns once the policy has been unchanged for
    `patience` consecutive iterations (counted from the fourth iteration on)
    AND the latest backup changed no value by more than `tol`.  A stable
    policy alone does not mean the values have converged, so the value check
    is required for the returned vector to be accurate.

    Args:
      π: initial policy.
      Vπ: initial value vector.
      S, A: state and action indices.
      γ: discount factor.
      π_vec: list that receives the policy produced by each iteration.
      R, P: per-action reward vectors / transition matrices.
      max_iter: maximum number of iterations.
      patience: number of consecutive unchanged policies required to stop.
      tol: largest per-state value change allowed at termination.

    Returns:
      (Vπ, π_vec).  If the stopping rule is not met within `max_iter`
      iterations, a RuntimeWarning is issued and the latest estimate is
      returned instead of None.
    """
    import warnings

    stable = 0
    for i in range(max_iter):
      V_prev = Vπ
      Vπ = self.policy_evaluation(π, V_prev, S, γ, R, P)
      π = self.policy_improvement(Vπ, S, A, γ, R, P)
      π_vec.append(π)

      if i > 2:
        # Compare the two most recent entries; indexing from the end stays
        # correct even when the caller's list was not empty on entry.
        if np.array_equal(π_vec[-1], π_vec[-2]):
          stable += 1
        else:
          stable = 0

      values_settled = np.all(np.abs(Vπ - V_prev) <= tol)
      if stable >= patience and values_settled:
        return Vπ, π_vec

    warnings.warn(
        "value iteration did not meet the stopping rule within max_iter "
        "iterations; returning the latest estimate",
        RuntimeWarning,
    )
    return Vπ, π_vec

In [9]:
# Solver inputs for value iteration.
γ = .996                 # discount factor
π = [0, 0, 0, 0, 0]      # starting policy: action 0 in every state
Vπ = np.zeros(5)         # starting value estimate
π_vec = []               # receives the policy found at each iteration

mdp = MDP(γ, π, Vπ, π_vec)

# Expose the model's action/state lists and its reward/transition tables as
# notebook-level names; they are handed to loop() below.
A, S, R, P = mdp.A, mdp.S, mdp.R, mdp.P

# x is the resulting value vector, y the recorded sequence of policies.
x, y = mdp.loop(π, Vπ, S, A, γ, π_vec, R, P)

In [10]:
x # Vπ: value vector returned by mdp.loop (first element of its result)

array([ 3.32067536,  6.74501992,  0.58546908, 10.        ,  0.        ])


### Above is the V vector for value iteration

In [11]:
# y # π_vec