In [None]:
import numpy as np
from tqdm import tqdm

class FANET:
    """3-D grid-world environment for a single drone.

    The world is a ``size[0] x size[1] x size[2]`` grid of cells.  Every cell
    carries a reward value (``grid_map``) and a free-space path loss value in
    dB relative to the base station (``signal_map``).  The zones listed in
    ``map_point`` are axis-aligned boxes whose ``start`` and ``end`` corners
    are both inclusive.

    Attributes:
        size: Tuple ``(x, y, z)`` with the number of cells per axis.
        map_point: Zone name -> ``{'start', 'end', 'reward'}`` description.
        grid_map: ``float32`` array of per-cell rewards.
        signal_map: ``float32`` array of per-cell path loss in dB.
        state: Current ``(x, y, z)`` position of the drone.
    """

    def __init__(self, size):
        """Build the reward map and the path-loss map, then pick a start cell.

        Args:
            size: Tuple ``(x, y, z)`` with the number of cells per axis.
        """
        self.size = size
        self.map_point = {
            'hazard 1': {'start': (10, 0, 0), 'end': (40, 20, 50), 'reward': -10},
            'hazard 2': {'start': (60, 60, 0), 'end': (95, 95, 80), 'reward': -10},
            'base station': {'start': (0, 0, 0), 'end': (1, 1, 1), 'reward': 0},
            'terminal zone': {'start': (90, 90, 90), 'end': (100, 100, 100), 'reward': 1},
        }

        self.grid_map = self.map()
        self.signal_map = self.signal_grid()

        self.reset()

    def set_start_zone(self):
        """Draw a random ground-level start cell outside hazard/terminal zones.

        ``x`` and ``y`` are sampled uniformly from ``[3, size - 30)`` and ``z``
        is always 0.  Candidates are redrawn until the cell's reward is neither
        -10 (hazard) nor 1 (terminal).

        Returns:
            Tuple ``(x, y, z)`` of the chosen start cell.

        Raises:
            ValueError: Raised by ``np.random.randint`` when an axis is too
                small for the sampling range to be non-empty.
        """
        # Rejection sampling in a loop: the recursive form could hit the
        # interpreter's recursion limit after a long streak of rejections.
        while True:
            x = np.random.randint(3, self.size[0] - 30)
            y = np.random.randint(3, self.size[1] - 30)
            z = 0

            if self.grid_map[x, y, z] not in (-10, 1):
                return (x, y, z)

    def reset(self):
        """Move the drone to a freshly sampled start cell."""
        self.state = self.set_start_zone()

    def map(self):
        """Create the reward map.

        Every cell starts at -1; each zone in ``map_point`` then overwrites its
        box (both corners inclusive) with the zone's reward.  Zones are applied
        in dictionary order, so later zones win where boxes overlap.  Boxes
        reaching past the grid are clipped by the slicing.

        Returns:
            ``float32`` array of shape ``self.size``.
        """
        grid = np.full(self.size, -1, dtype=np.float32)

        for zone in self.map_point.values():
            (x0, y0, z0), (x1, y1, z1) = zone['start'], zone['end']
            grid[x0:x1 + 1, y0:y1 + 1, z0:z1 + 1] = zone['reward']

        return grid

    def distance_from_BS(self, x, y, z):
        """Return the Euclidean distance from ``(x, y, z)`` to the base station.

        The base station is represented by the centre of its box in
        ``map_point``.  The arguments may be scalars or broadcastable NumPy
        arrays; the result has the broadcast shape.
        """
        bx1, by1, bz1 = self.map_point['base station']['start']
        bx2, by2, bz2 = self.map_point['base station']['end']

        # Measure from the centre of the base-station box.
        dx = abs(x - (bx1 + bx2) / 2)
        dy = abs(y - (by1 + by2) / 2)
        dz = abs(z - (bz1 + bz2) / 2)

        return np.sqrt(dx ** 2 + dy ** 2 + dz ** 2)

    def FSPL(self, x, y, z, frequency=2_400_000_000):
        """Return the free-space path loss in dB between a cell and the base station.

        Uses ``20*log10(d) + 20*log10(f) - 147.55``, where -147.55 is
        ``20*log10(4*pi/c)`` for ``d`` in metres and ``f`` in Hz.
        NOTE(review): this treats one grid cell as one metre - confirm.

        Args:
            x, y, z: Cell coordinates (scalars or broadcastable arrays).
            frequency: Carrier frequency in Hz; defaults to 2.4 GHz.

        Returns:
            Path loss in dB.  A distance of exactly 0 yields ``-inf``; integer
            coordinates never hit it with the current base-station box, whose
            centre lies at half-integer coordinates.
        """
        distance = self.distance_from_BS(x, y, z)

        return 20 * np.log10(distance) + 20 * np.log10(frequency) - 147.55

    def signal_grid(self):
        """Compute the path loss from the base station for every cell.

        Returns:
            ``float32`` array of shape ``self.size`` holding ``FSPL`` in dB.
        """
        print('Calculating Path Loss...')

        # Open (broadcastable) index grids let FSPL evaluate the whole volume
        # in one vectorised call instead of a Python loop over every cell.
        xs, ys, zs = np.ogrid[:self.size[0], :self.size[1], :self.size[2]]
        signal_grid = self.FSPL(xs, ys, zs).astype(np.float32)

        print('Done!')

        return signal_grid

    def reward(self, x, y, z):
        """Return the reward for the drone being at ``(x, y, z)``.

        The sensor is modelled by two diagonal probe cells on the same z-layer,
        ``(x+2, y+2)`` and ``(x-2, y-2)``, clamped to the grid.

        Returns:
            -20 if both probe cells are hazard cells,
            -10 if exactly one probe cell, or the drone's own cell, is a
            hazard cell, 1 if the drone's cell is a terminal cell, and
            -1 otherwise.
        """
        # Clamp the probes so they neither run off the grid nor wrap around
        # through negative indices.
        x_hi = min(x + 2, self.size[0] - 1)
        y_hi = min(y + 2, self.size[1] - 1)
        x_lo = max(x - 2, 0)
        y_lo = max(y - 2, 0)

        hit_hi = self.grid_map[x_hi, y_hi, z] == -10
        hit_lo = self.grid_map[x_lo, y_lo, z] == -10
        hit_here = self.grid_map[x, y, z] == -10

        # The double conflict must be tested first; otherwise the single
        # conflict branch would always shadow it.
        if hit_hi and hit_lo:
            return -20
        if hit_hi or hit_lo or hit_here:
            return -10
        if self.grid_map[x, y, z] == 1:     # Terminal state
            return 1
        return -1                           # Default step cost

    def terminal_reward(self, step_count):
        """Return a bonus or penalty based on how many steps an episode took.

        Returns:
            30 for at most 10,000 steps, -10 for at most 1,000,000 steps and
            -50 beyond that.
        """
        if step_count <= 10_000:
            return 30
        if step_count <= 1_000_000:
            return -10
        return -50

    # Original (misspelled) name kept so existing callers keep working.
    termianl_reward = terminal_reward

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, done)``.

        Actions 0/1 move +x/-x, 2/3 move +y/-y and 4/5 move +z/-z by one cell.
        A move is ignored when its bound check fails: x and y only increase
        while below ``size - 3`` and only decrease while above 3; z only
        increases while below ``size - 1`` and only decreases while above 1.
        Any other action value leaves the position unchanged.

        Returns:
            Tuple of the new ``(x, y, z)`` state, the reward for that cell and
            a ``bool`` that is True when the cell belongs to the terminal zone.
        """
        x, y, z = self.state

        if action == 0 and x < self.size[0] - 3:
            x += 1
        elif action == 1 and x > 3:
            x -= 1
        elif action == 2 and y < self.size[1] - 3:
            y += 1
        elif action == 3 and y > 3:
            y -= 1
        elif action == 4 and z < self.size[2] - 1:
            z += 1
        elif action == 5 and z > 1:
            z -= 1

        reward = self.reward(x, y, z)
        self.state = (x, y, z)

        terminal_value = self.map_point['terminal zone']['reward']
        done = bool(self.grid_map[x, y, z] == terminal_value)

        return self.state, reward, done

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import time
import os

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from collections import namedtuple, deque
from tqdm import tqdm

class DQN(nn.Module):
    """Two-layer fully connected Q-network.

    Maps an encoded state vector of length ``input_state`` to one Q-value per
    action through a single hidden layer of 128 ReLU units.
    """

    def __init__(self, input_state, action_size):
        """Create the hidden layer (``layer1``) and the output layer (``layer2``).

        Args:
            input_state: Length of the encoded state vector.
            action_size: Number of discrete actions (width of the output).
        """
        super().__init__()
        hidden_units = 128
        self.layer1 = nn.Linear(input_state, hidden_units)
        self.layer2 = nn.Linear(hidden_units, action_size)

    def forward(self, x):
        """Return the Q-values for the encoded state(s) ``x``."""
        hidden = self.layer1(x)
        activated = F.relu(hidden)
        return self.layer2(activated)

class DQNAgent:
    """Deep Q-learning agent with a replay buffer and a target network.

    States are ``(x, y, z)`` grid positions encoded as three concatenated
    one-hot vectors.  Actions are the six integers ``0..5`` passed to
    ``env.step``.
    """

    def __init__(self, env, gamma, learning_rate, batch_size, replay_buffer_size, target_update_interval):
        """Create the networks, optimizer, replay buffer and epsilon schedule.

        If ``DQN_policy_network.pth`` exists in the working directory, its
        weights are loaded into the policy network.

        Args:
            env: Environment exposing ``size``, ``state``, ``reset()`` and
                ``step(action)``.
            gamma: Discount factor.
            learning_rate: Adam learning rate.
            batch_size: Number of transitions per optimisation step.
            replay_buffer_size: Maximum number of stored transitions.
            target_update_interval: Number of episodes between target-network
                synchronisations.
        """
        self.env = env
        self.gamma = gamma
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.replay_buffer_size = replay_buffer_size
        self.target_update_interval = target_update_interval

        self.state_size = env.size[0] + env.size[1] + env.size[2]
        self.action_size = 6

        # Use the GPU when there is one, but keep the agent usable on
        # CPU-only machines instead of failing on a hard-coded .cuda().
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.policy_model_path = 'DQN_policy_network.pth'
        self.policy_network = DQN(self.state_size, self.action_size).to(self.device)

        if os.path.exists(self.policy_model_path):
            # map_location lets a checkpoint saved on a GPU load on a CPU.
            saved_weights = torch.load(self.policy_model_path, map_location=self.device)
            self.policy_network.load_state_dict(saved_weights)
            self.policy_network.eval()
            print('Policy Model Loaded Successfully.')
        else:
            print('No Model Founded.')

        self.target_model = DQN(self.state_size, self.action_size).to(self.device)
        self.target_model.load_state_dict(self.policy_network.state_dict())
        self.target_model.eval()

        self.Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))
        self.replay_buffer = deque(maxlen=self.replay_buffer_size)

        self.optimizer = optim.Adam(self.policy_network.parameters(), lr=self.learning_rate)
        self.loss_fn = nn.MSELoss()

        self.epsilon_start = 1.0
        self.epsilon_end = 0.01
        self.epsilon_decay = 0.995
        self.epsilon = self.epsilon_start

    def encode_state(self, state):
        """Encode ``(x, y, z)`` as three concatenated one-hot vectors.

        Returns:
            1-D array of length ``size[0] + size[1] + size[2]`` containing
            exactly three ones.
        """
        x, y, z = state
        x_onehot = np.zeros(self.env.size[0])
        y_onehot = np.zeros(self.env.size[1])
        z_onehot = np.zeros(self.env.size[2])

        x_onehot[x] = 1
        y_onehot[y] = 1
        z_onehot[z] = 1

        return np.concatenate((x_onehot, y_onehot, z_onehot))

    def decode_state(self, encoded_state):
        """Invert ``encode_state`` and return the ``(x, y, z)`` position."""
        size_x, size_y, size_z = self.env.size

        x_encoded = encoded_state[:size_x]
        y_encoded = encoded_state[size_x:size_x + size_y]
        z_encoded = encoded_state[size_x + size_y:size_x + size_y + size_z]

        return (np.argmax(x_encoded), np.argmax(y_encoded), np.argmax(z_encoded))

    def get_action(self, state):
        """Choose an action for an encoded ``state`` with an epsilon-greedy policy.

        With probability ``self.epsilon`` a uniformly random action is
        returned; otherwise the action with the highest predicted Q-value.
        """
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_size)

        with torch.no_grad():
            state_tensor = torch.tensor(state, dtype=torch.float32, device=self.device)
            q_values = self.policy_network(state_tensor)
        return torch.argmax(q_values).item()

    def _optimize_step(self):
        """Run one gradient step on a random replay batch; return the detached loss."""
        indices = np.random.choice(len(self.replay_buffer), self.batch_size, replace=False)
        batch = [self.replay_buffer[i] for i in indices]
        states, actions, next_states, rewards, dones = zip(*batch)

        states = torch.tensor(np.array(states), dtype=torch.float32, device=self.device)
        actions = torch.tensor(np.array(actions), dtype=torch.long, device=self.device).unsqueeze(1)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32, device=self.device)
        rewards = torch.tensor(np.array(rewards), dtype=torch.float32, device=self.device).unsqueeze(1)
        dones = torch.tensor(np.array(dones), dtype=torch.float32, device=self.device).unsqueeze(1)

        current_q_values = self.policy_network(states).gather(1, actions)

        with torch.no_grad():
            next_q_values = self.target_model(next_states).max(1)[0].unsqueeze(1)
        # Terminal transitions (done == 1) do not bootstrap from the next state.
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        loss = self.loss_fn(current_q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Detach so stored losses do not carry an autograd graph reference.
        return loss.detach()

    def deepQ(self, num_episodes):
        """Train for ``num_episodes`` episodes and save the policy network.

        Each episode runs until the environment reports ``done``.  Epsilon is
        decayed after every episode and the target network is synchronised
        every ``target_update_interval`` episodes.

        Returns:
            Tuple of five per-episode lists:
            ``step_count`` (steps taken), ``conflict_count`` (1 per reward of
            -10, 2 per reward of -20), ``path`` (list of visited positions),
            ``time_count`` (wall-clock seconds) and ``loss_count`` (most
            recent loss as a detached 0-dim tensor; NaN until the first
            optimisation step has run).
        """
        self.num_episodes = num_episodes

        # Per-episode performance metrics.
        step_count = [0] * self.num_episodes
        conflict_count = [0] * self.num_episodes
        path = [0] * self.num_episodes
        time_count = [0] * self.num_episodes
        loss_count = [0] * self.num_episodes

        # Defined up front so an episode that ends before the replay buffer
        # holds a full batch does not hit an unbound loss variable.
        last_loss = torch.tensor(float('nan'))

        # Loading a checkpoint puts the policy network in eval mode; switch
        # back to training mode before optimising it.
        self.policy_network.train()

        print('Learning Start! ...')

        for episode in tqdm(range(self.num_episodes)):
            start_time = time.time()
            self.env.reset()
            state = self.encode_state(self.env.state)

            # Record decoded positions so the path holds plain coordinates
            # that can be plotted later.
            path_epi = [self.decode_state(state)]
            done = False
            step = 0
            conflict = 0

            while not done:
                action = self.get_action(state)

                next_state, reward, done = self.env.step(action)

                step += 1

                if step % 10000 == 0:
                    print(f'현재 Episode {episode+1}의 진행 수는 {step}입니다.')

                if reward == -10:
                    conflict += 1
                elif reward == -20:
                    conflict += 2

                next_state = self.encode_state(next_state)
                path_epi.append(self.decode_state(next_state))

                self.replay_buffer.append(self.Transition(state, action, next_state, reward, done))

                state = next_state

                if len(self.replay_buffer) >= self.batch_size:
                    last_loss = self._optimize_step()

            end_time = time.time()
            step_count[episode] = step
            conflict_count[episode] = conflict
            path[episode] = path_epi
            time_count[episode] = end_time - start_time
            loss_count[episode] = last_loss.cpu()

            self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)

            if episode % self.target_update_interval == 0:
                self.target_model.load_state_dict(self.policy_network.state_dict())

        print('Done!')

        print('Model Save...')
        torch.save(self.policy_network.state_dict(), self.policy_model_path)
        print('Model Save Done!')

        return step_count, conflict_count, path, time_count, loss_count
        
# Build the 100 x 100 x 100 environment and the DQN agent that learns in it.
env = FANET((100, 100, 100))
agent = DQNAgent(
    env,
    gamma=0.999,
    learning_rate=0.0001,
    batch_size=32,
    replay_buffer_size=500000,
    target_update_interval=100,
)

# deepQ returns five per-episode lists; unpack them for the plots further down.
training_stats = agent.deepQ(num_episodes=100000)
step_count, conflict_count, path, time_count, loss_count = training_stats


In [None]:
def visualize_3d(env, path):
    """Show hazard cells, terminal cells and a path in a 3-D plot.

    Args:
        env: Environment providing ``grid_map`` (per-cell rewards) and ``size``.
        path: Sequence of ``(x, y, z)`` positions to draw.
    """
    axes = plt.figure().add_subplot(111, projection='3d')

    # (cell value in grid_map, marker colour, legend label) per zone type.
    zone_styles = (
        (-10, 'r', 'Hazard Zone'),
        (1, 'g', 'Terminal Zone'),
    )
    for cell_value, colour, legend in zone_styles:
        cells = np.where(env.grid_map == cell_value)
        axes.scatter(*cells, c=colour, marker='s', label=legend)

    # One row per visited position -> one array per coordinate axis.
    xs, ys, zs = np.array(path).T
    axes.plot(xs, ys, zs, '>', label='Path')

    margin = 10
    axes.set_xlim(-margin, env.size[0] + margin)
    axes.set_ylim(-margin, env.size[1] + margin)
    axes.set_zlim(0, env.size[2] + margin)

    axes.legend()

    plt.show()
    
def visualize_2d(env, path):
    """Show hazard cells, terminal cells and a path projected onto the x-y plane.

    Args:
        env: Environment providing ``grid_map`` (per-cell rewards) and ``size``.
        path: Sequence of ``(x, y, z)`` positions; only x and y are drawn.
    """
    # Collapse the z axis before looking up indices: each (x, y) column is
    # drawn once instead of once per matching z-layer.  The picture is the
    # same, with far fewer overlapping markers to render.
    hazard_xy = np.nonzero((env.grid_map == -10).any(axis=2))
    terminal_xy = np.nonzero((env.grid_map == 1).any(axis=2))

    plt.scatter(*hazard_xy, c='r', marker='s', label='Hazard Zone')
    plt.scatter(*terminal_xy, c='g', marker='s', label='Terminal Zone')

    x, y, _ = np.array(path).T
    plt.plot(x, y, '-o', label='Path')

    plt.xlim(-10, env.size[0] + 10)
    plt.ylim(-10, env.size[1] + 10)
    plt.grid(True)

    plt.legend()

    plt.show()
    
# Steps taken per episode
_, steps_ax = plt.subplots()
steps_ax.plot(step_count)
steps_ax.set_xlabel('Episode')
steps_ax.set_ylabel('Steps')
steps_ax.set_title('Q-Learning')
plt.show()

# Conflicts per episode
_, conflict_ax = plt.subplots()
conflict_ax.plot(conflict_count, color='orange', label='Num of Conflict')
conflict_ax.legend()
plt.show()

# Path: only the final episode is drawn
final_path = path[-1]
visualize_3d(env, final_path)
visualize_2d(env, final_path)

# Wall-clock duration per episode
_, time_ax = plt.subplots()
time_ax.plot(time_count)
time_ax.set_title("Time Duration")
plt.show()

# Loss per episode: the entries are torch tensors, which cannot be plotted
# directly, so each one is converted to a Python float first.
loss_values = [entry.item() for entry in loss_count]
_, loss_ax = plt.subplots()
loss_ax.plot(loss_values)
loss_ax.set_title("Loss")
plt.show()
