In [None]:
import tensorflow as tf
print(tf.__version__)
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random
import math
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model
tf.keras.backend.set_floatx('float64')

#implementation for single UAV case with one FR

e = 2.718

Episode_length = 20

#map parameters based on obstacles
alpha = 0.7               
beta = 10
        
#defining the action space
del_theta = 90*3.14/180    
d = 5
    
#measurement noise parameters
muLOS = 0
stdLOS = 0.5
muNLOS = 5
stdNLOS = 5

def gaussianpdf(x, mu, std):
    pdfx = (math.pow(e,(-0.5*math.pow((x-mu)/std,2))))/(std*math.pow(2*3.14,0.5))
    return pdfx  

#maximum likelihood function for state estimotor(Not called in this code)
def maxlf(uavx, uavy, uavz, tx, ty, tz):

    d = math.pow(math.pow((uavx-tx),2) + math.pow((uavy-ty),2) + math.pow((uavz-tz),2),0.5)
    elevation = math.asin((uavz-tz)/d)
    lamda = 1/(1+alpha*math.pow(e,-beta*(elevation-alpha)))
    
    for i in range(tx-uavx-10,tx-uavx+10):
        x=i
        yij = lamda*gaussianpdf(x,tx-uavx+muLOS,stdLOS) + (1-lamda)*gaussianpdf(x,tx-uavx+muNLOS,stdNLOS)
        plt.scatter(i,yij)
    return yij

#function to generate the next states of the target based on action 'a'
def statetransition(stateold,a):
    delK = 1
    alpha0 = 0.95
    
    zerom = np.matrix([[0, 0, 0],[0, 0, 0], [0, 0, 0]])
    phibar = np.matrix([[1, delK, delK*delK/2],[0, 1, delK], [0, 0, alpha0]])
    Lbarv = np.matrix([delK*delK/2, delK, 0])
    
    statenew = np.dot(phibar,stateold.T).T + np.dot(a,Lbarv).T
            
    return statenew

#Prediction of target state(Not called in this code)
def target_pred(xvectorold, yvectorold, zvectorold, l):
    
    target_action = np.zeros((5,3))
    target_action[0] = np.matrix([0, 0, 0])
    target_action[1] = np.matrix([1, 0, 0])
    target_action[2] = np.matrix([-1, 0, 0])
    target_action[3] = np.matrix([0, 1, 0])
    target_action[4] = np.matrix([0, -1, 0])

    lbar = np.zeros(5)
    lbar[0] = 0
    lbar[1] = 1
    lbar[2] = 2
    lbar[3] = 3
    lbar[4] = 4
    
    for j in range(0,5):
        if lbar[j] == l:
            transprob = 0.1
        else:
            transprob = 0.225
        xvectornew = statetransition(xvectorold, target_action[int(lbar[j])][0]).T
        yvectornew = statetransition(yvectorold, target_action[int(lbar[j])][1]).T
        zvectornew = statetransition(zvectorold, target_action[int(lbar[j])][2]).T
           
    return xvectornew, yvectornew, zvectornew, state_prob

