<a href="https://colab.research.google.com/github/SawadaSyahmi/5GNRcustomEnv/blob/main/5G_NR_ENV_PPO_MLP_policy_Custom_Environment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install PPO
!pip install stable-baselines3[extra]
!pip install gym
!pip install keras
!pip install keras-rl2

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting PPO
  Downloading ppo-0.1.1.tar.gz (2.7 kB)
Collecting importlib
  Downloading importlib-1.0.4.zip (7.1 kB)
Building wheels for collected packages: PPO, importlib
  Building wheel for PPO (setup.py) ... [?25l[?25hdone
  Created wheel for PPO: filename=ppo-0.1.1-py3-none-any.whl size=4059 sha256=38b5d0deb8dafc9a836bb723d4747fc226c2cd2d7812b5a417a17393d384c2a2
  Stored in directory: /root/.cache/pip/wheels/13/2f/07/ac1a4817d21cbd164781e1d97ede46593e1c03ca500d3fd349
  Building wheel for importlib (setup.py) ... [?25l[?25hdone
  Created wheel for importlib: filename=importlib-1.0.4-py3-none-any.whl size=5875 sha256=41f42fc4c7f25aab563cb1c813b218cf3130df6349d2da64e6baeeb5b9ede9fd
  Stored in directory: /root/.cache/pip/wheels/86/e4/cb/62b0e9efd7da1e984baec0c0ded0b727a7ed25e1904ed51fca
Successfully built PPO importlib
Installing collected packages: importlib, PPO
Successfully ins

# 1. Import Dependencies

In [3]:
import gym 
from gym import Env
from gym import spaces, logger
from gym.spaces import Discrete, Box, Dict, MultiBinary, MultiDiscrete 
import numpy as np
import random
from gym.utils import seeding
import os
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy
import math

# 3. Building an Environment

In [4]:
class environment(Env):

########### init method or constructor #############################
    def __init__(self):
        # env config

        self.np_random = None
        self.ULDLratio = 0.9
        self.max_tx_power_interference = 40
        self.subcarrierSpacing1 = 4
        self.subcarrierSpacing2 = 4
        self.txPower = 40
        self.f_c = 28e9  # Hz
        self.cell_radius = 150  # in meters.
        self.inter_site_distance = 3 * self.cell_radius / 2.
        self.FSPL = False
        self.x_bs_1, self.y_bs_1 = 0, 0
        self.x_bs_2, self.y_bs_2 = self.inter_site_distance, 0
        self.guardband = 120

        # RL config
        self.seed(seed=10)
        self.state = None
        self.num_actions = 32
        self.step_count = 0  # which step
        self.reward_min = -20
        self.reward_max = 100
        self.length = 60

        bounds_lower = np.array([
            0.1,
            0,
            1,
            1,
            0,
            1,
            1,
            ])

        bounds_upper = np.array([
            self.ULDLratio,                     
            self.inter_site_distance + self.cell_radius,
            self.txPower,
            self.max_tx_power_interference,
            self.guardband, 
            self.subcarrierSpacing1,
            self.subcarrierSpacing2            
            ])

        self.action_space = spaces.Discrete(self.num_actions)  # action size is here
        self.observation_space = spaces.Box(bounds_lower, bounds_upper,shape=(7,),
                                            dtype=np.float32)  # spaces.Discrete(2) # state size is here 
        int_ratio,int_radius,int_tx,int_itx,int_gb,int_ss1,int_ss2 = self.observation_space.sample()                                    
        # self.intSINR = self.SINR(int_tx,int_itx,int_ratio,int_gb,int_ss1,int_ss2)                                    
        # self.intBitrate = self.bitrate(int_ratio,int_ss1)
        self.intBitrate = 10
        self.intSINR = 10
        # Set start state
        self.state = None
        # Set shower length


########### seeding #############################

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]        

########### step action #############################

    def step(self, action):
        
        # Apply action
        # 0 -1 = -1 temperature
        # 1 -1 = 0 
        # 2 -1 = 1 temperature 
        (ULDLratio, x_cell_radius, txPower, interferencePower, guardband, subcarrierSpacing1, subcarrierSpacing2) = self.state
        print(action)
        power_tx = action & 0b000001                # 1 power up, 0 power down
        ratio = (action & 0b000010)             >> 1
        subcarrier1 = (action & 0b000100)       >> 2
        gb = (action & 0b001000)                >> 3
        subcarrier2 = (action & 0b010000)       >> 4
        intpower = (action & 0b100000)          >> 5            

        self.step_count += 1    

        if (power_tx == 1):
            txPower *= 10**(1/10.)
        else:
            txPower *= 10**(-1/10.)

        if (intpower == 1):
            interferencePower *= 10**(1/10.)
        else:
            interferencePower *= 10**(-1/10.)         

        if (ratio == 1):
            ULDLratio  = ULDLratio + 0.1
        else:
            ULDLratio = ULDLratio - 0.1
            
        if (subcarrier1 == 1):
            subcarrierSpacing1 = subcarrierSpacing1 + 1
        else:
            subcarrierSpacing1 = subcarrierSpacing1 - 1

        if (subcarrier2 == 1):
            subcarrierSpacing2 = subcarrierSpacing2 + 1
        else:
            subcarrierSpacing2 = subcarrierSpacing2 - 1     

        if (gb == 1):
            guardband = guardband + 15
        else:
            guardband = guardband - 15
                                

        
        # Reduce shower length by 1 second
        self.length -= 1 
        # print(self.length)
        

        # Calculate reward
        SINRupdated, SEupdated= self.SINR(txPower,interferencePower,ULDLratio,guardband,subcarrierSpacing1,subcarrierSpacing2)
        bitrate_updated = self.bitrate(ULDLratio, guardband)

        if SINRupdated >= 0.8*self.intSINR: 
            reward = 1 
        else: 
            reward = -1 

        if bitrate_updated >= 0.5*self.intBitrate: 
            reward = 1 
        else: 
            reward = -1 
            

        # Check if shower is done
        if self.length <= 0: 
            done = True
            self.length = 60
        else:
            done = False
            

        # Apply temperature noise
        #self.state += random.randint(-1,1)
        # Set placeholder for info
        info = {}
        
        self.state = (ULDLratio, x_cell_radius, txPower, interferencePower, guardband, subcarrierSpacing1,subcarrier2)
        # Return step information
        return self.state, reward, done, info

########### rendering #############################

    def render(self):
        # Implement viz
        pass

########### reset #############################

    def reset(self):
        # Initialize f_n of both cells
        self.state = [self.np_random.uniform(low=0,   high=self.ULDLratio),
                      self.np_random.uniform(low=0,   high=self.inter_site_distance + self.cell_radius),
                      self.np_random.uniform(low=1,   high=self.txPower / 2),
                      self.np_random.uniform(low=1,   high=self.max_tx_power_interference / 2),
                      self.np_random.uniform(low=0,   high=self.guardband),
                      self.np_random.uniform(low=1,   high=self.subcarrierSpacing1),
                      self.np_random.uniform(low=1,   high=self.subcarrierSpacing2),
                      ]

        self.step_count = 0
        return np.array(self.state)


########### calculation #############################
    def SINR(self,txpower,intpower,ratio,gb,ss1,ss2):
        T = 290  # Kelvins
        B = 15000  # Hz
        k_Boltzmann = 1.38e-23
        noisePower = k_Boltzmann * T * B  # this is in Watts

        receivedSinr = 10 * np.log10(
            self.receivedPower(txpower) / (intpower + noisePower + self.iniPower(gb,ss1,ss2)))
        #print('receivedSinr Power = ' + str(receivedSinr))

        SE = np.log2(1 + receivedSinr)
        #print('Spectrum Efficiency = ' + str(SE))

        return receivedSinr,SE

    def receivedPower(self,txpower):
        d_mainBS, d_interferBS = 100, 100

        if self.FSPL == True:
            pathloss = 20 * np.log10(d_mainBS) + 20 * np.log10(self.f_c)
            #print('pathloss = ' + str(pathloss))
        else:
            f_c = self.f_c
            c = 3e8  # speed of light
            d = 100
            h_B = 20
            h_R = 1.5

            # COST231
            C = 3
            a = (1.1 * np.log10(f_c / 1e6) - 0.7) * h_R - (1.56 * np.log10(f_c / 1e6) - 0.8)
            pathloss = 46.3 + 33.9 * np.log10(f_c / 1e6) + 13.82 * np.log10(h_B) - a + (
                    44.9 - 6.55 * np.log10(h_B)) * np.log10(d / 1000.) + C
            #print('pathloss = ' + str(pathloss))

        receivedPower = txpower * pathloss
        return receivedPower

    def bitrate(self,ratio,ss1):
        N_prb = 100
        ss = int(ss1)

        if ss == 1:
            N_prb = 133
        elif ss == 2:
            N_prb = 133
        elif ss == 3:
            N_prb = 133
        elif ss == 4:
            N_prb = 133


        J = 1  # carrier aggregation
        v = 1  # MIMO layers
        Q = 6  # modulation order
        Rmax = 0.92578125  # LDPC code
        f = 1  # scaling factor
        OH = 0.14  # overhead control
        ULDLratio = ratio
        T_ofdm = 0.0000357142857142852  # (10 ** -3) / (14 * 2^u)
        _bitrate = v * Q * f * Rmax * (12 * N_prb / T_ofdm) * (1 - OH)
        Mbitrate = _bitrate * (10 ** -6)
        Mbitrate = Mbitrate * ULDLratio

        #print('Max BitRate = ' + str(Mbitrate))
        return Mbitrate

    def iniPower(self,gb,ss1,ss2):

        ss1 = int(ss1)
        #print('Max ss1 = ' + str(ss1))
        ss2 = int(ss2)
        #print('Max ss2 = ' + str(ss2))

        # if (ss1 == 1):
        #   n_bs1 = 1025
        # elif (ss1 == 2):
        #   n_bs1 = 1025
        # elif (ss1 == 3):
        #   n_bs1 = 1025
        # elif (ss1 == 4):
        #   n_bs1 = 1025
        # else:
        #     print('Wrong Subcarrier Spacing')

        # if (ss2 == 1):
        #   n_bs2 = 512
        # elif (ss2 == 2):
        #   n_bs2 = 512
        # elif (ss2 == 3):
        #   n_bs2 = 512
        # elif (ss2 == 4):
        #   n_bs2 = 512
        # else:
        #     print('Wrong Subcarrier Spacing')
        n_bs1 = 1025
        n_bs2 = 512
        pt_bs2 = 0.5
        k = 100
        n2 = 2
        z1 = 250
        v1 = 0
        OFDM_overlap = n_bs1 / n_bs2
        gain = 1
        total_v = 0
        total_z = 0
        a = pt_bs2 / n_bs1

        d = np.sin(math.pi / n_bs2 * (z1 - v1 - gb) * OFDM_overlap * n_bs1)
        e = np.sin(math.pi / n_bs2 * (z1 - v1 - gb))
        c = np.absolute((d / e) ** 2)
        f = OFDM_overlap * c
        b = gain / (n_bs2 * n_bs1) * (c + f)
        gain_ratio = gain / (n_bs2 * n_bs1)
        inipower_b = gain_ratio * c
        total_v = total_v + b
        inipower = a * total_v

        # for z1 in range(1, z+1):
        #     for v1 in range(v, 1, -1):
        #         if not z1 == v1:
        #             d = np.sin(math.pi / n_bs2 * (z1 - v1 - gb) * OFDM_overlap * n_bs1)
        #             e = np.sin(math.pi / n_bs2 * (z1 - v1 - gb))
        #             c = np.absolute((d / e) ** 2)
        #             f = OFDM_overlap * c
        #             b = gain / (n_bs2 * n_bs1) * (c + f)
        #             total_v = total_v + b
        #             print(total_v)
        #     total_z = total_z + total_v
        # inipower = a * total_z
        #print('ini Power = ' + str(inipower_b))
        return 0.45

    def spectrum_efficiency(self):
        SE = np.log2(1 + self.SINR())
        #print('Spectrum Efficiency = ' + str(SE))
        return SE
    

In [5]:
env=environment()

  "Box bound precision lowered by casting to {}".format(self.dtype)


In [40]:
env.observation_space.sample()

array([  0.7407396, 291.50287  ,  16.409758 ,  18.031494 ,   4.004028 ,
         2.159822 ,   2.6272109], dtype=float32)

In [41]:
env.reset()

array([  0.53355063, 268.84673703,   1.22370088,  15.8333506 ,
        64.3207984 ,   3.34243916,   2.46139243])

# 4. Test Environment

In [6]:
episodes = 5
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0 
    
    while not done:
        env.render()
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        score+=reward
    print('Episode:{} Score:{}'.format(episode, score))
env.close()

4
2
4
2
6
3
13
14
1
14
14
15
13
6
1
14
7
4
7
8
2
12
10
0
4
1
11
3
10
11
7
5
1
14
11
15
8
1
1
10
1
15
15
10
6
5
3
0
12
5
4
1
9
6
4
11
14
12
8
14
Episode:1 Score:60
10
9
2
1
7
5
15
14
14
4
15
4
8
12
14
9
3
3
14
10
12
0
15
8
6
9
5
12
5
10
10
0
10
12
1
15
3
12
5
14
5
12
5
12
6
11
8
12
8
12
5
8
3
3
13
6
1
3
9
14
Episode:2 Score:38
11
10
10
1
12
0
9
6
11
13
1
13
15
8
3
1
14
13
5
0
5
6
5
0
9
0
2
10
6
12
0
2
1
9
7
10
12
0
6
9
5
9
3
1
15
13
14
8
5
4
12
7
9
14
3
9
2
13
11
11
Episode:3 Score:-22
13
15
13
10
2
14
5
14
7
4
2
14
15
7
10
5
4
4
13
13
12
10
12
6
4
1
12
2
3
9
7
4
11
4
14
4
13
3
0
5
7
10
14
0
11
9
10
4
7
6
10
14
9
1
3
2
1
14
10
12
Episode:4 Score:60
6
12
8
4
6
3
15
11
8
14
12
3
5
5
10
15
0
15
3
5
2
15
9
15
5
3
2
10
2
10
1
1
4
11
4
2
6
13
0
15
8
5
2
2
11
4
11
9
14
15
11
12
10
5
8
10
5
10
13
2
Episode:5 Score:60


In [None]:
env.close()

# 5. Train Model

In [None]:
log_path = os.path.join('Training', 'Logs')

In [None]:
model = PPO("MlpPolicy", env, verbose=1, tensorboard_log=log_path)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [None]:
model.learn(total_timesteps=400000)

# 6. Save Model

In [None]:
model.save('PPO')

In [None]:
evaluate_policy(model, env, n_eval_episodes=10, render=False)