In [24]:
# simple multiprocessing communication between 1 BS and several UEs
# 16.11.2023 - author Septimia Sarbu, ICON, CWC, University of Oulu

import numpy as np
import torch
from torch import nn

from scipy.stats import bernoulli, expon, poisson
from scipy.stats import rv_discrete
import matplotlib.pyplot as plt

import pdb

import math, copy

from numpy.random import randint, random

import multiprocessing as mp
from multiprocessing import Process, Pool, Lock, RLock, Semaphore, Event, Queue, Value

from queue import Empty

import os, sys
from time import sleep

from numpy import save, load

import random

random.seed(510)

import gymnasium as gym

DATA_VAL = 1.0

class UEModel():
    def __init__(self,idx,item,env):
        
        self.id = idx 
        self.msg = -1
        self.data = item
        self.s = item # the state of the UE; it starts full, having all the nSDUs it needs to transmit
        self.env = env
        
        self.seed = 48
        self.rng = np.random.default_rng(self.seed)
      
    def receive_msg(self,queue):
        self.msg = queue.get()

    def send_msg(self,queue,item):
        queue.put(item)

    def send_hold(self,queue):
        queue.put('H')
    
    def send_data(self,queue,item):
        queue.put(item)

    def send_end(self,queue):
        queue.put(None)
        
class BSModel():
    def __init__(self,idx,nUEs):
        
        self.msg = [-1 for i in range(nUEs)]
        self.got_data = [[] for i in range(nUEs)]
        self.data = [0 for i in range(nUEs)]
        
        self.seed = 480
        self.rng = np.random.default_rng(self.seed)

    def send_msg(self,queue,item,nUEs,run_UEs):
        for i in run_UEs:
            queue[i].put(item[i])
        
    def receive_msg(self,queue,nUEs,run_UEs):
        self.msg = [queue[i].get() for i in run_UEs]
        
    def receive_data(self,queue,run_UEs):
        for i in range(len(run_UEs)):
            self.got_data[run_UEs[i]] = queue[run_UEs[i]].get()

        for i in range(len(run_UEs)):
            if self.got_data[run_UEs[i]] != 'H':
                self.data[run_UEs[i]] += DATA_VAL #self.got_data[run_UEs[i]]


def runUE(UCM,DCM,ch,tSDUs,UEModel,idx):
    
    # redirect the print() to a file
    original_stdout = sys.stdout # Save a reference to the original standard output

    f = open('M_UE'+str(idx)+'.txt', 'w')

    sys.stdout = f # Change the standard output to the file we created
    
    print('UE: Running using gym env: HalfCheetah-v4', flush=True)
     
    env = gym.make("HalfCheetah-v4")
    
    UE = UEModel(idx,tSDUs,env)
    
    obs, info = UE.env.reset() 
    #print("UE CP state = ",UE.env.state)
    print("UE Mujoco state = ",obs)
    
    done_flag = 0
    
    UE.receive_msg(DCM[idx])
    
    tUE = 0 # at tUE=0 we have START
    
    if UE.msg == 'START':
        print('-----------------------------------------------------')
        print(f'>At UE-{idx} START of transmission', flush=True)
        print('-----------------------------------------------------')
        
        done_flag = 1
        
        UE.s = tSDUs
        msg_val = 'SR' # if there is data in the buffer
        sent_SR = 0
        received_SG = 0
        del_data = 0
        data_transmit = 0
        execute_act = 0
        nSDUs = 0
        sent_none = 0
        
    while done_flag == 1:        
        
        print('-----------------------------------------------------')
        tUE += 1
        print("tUE=",tUE)
        
        UE.receive_msg(DCM[idx])
        print(f'>DCM: UE received msg={UE.msg} from BS', flush=True)
        
        if execute_act == 1:
            n_obs, r, terminated, truncated, info = env.step(action)
            
            print("UE executed action received from BS")
            execute_act = 0
            print("UE next state = ",n_obs)
            if terminated or truncated:
                obs, info = UE.env.reset()
                #break
            else:
                obs = n_obs
        
        if del_data == 1:
            nSDUs += 1
            UE.s -= 1
            print(f">UE deleted data ", flush=True)
            del_data = 0
            sent_SR = 0
            received_SG = 0

        if data_transmit == 1:
            UE.send_data(ch[idx],obs)
            print(f">CH: UE sent data = {obs} to BS", flush=True)
            data_transmit = 0

        else:
            UE.send_hold(ch[idx]) # don't transmit anything over the channel; we need this to ensure the queues don't get blocked
            print(f">CH: UE sent H to BS", flush=True)

        UE.send_msg(UCM[idx],msg_val)
        print(f">UCM: UE sends msg={msg_val} to BS", flush=True)

        if msg_val == 'SR':
            sent_SR = 1
        
        ##################################################################################
        # select action and messages to perform at the next time point
        # reset message
        msg_val = 'H'

        if (UE.s == 0) and (nSDUs == tSDUs):
            if sent_none == 1:
                msg_val = 'H'

            if sent_none == 0:
                msg_val = None
                sent_none = 1
                print("None; nSDUs=",nSDUs)

 
        if UE.msg is None:
            #done_flag = 0 # end of communication
            break

        # the communication with the BS can start
        if UE.msg == 'SG':
            data_transmit = 1
            received_SG = 1

        if (UE.s > 0) and (sent_SR == 0) and (received_SG == 0):
            msg_val = 'SR'
            
        #if UE.msg == 'ACK':
        if len(UE.msg) == 6:
            execute_act = 1
            action = UE.msg
            del_data = 1

        if UE.msg == 'NACK':
            del_data = 0
            data_transmit = 1

        if sent_SR == 1:
            msg_val = 'H'
            if received_SG == 0:
                sent_SR = 0

        


    # all done
    tUE += 1
    print('UE: tUE=',tUE, flush=True)
    print('UE: waiting to send None to BS', flush=True)
    UE.send_end(UCM[idx])
    print(f'>UE-{idx}: Done, sent None to BS', flush=True)

    sys.stdout = original_stdout # Reset the standard output to its original value
    print(f'>UE-{idx}: Done', flush=True)
    print('UE: tUE=',tUE, flush=True)