class UAV(Env):
    def __init__(self):
        # Actions UAV can take
        self.uav_action_space = Discrete(4)
        self.x1 = 20
        self.y1 = 25
        self.z1 = 25
        
        self.tx = 30
        self.ty = 30
        self.tz = 0
        self.txdot = 0.4
        self.tydot = 0.4
        self.tzdot = 0
        self.txddot = 0
        self.tyddot = 0
        self.tzddot = 0
        
        
       # Set episode length
        self.episode_length = Episode_length
        
    def step(self, action, lamdaprev,i):
        # Apply UAV action
        self.x1 += d*math.cos(action*del_theta)
        self.y1 += d*math.sin(action*del_theta)
        self.z1 += 0
        
        target_action = np.zeros((5,3))
        target_action[0] = np.matrix([0, 0, 0])
        target_action[1] = np.matrix([1, 0, 0])
        target_action[2] = np.matrix([-1, 0, 0])
        target_action[3] = np.matrix([0, 1, 0])
        target_action[4] = np.matrix([0, -1, 0])
       
    
        a = target_action[random.randint(0,4)]
        
              
        txst = np.matrix([self.tx, self.txdot, self.txddot])
        txnext = statetransition(txst, a[0])
        self.tx = txnext[0,0]
        self.txdot = txnext[0,1]%2.5
        self.txddot = txnext[0,2]
               
        tyst = np.matrix([self.ty, self.tydot, self.tyddot])
        tynext = statetransition(tyst, a[1])
        self.ty = tynext[0,0]
        self.tydot = tynext[0,1]%2.5
        self.tyddot = tynext[0,2]
               
        # Reduce episode length by 1 second
        self.episode_length -= 1       
        
        # Calculate reward
        d1 = math.pow(math.pow((self.x1 - self.tx),2) + math.pow((self.y1 - self.ty),2) + math.pow((self.z1 - self.tz),2),0.5)
        elevation1 = math.asin((self.z1 - self.tz)/d1)
        lamda1 = 1/(1+alpha*math.pow(e,-beta*(elevation1-alpha)))
        
        reward = 20*(lamda1-lamdaprev)
        lamda = lamda1
        
         
        # Check if episode is done
        if self.episode_length <= 0: 
            done = True
        else:
            done = False
              
        info = {}
        
        state = np.zeros(6)
        state[0] = self.x1
        state[1] = self.y1
        state[2] = self.z1
        state[3] = self.tx - self.x1
        state[4] = self.ty - self.y1
        state[5] = self.tz - self.z1
       
        # Return step information
        return state, reward, lamda, elevation1,done, info

   
    def reset(self):
        # Reset states
        self.x1 = 20
        self.y1 = 25
        self.z1 = 25
        
        self.tx = 30
        self.ty = 30
        self.tz = 0
        self.txdot = 0.4
        self.tydot = 0.4
        self.tzdot = 0
        self.txddot = 0
        self.tyddot = 0
        self.tzddot = 0
        
        # Reset episode time and states
        self.episode_length = Episode_length 
        done = False
        
        state = np.zeros(6)
        state[0] = self.x1
        state[1] = self.y1
        state[2] = self.z1
        state[3] = self.tx - self.x1
        state[4] = self.ty - self.y1
        state[5] = self.tz - self.z1

        return state

#Duel Deep Q-learning architecture
class DDQN(tf.keras.Model):
    def __init__(self):
        super(DDQN, self).__init__()
        self.d1 = tf.keras.layers.Dense(50, activation='relu')
        self.d2 = tf.keras.layers.Dense(50, activation='relu')
        self.d3 = tf.keras.layers.Dense(50, activation='relu')
        self.v = tf.keras.layers.Dense(1, activation=None)
        self.a = tf.keras.layers.Dense(4, activation=None)

    def call(self, input_data):
        x = self.d1(input_data)
        x = self.d2(x)
        x = self.d3(x)
        v = self.v(x)
        a = self.a(x)
        Q = v +(a -tf.math.reduce_mean(a, axis=1, keepdims=True))
        return Q

    def advantage(self, state):
        x = self.d1(state)
        x = self.d2(x)
        x = self.d3(x)
        a = self.a(x)
        return a

#Used to fill and sample tuples for training
class exp_replay():
    def __init__(self, buffer_size= 5000):
        self.buffer_size = buffer_size
        self.state_mem = np.zeros((self.buffer_size, 6), dtype=np.float32)
        self.action_mem = np.zeros((self.buffer_size), dtype=np.int32)
        self.reward_mem = np.zeros((self.buffer_size), dtype=np.float32)
        self.next_state_mem = np.zeros((self.buffer_size, 6), dtype=np.float32)
        self.done_mem = np.zeros((self.buffer_size), dtype=np.bool)
        self.pointer = 0

    def add_exp(self, state, action, reward, next_state, done):
        idx  = self.pointer % self.buffer_size 
        self.state_mem[idx] = state
        self.action_mem[idx] = action
        self.reward_mem[idx] = reward
        self.next_state_mem[idx] = next_state
        self.done_mem[idx] = 1 - int(done)
        self.pointer += 1

    def sample_exp(self, batch_size= 128):
        max_mem = min(self.pointer, self.buffer_size)
        batch = np.random.choice(max_mem, batch_size, replace=False)
        states = self.state_mem[batch]
        actions = self.action_mem[batch]
        rewards = self.reward_mem[batch]
        next_states = self.next_state_mem[batch]
        dones = self.done_mem[batch]
        return states, actions, rewards, next_states, dones
    
