In [11]:
import math
import os
import random

import gym
import numpy as np
import pygame
import torch
from gym import spaces
from gym.spaces import Tuple
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.vec_env import DummyVecEnv

In [3]:
def calc_distance(pos1, pos2):
    """
    Return the straight-line (Euclidean) distance between two 2-D points.

    Args:
        pos1: Pair ``(x, y)`` for the first point.
        pos2: Pair ``(x, y)`` for the second point.

    Returns:
        The distance as a float.
    """
    (start_x, start_y), (end_x, end_y) = pos1, pos2
    delta_x = end_x - start_x
    delta_y = end_y - start_y
    return math.sqrt(delta_x**2 + delta_y**2)
def calculate_signal_strength(agent, basestation):
    """
    Return the signal strength an agent sees from one base station.

    The value is ``Pt * Gt * Gr / (d + 1) ** 2`` where ``d`` is the Euclidean
    distance between ``agent`` and ``basestation``; the ``+ 1`` keeps the
    denominator non-zero when the agent stands on the station.

    Args:
        agent: ``(x, y)`` position of the agent.
        basestation: ``(x, y)`` position of a base station; must be a key of
            the parameter table below, otherwise ``KeyError`` is raised.

    Returns:
        The signal strength as a float.
    """
    # (Pt, Gt, Gr) per station, one layout row (same y) per line.
    # Presumably transmit power, transmit gain and receive gain -- confirm.
    # NOTE(review): (27, 51) uses Pt=5 while its row neighbours use 15 --
    # confirm this is intentional.
    station_table = {
        (3, 3): (20, 2, 4), (15, 3): (20, 2, 4), (27, 3): (20, 2, 4),
        (3, 15): (10, 3, 3), (15, 15): (10, 3, 3), (27, 15): (10, 3, 3),
        (3, 27): (15, 1, 5), (15, 27): (15, 1, 5), (27, 27): (15, 1, 5),
        (3, 39): (15, 1, 5), (15, 39): (15, 1, 5), (27, 39): (15, 1, 5),
        (3, 51): (15, 1, 5), (15, 51): (15, 1, 5), (27, 51): (5, 1, 5),
    }

    power, gain_t, gain_r = station_table[basestation]
    distance = calc_distance(agent, basestation)
    attenuation = (distance + 1) ** 2
    return power * gain_t * gain_r / attenuation

