In [1]:
import numpy as np
import random
import matplotlib.pyplot as plt
import scipy.special

In [37]:
N_CHANNEL_STATES = 2
TAU_LIMIT = 9
DISCOUNT_FACTOR = 0.9
TRANSMITED_POWER = 10
WEIGHING_FACTOR = 1

In [38]:
class Environment:
    def __init__(self) -> None:
        self.time = 1
        self.A = np.array([[  1,  0, .5,  0],
                             [  0,  1,  0, .5],
                             [  0,  0,  1,  0],
                             [  0,  0,  0,  1]])
        self.Q = np.eye(4) * 2
        self.C = np.array([[ 1.2,   0,   0,   0],
                           [   0, 1.2,   0,   0]])
        self.R = np.eye(2) * 0.6
        self.sigma_2 = 8 # Noise power
        self.packet_length = 10 
        self.P_bar = 0.6613
        self.zeta = TRANSMITED_POWER # Transmitted energy

        self.state = random.randint(0, N_CHANNEL_STATES*(TAU_LIMIT + 1))

        self.channel_gains = np.array([1, 1.5])
        self.eta = np.array([[ .6, .4],
                             [.45,.55]])
        self.h_tau_P = []
        self.h_tau_P.append(np.array([self.P_bar]))
        self.h_tau_P.append(self.A @ self.A.T * self.P_bar)
        for i in range(9):
            self.h_tau_P.append(self.A @ self.h_tau_P[-1] @ self.A.T)
        # print(self.h_tau_P)
        # print(self.channel_gains)

        self.p_js = np.zeros(N_CHANNEL_STATES)
        for j in range(N_CHANNEL_STATES):
            SNR = self.channel_gains[j] * self.zeta / self.sigma_2
            self.p_js[j] = self.f(SNR)
 
    def robot_step(self, action):
        tau = self.state // N_CHANNEL_STATES
        h = self.state % N_CHANNEL_STATES

        self.time += 1
        self.next_state(action)

        return (np.trace(self.h_tau_P[tau]) if tau > 0 else self.h_tau_P[0]) + action * WEIGHING_FACTOR * self.zeta

    def f(self, SNR):   # Calculates the probability of successful transmission based on SNR
        sqrt_SNR = np.sqrt(SNR * 2)
        I = scipy.special.erf(sqrt_SNR)
        
        return I**self.packet_length

    def next_state(self, action):
        tau = self.state // N_CHANNEL_STATES
        h = self.state % N_CHANNEL_STATES

        new_h = random.choices(population=range(N_CHANNEL_STATES), weights=self.eta[h, :], k=1)[0]
        new_tau = tau + 1
        if action == 1:
            r = random.uniform(0,1)
            if r < self.p_js[h]:
                new_tau = 0
        
        if new_tau > TAU_LIMIT: # Ceiling functionality
            new_tau = TAU_LIMIT
        
        self.state = (new_tau * N_CHANNEL_STATES) + new_h
        
    def reset(self):
        self.time = 1
        self.state = random.randint(0, N_CHANNEL_STATES - 1)

    def reset_to_explore_starts(self):
        self.time = 1
        self.state = random.randint(0, N_CHANNEL_STATES * (TAU_LIMIT + 1) - 1)

    def evaluate_policy(self, policy):
        runs = 100
        length_of_episode = 200
        discount = 1
        reward = 0
        for run in range(runs):
            self.reset()
            discount = 1
            for t in range(length_of_episode):
                reward += self.robot_step(policy[self.state]) * discount
                discount *= DISCOUNT_FACTOR
                # print(reward, discount)
        return reward / (runs)