#Initializes and invokes the necessary functions for training and testing    
class agent():
        def __init__(self, eps = 1.0, gamma=0.99, replace=100, lr=0.001):
            self.gamma = gamma
            self.epsilon = eps
            self.min_epsilon = 0.0001
            self.epsilon_decay = 5e-5
            self.replace = replace
            self.trainstep = 0
            self.memory = exp_replay()
            self.batch_size = 128
            self.q_net = DDQN()
            self.target_net = DDQN()
            opt = tf.keras.optimizers.Adam(learning_rate=lr)
            self.q_net.compile(loss='mse', optimizer=opt)
            self.target_net.compile(loss='mse', optimizer=opt)

        #Implemetation of the epsilon-greedy strategy
        def act(self, state, test = 0):
            if test ==1:
                actions = self.q_net.advantage(np.array([state]))
                action = np.argmax(actions)
                return action
            else:
                if np.random.rand() <= self.epsilon:
                   return np.random.choice([i for i in range(0,4)])

                else:
                   actions = self.q_net.advantage(np.array([state]))
                   action = np.argmax(actions)
                   return action


        def update_mem(self, state, action, reward, next_state, done):
            self.memory.add_exp(state, action, reward, next_state, done)

        #target function is updated every 100 iterations
        def update_target(self):
            self.target_net.set_weights(self.q_net.get_weights())     

        def update_epsilon(self):
            self.epsilon = self.epsilon - self.epsilon_decay if self.epsilon > self.min_epsilon else self.min_epsilon
            return self.epsilon

          
        def train(self):
            if self.memory.pointer < self.batch_size:
                return 
          
            if self.trainstep % self.replace == 0:
                self.update_target()
            states, actions, rewards, next_states, dones = self.memory.sample_exp(self.batch_size)
            target = self.q_net.predict(states)
            next_state_val = self.target_net.predict(next_states)
            max_action = np.argmax(self.q_net.predict(next_states), axis=1)
            batch_index = np.arange(self.batch_size, dtype=np.int32)
            q_target = np.copy(target)
            q_target[batch_index, actions] = rewards + self.gamma * next_state_val[batch_index, max_action]*dones
            self.q_net.train_on_batch(states, q_target)
            self.update_epsilon()
            self.trainstep += 1

#Code block to train the agent for 10000 episodes
uav = UAV()
CRLBprev = 0

K=Episode_length
steps = 10000
rewardplot = np.zeros(10000)
agents = agent()
for s in range(0,steps):
    done = False
    state = uav.reset()
    total_reward = 0
    CRLBprev = 0
    for i in range(0,K):
          action = agents.act(state)
          next_state, reward, CRLBprev, done, _ = uav.step(action, CRLBprev,i)
          agents.update_mem(state, action, reward, next_state, done)
          agents.train()
          
          state = next_state
          total_reward += reward
    
    rewardplot[s] = total_reward
    if done:
       print("total reward after {} episode is {} and epsilon is {}".format(s, total_reward, agents.epsilon))

In [None]:
#Code block to test the agent

uavtest = UAV()
lamdaprev = 0

K=Episode_length

agenttest1 = agent()

#loading the trained model saved as \tmp\DRL_single_UAV_q_net_lamda_10000
agenttest1.q_net = tf.keras.models.load_model('DRL_single_UAV_q_net_lamda_10000')
                                                                                 
xuav1 = np.zeros(K)
yuav1 = np.zeros(K)
zuav1 = np.zeros(K)
t_x = np.zeros(K)
t_y = np.zeros(K)
t_z = np.zeros(K)
lamdaplot = np.zeros(K)
elplot = np.zeros(K)

done = False
state = uavtest.reset()
total_reward = 0
for i in range(0,K):
      xuav1[i] = state[0]
      yuav1[i] = state[1]
      zuav1[i] = state[2]
      t_x[i] = state[3] + state[0] 
      t_y[i] = state[4] + state[1]
      t_z[i] = state[5] + state[2]

      x = agenttest1.q_net.d1(np.array([state]))
      x = agenttest1.q_net.d2(x)
      x = agenttest1.q_net.d3(x)
      actions = agenttest1.q_net.a(x)
      action = np.argmax(actions)

      next_state, reward, lamdaprev, elplot[i], done, _ = uavtest.step(action, lamdaprev,i)
      state = next_state
      total_reward += reward
    
      lamdaplot[i] = lamdaprev
      
    
if done:
   print("total reward after {} episode is {} and epsilon is {}".format(1, total_reward, agenttest1.epsilon))