In [4]:
class GridEnvironmentdqn(gym.Env):
    """
    Grid world in which a randomly walking agent chooses a serving base station.

    At every step the policy picks one base station (by index into
    ``base_stations``); the agent then moves one cell in a random direction.
    Selecting a station that is out of range, or walking off the grid, ends
    the episode.

    Observation (7 float32 values):
        ``[agent_x, agent_y, station_x, station_y, signal_strength, SINR,
        handover_flag]``

    Action:
        ``Discrete(len(base_stations))`` -- index of the base station to use.

    The old gym API is used: ``reset()`` returns the observation and
    ``step()`` returns ``(observation, reward, done, info)``.
    """

    # Position of the signal-strength entry inside the observation vector.
    _SIGNAL_INDEX = 4

    # Edge length of one grid cell in pixels when rendering.
    _CELL_PX = 10

    # Coordinate change applied for each random-walk direction. "up"/"down"
    # change the first coordinate, "left"/"right" the second.
    _MOVES = {
        "up": (-1, 0),
        "down": (1, 0),
        "left": (0, -1),
        "right": (0, 1),
    }

    def __init__(self, ranges, render_each_step=True):
        """
        Build the environment.

        Args:
            ranges: One coverage radius per base station, paired positionally
                with ``self.base_stations`` (extra entries on either side are
                ignored by ``zip``).
            render_each_step: When True (default) ``step`` draws the pygame
                window after every transition. Pass False for headless or
                faster training.
        """
        self.grid_size = (40, 60)
        self.base_stations = [
            (3, 3), (15, 3), (27, 3),
            (3, 15), (15, 15), (27, 15),
            (3, 27), (15, 27), (27, 27),
            (3, 39), (15, 39), (27, 39),
            (3, 51), (15, 51), (27, 51),
        ]
        self.ranges = ranges
        self.render_each_step = render_each_step

        # (Pt, Gt, Gr) per station -- presumably transmit power, transmit gain
        # and receive gain; confirm units with the model's author.
        self.base_station_params = {
            (3, 3): (20, 2, 4),
            (15, 3): (20, 2, 4),
            (27, 3): (20, 2, 4),
            (3, 15): (10, 3, 3),
            (15, 15): (10, 3, 3),
            (27, 15): (10, 3, 3),
            (3, 27): (15, 1, 5),
            (15, 27): (15, 1, 5),
            (27, 27): (15, 1, 5),
            (3, 39): (15, 1, 5),
            (15, 39): (15, 1, 5),
            (27, 39): (15, 1, 5),
            (3, 51): (15, 1, 5),
            (15, 51): (15, 1, 5),
            (27, 51): (5, 1, 5),
        }
        self.frequency = 28e9

        # pygame resources are created lazily on the first render() call.
        self._screen = None
        self._font = None

        self.start_position = (20, 30)
        self.agent_position = self.start_position  # initial position of the agent
        self.previous_action = None  # base station used on the previous step
        self.new_signal_strength = 0
        self.SINR = 0
        self.handover_num = 0
        self.reward = 0
        self.done = False
        self.selection_space = self._stations_in_range()
        # Raises IndexError if no station covers the start position.
        self.basestation_selection = random.choice(self.selection_space)
        self.new_action = self.basestation_selection

        # One action per base station. This used to be Discrete(8), which made
        # stations 8-14 unselectable although step() handles them. Models
        # saved against the old spaces must be retrained.
        self.action_space = gym.spaces.Discrete(len(self.base_stations))

        self.state_space = self._observation()
        # SINR and signal strength are not confined to [0, 60], so the box is
        # left unbounded rather than advertising limits the data violates.
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(self.state_space.shape[0],),
            dtype=np.float32,
        )

    def _stations_in_range(self):
        """Return the base stations whose range covers the agent's position."""
        return [
            station
            for station, radius in zip(self.base_stations, self.ranges)
            if calc_distance(self.agent_position, station) <= radius
        ]

    def _inside_grid(self):
        """Return True when the agent stands on a valid grid cell."""
        x, y = self.agent_position
        columns, rows = self.grid_size
        return 0 <= x < columns and 0 <= y < rows

    def _observation(self):
        """Pack the current state into the flat float32 observation vector."""
        flat = np.concatenate(
            (
                np.array(self.agent_position),
                np.array(self.basestation_selection),
                np.array(self.new_signal_strength),
                np.array(self.SINR),
                np.array(self.handover_num),
            ),
            axis=None,
        )
        return flat.astype(np.float32)

    def reset(self):
        """
        Resets the environment and returns the initial state.

        The agent goes back to ``start_position`` and a serving station is
        drawn at random from the stations in range of that position.
        """
        self.done = False
        self.agent_position = self.start_position
        self.previous_action = None
        # Recompute coverage first so the random pick is valid for the start
        # position, not for wherever the previous episode ended.
        self.selection_space = self._stations_in_range()
        self.basestation_selection = random.choice(self.selection_space)
        # Keep new_action in sync so signal/SINR describe the station that is
        # actually reported in the observation.
        self.new_action = self.basestation_selection
        self.new_signal_strength = self.calculate_signal_strength(self.new_action)
        self.SINR = self.calculate_SINR(self.new_action)
        self.handover_num = 0
        self.reward = 0
        self.state_space = self._observation()
        return self.state_space

    def take_action(self, action):
        """
        Record ``action`` as the serving station and move the agent one cell.

        The direction is chosen uniformly at random. The new position is not
        bounds-checked here; ``step`` ends the episode when the agent leaves
        the grid.

        Args:
            action: ``(x, y)`` position of the selected base station.
        """
        self.basestation_selection = action

        direction = random.choice(["up", "down", "left", "right"])
        dx, dy = self._MOVES[direction]
        self.agent_position = (self.agent_position[0] + dx, self.agent_position[1] + dy)

        # Coverage depends on the position, so refresh it after the move.
        self.selection_space = self._stations_in_range()

    def calculate_signal_strength(self, action):
        """
        Return ``Pt * Gt * Gr / (d + 1) ** 2`` for the station ``action``.

        Args:
            action: ``(x, y)`` position of a base station; must be a key of
                ``self.base_station_params``.
        """
        Pt, Gt, Gr = self.base_station_params[action]
        d = calc_distance(self.agent_position, action)
        return Pt * Gt * Gr / ((d + 1) ** 2)

    def calculate_SINR(self, action):
        """
        Compute, store and return the SINR figure for the station ``action``.

        The result is ``signal - (interference + noise)``, where ``signal`` is
        ``self.new_signal_strength`` and every other station contributes
        ``Pt + Gt + Gr - 10 * log10(d + 1)`` to the interference.

        NOTE(review): the signal term is the linear product from
        ``calculate_signal_strength`` while interference and noise look like
        dB-style quantities -- confirm the intended units.
        """
        signal_power = self.new_signal_strength
        noise_power = -174
        interference_power = 0
        for bs, (Pt, Gt, Gr) in self.base_station_params.items():
            if bs != action:
                interference_distance = calc_distance(self.agent_position, bs)
                interference_power += Pt + Gt + Gr - (10 * math.log10(interference_distance + 1))

        self.SINR = signal_power - (interference_power + noise_power)
        return self.SINR

    def get_reward(self, action):
        """
        Add this step's reward to ``self.reward`` and return the total.

        * +10 when the station is unchanged (no handover),
        * +3 when the signal strength improved over the previous observation,
        * +6 on top of that when SINR exceeds the threshold of 90.

        Also sets ``self.handover_num`` (1 on handover, else 0) and records
        ``action`` as the serving station.
        """
        threshold = 90
        is_handover = self.previous_action is not None and self.previous_action != action

        if is_handover:
            self.handover_num = 1
        else:
            self.handover_num = 0
            self.reward += 10

        # state_space still holds the previous step's observation here.
        if self.new_signal_strength > self.state_space[self._SIGNAL_INDEX]:
            self.reward += 3
            if self.SINR > threshold:
                self.reward += 6

        self.basestation_selection = action
        return self.reward

    def step(self, action):
        """
        Apply one base-station selection and advance the random walk.

        Args:
            action: Integer index into ``self.base_stations``.

        Returns:
            ``(observation, reward, done, info)``. The episode ends with
            reward -1000 when the chosen station is out of range, and with
            reward -1 when the agent walks off the grid.

        Raises:
            ValueError: If ``action`` is not a valid station index.
        """
        self.reward = 0

        station_index = int(action)
        if not 0 <= station_index < len(self.base_stations):
            raise ValueError(
                f"action must be in [0, {len(self.base_stations) - 1}], got {action!r}"
            )
        self.new_action = self.base_stations[station_index]

        if self.new_action not in self.selection_space:
            # Out-of-range station: penalise and terminate without moving.
            self.done = True
            self.reward = -1000
        else:
            self.take_action(self.new_action)
            self.new_signal_strength = self.calculate_signal_strength(self.new_action)
            self.SINR = self.calculate_SINR(self.new_action)

            if self._inside_grid():
                self.get_reward(self.new_action)
            else:
                # The agent left the map: end the episode instead of silently
                # teleporting it back to the start.
                self.done = True
                self.reward = -1

            self.previous_action = self.basestation_selection

        self.state_space = self._observation()

        if self.render_each_step:
            self.render()

        # Hand out a copy so callers cannot mutate the internal state.
        return self.state_space.copy(), self.reward, self.done, {}

    def render(self, mode='human'):
        """
        Draw the grid, base stations, agent and a text overlay with pygame.

        Args:
            mode: ``'human'`` flips the display; ``'rgb_array'`` returns the
                frame as a NumPy array from ``pygame.surfarray.array3d``.
        """
        cell = self._CELL_PX
        columns, rows = self.grid_size
        width_px, height_px = columns * cell, rows * cell

        # Create the window and font once; doing this on every frame is slow.
        if self._screen is None or not pygame.display.get_init():
            pygame.init()
            self._screen = pygame.display.set_mode((width_px, height_px))
            pygame.display.set_caption("Grid Environment")
            self._font = pygame.font.Font(None, 36)
        screen, font = self._screen, self._font

        # Let pygame process window events so the window stays responsive.
        pygame.event.pump()

        black = (0, 0, 0)
        screen.fill((255, 255, 255))

        # Grid lines.
        for column in range(columns + 1):
            pygame.draw.line(screen, black, (column * cell, 0), (column * cell, height_px))
        for row in range(rows + 1):
            pygame.draw.line(screen, black, (0, row * cell), (width_px, row * cell))

        # Base stations in blue, agent in red, each centred in its cell.
        half = cell // 2
        for bs in self.base_stations:
            pygame.draw.circle(screen, (0, 0, 255), (bs[0] * cell + half, bs[1] * cell + half), half)
        pygame.draw.circle(
            screen,
            (255, 0, 0),
            (self.agent_position[0] * cell + half, self.agent_position[1] * cell + half),
            half,
        )

        # Text overlay: stations in range, agent position, latest step reward.
        overlay = (
            (f"Action Space: {self.selection_space}", 10),
            (f"Agent Position: {self.agent_position}", 50),
            (f"Reward: {self.reward}", 90),
        )
        for message, top in overlay:
            screen.blit(font.render(message, True, black), (10, top))

        if mode == 'human':
            pygame.display.flip()
        elif mode == 'rgb_array':
            return np.array(pygame.surfarray.array3d(screen))

    def close(self):
        """Close the pygame window and shut pygame down."""
        pygame.display.quit()
        pygame.quit()
        # Drop cached handles so a later render() re-creates the window.
        self._screen = None
        self._font = None