def runBS(UCM,DCM,ch,BSModel,nUEs,tEND,seed):
    
    ptbler = 0.5
    # redirect the print() to a file
    original_stdout = sys.stdout # Save a reference to the original standard output

    f = open('M_BS.txt', 'w')

    sys.stdout = f # Change the standard output to the file we created

    print('BS: Running', flush=True)

    run_UEs = [i for i in range(nUEs)] # this list contains the idx of the UEs that are transmitting
    old_run_UEs = [i for i in range(nUEs)] # this list contains the idx of the UEs that are transmitting

    BS = BSModel(0,nUEs)

    tBS = 0
    print('-----------------------------------------------------')
    print("tBS=",tBS)
    value = ['START' for i in range(nUEs)]
    BS.send_msg(DCM,value,nUEs,run_UEs)
    print(f'>BS put {value} in DCM queue', flush=True)
    print('-----------------------------------------------------')

    done_flag = 1
    sent_none = 0

    n_c = 0

    # no message, this is just to not block the queue
    msg_val = ['H' for i in range(nUEs)]

    all_SR = 0
    trans_UEs = []

    all_data = 0
    data_UEs = []

    idx_done_UE = -1 # idx of a UE in run_UEs that has finnished the transmission; only 1 UE can transmit at a given time
    idx_done_UE_new = -1

    done_UE = -1 # global idx of the above

    sent_SG = 0

    while done_flag == 1:
        n_c += 1
        tBS += 1
        print('-----------------------------------------------------')
        print("tBS=",tBS)

        ##################################################################################
        # receive msg, send on ch, send msg

        if n_c == tEND:
            for idx in range(len(run_UEs)):
                msg_val[run_UEs[idx]] = None

        BS.send_msg(DCM,msg_val,nUEs,run_UEs)
        print(f'>DCM: BS sent msg={msg_val} to UEs = {run_UEs}', flush=True)

        BS.receive_data(ch,run_UEs)
        print(f'>CH: BS received data={BS.got_data} from UEs = {run_UEs}', flush=True)

        BS.receive_msg(UCM,nUEs,run_UEs)
        print(f'>UCM: BS received msg={BS.msg} from UEs = {run_UEs}', flush=True)

        # if data is received at the same time point BS has sent None, it is discarded
        for idx in range(len(run_UEs)):
            if msg_val[run_UEs[idx]] is None:
                if BS.got_data[run_UEs[idx]] != 'H':
                    BS.data[run_UEs[idx]] -= DATA_VAL


        if n_c == tEND:
            break

        
        print("run_UEs=",run_UEs)
        # if the UEs try to send data at the same time, the BS will see it in the UCM messages

        trans_UEs = []
        data_UEs = []

        if len(run_UEs) >= 1:
            for idx in range(len(run_UEs)):
                #if BS.msg[run_UEs[idx]] == 'SR':
                if BS.msg[idx] == 'SR':
                    all_SR += 1
                    trans_UEs.append(run_UEs[idx])

                if BS.got_data[run_UEs[idx]] != 'H':
                    all_data += 1
                    data_UEs.append(run_UEs[idx])

        #print("all_data=",all_data)
        #print("data_UEs=",data_UEs)

        ##################################################################################
        # select action and messages to perform at the next time point
        
        # reset the messages
        msg_val = ['H' for i in range(nUEs)]
        #print("msg_val has been reset: msg_val=",msg_val)

        old_run_UEs = copy.deepcopy(run_UEs)
        if idx_done_UE > -1:
            #print("done_UE=",done_UE)
            #print("idx_done_UE=",idx_done_UE)
            #print("run_UEs=",run_UEs)
            #print("old_run_UEs=",old_run_UEs)

            idx_k = -1
            for k in range(len(run_UEs)):
                if run_UEs[k] == done_UE:
                    idx_k = k
                    break

            run_UEs.pop(idx_k)
            idx_done_UE = -1

            #print("After pop: done_UE=",done_UE)
            #print("After pop: run_UEs=",run_UEs)

            idx_done_UE = idx_done_UE_new
            idx_done_UE_new = -1


        #print("len(run_UEs)=",len(run_UEs))
        #print("msg_val=",msg_val)

        #print("all_SR=",all_SR)

        if (all_SR > 1):
            # if more than 1 UE has sent 'SR' and this has not happened at the previous iteration
            # choose only one to send 'SG' to
            # at 2 consecutive iterations both UEs can send 'SR' simultaneously,
            #because they didn't have time to process the BS's response

            sent_SG = 1
            # select the UE

            x2 = np.arange(0,len(trans_UEs))
            pmfs2 = np.zeros([len(trans_UEs)]) # if all UEs send SR, pmf of which UE to send SG to
            for i in range(len(trans_UEs)):
                pmfs2[i] = 1.0 / len(trans_UEs)

            custom_distrib2 = rv_discrete(name='custom',values=(x2,pmfs2),seed=seed+int(random.random()*1000))
            #custom_distrib2 = rv_discrete(name='custom',values=(x2,pmfs2))
            idx_UE = custom_distrib2.rvs(size=1)[0]
            select_UE = trans_UEs[idx_UE]
            print("To which UE BS sends SG: select_UE=",select_UE)

            msg_val = ['H' for i in range(nUEs)]
            msg_val[select_UE] = 'SG'

            all_SR = 0
            trans_UEs = []

        if all_data > 1:
            print("ERROR: this should not happen, because the BS gave a 'SG' to only 1 UE")
            all_data = 0
            data_UEs = []

        if (all_SR == 1) and (len(run_UEs) >= 1):
            select_UE = trans_UEs[0]
            msg_val[select_UE] = 'SG'

            all_SR = 0
            trans_UEs = []

 
        print("-----------------------------------------")
 
        if (all_data == 1) and (len(run_UEs) >= 1):
            select_UE = data_UEs[0]

            keep = bernoulli(1-ptbler).rvs(size=1,random_state=seed+int(random.random()*1000))[0]#use [0] to make it a scalar, otherwise it is and array
            #keep = bernoulli(1-ptbler).rvs(size=1)[0]#use [0] to make it a scalar, otherwise it is and array
            print("keep=",keep)
            keep_val = int(1 * keep)
            if keep_val == 1:
                #msg_val[select_UE] = 'ACK'
                # select action vector between [-1, 1]
                act = BS.rng.uniform(low=-1,high=1,size=6)
                msg_val[select_UE] = act
                
            if keep_val == 0:
                BS.data[select_UE] -= DATA_VAL

                msg_val = ['H' for i in range(nUEs)] # if 'NACK' then no 'SG' to the other UE that has sent 'SR'
                msg_val[select_UE] = 'NACK'

            all_data = 0
            data_UEs = []
            sent_SG = 0

        #print("msg_val=",msg_val)

        print(f'>BS has total data={BS.data} from UEs = {run_UEs}', flush=True)

        #print("old_run_UEs=",old_run_UEs)

        for idx in range(len(BS.msg)):
            if BS.msg[idx] == None:
                msg_val[old_run_UEs[idx]] = None
                done_UE = old_run_UEs[idx]
                idx_done_UE = idx

        #print("done_UE=",done_UE)
        #print("idx_done_UE=",idx_done_UE)

        if len(run_UEs) == 0:
            break

    # all done
    print('BS: tBS=',tBS, flush=True)
    print('BS: Done', flush=True)
    print('BS has data =',BS.data, flush=True)

    sys.stdout = original_stdout # Reset the standard output to its original value
    
# entry point
if __name__ == '__main__':
        
    # create the UE and the BS
    tSDUs = 20 + 1 # SDUs to transmit from each UE to BS, then the episode ends
    tEND = 20 + 1
    
    nUEs = 2
    
    # create the shared queues
    UCM = [Queue() for i in range(nUEs)]
    DCM = [Queue() for i in range(nUEs)]
    
    ch = [Queue() for i in range(nUEs)]

    # start the UE
    UE_processes = [Process(target=runUE, args=(UCM,DCM,ch,tSDUs,UEModel,i,)) for i in range(nUEs)]
    for UE_process in UE_processes:
        UE_process.start()
    
    # start the BS
    seedBS = 230
    BS_process = Process(target=runBS, args=(UCM,DCM,ch,BSModel,nUEs,tEND,seedBS,))
    BS_process.start()
        
    BS_process.join()
    
    # wait for all processes to finish
    for UE_process in UE_processes:
        UE_process.join()

    print("All processes have terminated successfully")



>UE-0: Done>UE-1: Done

UE: tUE=UE: tUE=  2222

All processes have terminated successfully