In [43]:
class MDP:
    def __init__(self) -> None:
        self.policy = np.array(np.random.rand(N_CHANNEL_STATES * (TAU_LIMIT + 1)) > 0.5, dtype=np.int32)
            # This is initially a random policy, which will be finalized after values are calculated
        self.estimated_costs = np.ones(shape=(N_CHANNEL_STATES * (TAU_LIMIT + 1), 2)) * 1000 
            # This is to understand the estimated cost of taking an action in a given state, 
            # which is initialized to a very high value to prevent usage of unexplored states
        self.count_of_transitions = np.zeros(shape=(2, N_CHANNEL_STATES * (TAU_LIMIT + 1), N_CHANNEL_STATES * (TAU_LIMIT + 1)))
            # This is the count of transitions from state s to state s' given action a
        self.estimate_transitions = np.zeros(shape=(2, N_CHANNEL_STATES * (TAU_LIMIT + 1), N_CHANNEL_STATES * (TAU_LIMIT + 1)))
            # This is the normalized probability of the transition s -> s' given action a
        self.estimate_values = np.zeros(N_CHANNEL_STATES * (TAU_LIMIT + 1))
            # Once we calculate the transitions, this is our current estimate of the value of each state
        self.n_states = N_CHANNEL_STATES * (TAU_LIMIT + 1)
        
    def calculate_transitions(self, env):
        env.reset_to_explore_starts()
        for run in range(1000):
            # Exploring starts with any state starting
            env.reset_to_explore_starts()
            # To generate episodes
            for t in range(100):
                # Interact with the environment
                s = env.state
                a = 1 if run > 500 else 0
                r = env.robot_step(action=a)
                s_dash = env.state

                # If this is the first exploration of the state, update the estimated cost :
                if self.count_of_transitions[a, s, s_dash] == 0:
                    self.estimated_costs[s, a] = r

                # Update counts:
                self.count_of_transitions[a, s, s_dash] += 1
        
        # Normalize the counts to get transition probabilities:
        for s in range(N_CHANNEL_STATES * (TAU_LIMIT + 1)):
            for a in range(2):
                if np.sum(self.count_of_transitions[a, s, :]) != 0:
                    self.estimate_transitions[a, s, :] = self.count_of_transitions[a, s, :] / np.sum(self.count_of_transitions[a, s, :])
        # print(self.estimate_transitions)
        # print(self.estimated_costs)

    
    def value_iter(self):
        count = 0
        for i in range(10):
            count += 1
            opt1 = self.estimated_costs[:, 0] + DISCOUNT_FACTOR * np.reshape(self.estimate_transitions[0, :, :], newshape=(self.n_states, self.n_states)) @ self.estimate_values
                # Estimated value of choosing to not transmit (action = 0)
            opt2 = self.estimated_costs[:, 1] + DISCOUNT_FACTOR * np.reshape(self.estimate_transitions[1, :, :], newshape=(self.n_states, self.n_states)) @ self.estimate_values
                # Estimated value of choosing to transmit (action = 1)
            
            self.policy = np.argmin(np.array([opt1, opt2]), axis=0)

            new_vals = np.min(np.array([opt1, opt2]), axis=0)

            # print(new_vals)

            if np.linalg.norm(new_vals - self.estimate_values) < 1:
                break
            self.estimate_values = new_vals
        # print(count)

    
    def calculate_transitions_2(self, env):
        self.new_transitions = np.zeros(shape=(2, N_CHANNEL_STATES * (TAU_LIMIT + 1), N_CHANNEL_STATES * (TAU_LIMIT + 1)))
        for s in range(self.n_states - N_CHANNEL_STATES):
            tau = s // N_CHANNEL_STATES
            h = s % N_CHANNEL_STATES

            new_tau = (tau + 1)
            for new_h in range(2):
                new_state_0 = (new_tau * N_CHANNEL_STATES) + new_h
                new_state_1 = (tau * N_CHANNEL_STATES) + new_h
                if new_state_0 < self.n_states:
                    self.new_transitions[0, s, new_state_0] = env.eta[h, new_h]
                    self.new_transitions[1, s, new_state_0] = env.eta[h, new_h] * (1 - env.p_js[h])
                self.new_transitions[1, s, new_state_1] = env.eta[h, new_h] * env.p_js[h]
            c = np.trace(env.h_tau_P[tau]) if tau > 0 else env.h_tau_P[0]
            self.estimated_costs[s, 0] = c
            self.estimated_costs[s, 1] = c + env.zeta
        
        
        for s in range(self.n_states - N_CHANNEL_STATES, self.n_states):
            tau = s // N_CHANNEL_STATES
            h = s % N_CHANNEL_STATES
            for new_h in range(2):
                new_state = (tau * N_CHANNEL_STATES) + new_h
                self.new_transitions[1, s, new_state] = env.eta[h, new_h] * env.p_js[h]
            c = np.trace(env.h_tau_P[tau])  if tau > 0 else env.h_tau_P[0]
            self.estimated_costs[s, 1] = c + env.zeta
        print(self.new_transitions)

    def value_iter_2(self):
        count = 0
        for i in range(10000):
            count += 1
            opt1 = self.estimated_costs[:, 0] + DISCOUNT_FACTOR * np.reshape(self.new_transitions[0, :, :], newshape=(self.n_states, self.n_states)) @ self.estimate_values
                # Estimated value of choosing to not transmit (action = 0)
            opt2 = self.estimated_costs[:, 1] + DISCOUNT_FACTOR * np.reshape(self.new_transitions[1, :, :], newshape=(self.n_states, self.n_states)) @ self.estimate_values
                # Estimated value of choosing to transmit (action = 1)
            
            self.policy = np.argmin(np.array([opt1, opt2]), axis=0)

            new_vals = np.min(np.array([opt1, opt2]), axis=0)

            if np.linalg.norm(new_vals - self.estimate_values) < 1:
                break
            self.estimate_values = new_vals

    
    def print_policy(self):
        print(np.reshape(self.policy, newshape=(N_CHANNEL_STATES, TAU_LIMIT + 1)))

In [44]:
env = Environment()
p_MDP = MDP()
p_MDP.calculate_transitions(env)
p_MDP.value_iter()
p_MDP.print_policy()

[[0 0 0 0 0 0 1 1 1 1]
 [1 1 1 1 1 1 1 1 1 1]]