In [5]:
import os 
import gym
import torch
import random
from stable_baselines3 import PPO
from stable_baselines3 import DQN
from stable_baselines3 import A2C
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
import tensorflow as tf

# One coverage radius (15) for each of the 15 base stations.
# The raw environment gets its own name: the factory lambda below must not
# close over `env`, because `env` is rebound to the DummyVecEnv wrapper and a
# later call of the factory would then return the wrapper itself.
base_env = GridEnvironmentdqn([15] * 15)
env = DummyVecEnv([lambda: base_env])

# Directory passed to the models below as `tensorboard_log`.
log_path = os.path.join("Training2", "Logs")

writer = tf.summary.create_file_writer(log_path)

In [6]:
# Path (without extension) for the DQN checkpoint.
# A later cell rebinds DQN_path to a different file name before loading/saving.
DQN_path = os.path.join('Training', 'Saved Models', 'DQN_Model_300000')

In [7]:
# Fresh DQN agent on the vectorized grid environment, logging to log_path.
model = DQN(
    'MlpPolicy',
    env,
    verbose=1,
    tensorboard_log=log_path,
    batch_size=2048,
    learning_starts=25000,
)

Using cuda device


In [214]:
DQN_path = os.path.join('Training', 'Saved Models', 'DQN_Model_300000with2048')
# Resume from the checkpoint only when it exists, so a fresh
# "Restart & Run All" keeps the newly constructed model instead of failing.
# stable-baselines3 appends ".zip" when saving to a path without an extension.
if os.path.isfile(DQN_path) or os.path.isfile(DQN_path + ".zip"):
    model = DQN.load(DQN_path, env=env)

In [None]:
# Train the current DQN `model` for 300,000 environment timesteps.
model.learn(total_timesteps=300000)

In [217]:
# Persist the trained DQN model at DQN_path.
model.save(DQN_path)

In [10]:
# Close the vectorized env after DQN training.
# NOTE(review): the PPO and A2C cells below reuse this same `env` object --
# confirm that using it after close() is intended.
env.close()

# PPO training

In [190]:
# Path (without extension) used to load and save the PPO checkpoint.
PPO_path = os.path.join('Training', 'Saved Models', 'PPO_Model_300000')

In [191]:
# Fresh PPO agent on the vectorized grid environment, logging to log_path.
model = PPO(
    'MlpPolicy',
    env,
    verbose=1,
    tensorboard_log=log_path,
    n_steps=2048,
    learning_rate=0.0003,
)

Using cuda device


In [138]:
# Resume from the checkpoint only when it exists, so a fresh
# "Restart & Run All" keeps the newly constructed model instead of failing.
# stable-baselines3 appends ".zip" when saving to a path without an extension.
if os.path.isfile(PPO_path) or os.path.isfile(PPO_path + ".zip"):
    model = PPO.load(PPO_path, env=env)

In [None]:
# Train the current PPO `model` for 300,000 environment timesteps.
model.learn(total_timesteps=300000)

In [193]:
# Persist the trained PPO model at PPO_path.
model.save(PPO_path)

In [None]:
# Close the vectorized env after PPO training.
# NOTE(review): the A2C cells below reuse this same `env` object -- confirm
# that using it after close() is intended.
env.close()

# A2C training

In [186]:
# Path (without extension) used to load and save the A2C checkpoint.
A2C_path = os.path.join('Training', 'Saved Models', 'A2C_Model_300000')

In [15]:
# Fresh A2C agent on the vectorized grid environment, logging to log_path.
model = A2C(
    "MlpPolicy",
    env,
    verbose=1,
    tensorboard_log=log_path,
    n_steps=2048,
    learning_rate=0.0005,
    gamma=0.9,
)

Using cuda device


In [None]:
# Resume from the checkpoint only when it exists, so a fresh
# "Restart & Run All" keeps the newly constructed model instead of failing.
# stable-baselines3 appends ".zip" when saving to a path without an extension.
if os.path.isfile(A2C_path) or os.path.isfile(A2C_path + ".zip"):
    model = A2C.load(A2C_path, env=env)

In [188]:
# Train the current A2C `model` for 300,000 environment timesteps.
model.learn(total_timesteps=300000)

Logging to Training2\Logs\A2C_13


<stable_baselines3.a2c.a2c.A2C at 0x247fa3f4d30>

In [189]:
# Persist the trained A2C model at A2C_path.
model.save(A2C_path)