In [1]:
import ast
import collections
import glob
import json
import mmap
import os
import sys 
import time
from time import strftime, localtime

import import_ipynb
import numpy as np
import torch
import torch.nn.functional as F
from torch.distributions import Categorical
from torch.utils.tensorboard import SummaryWriter
from torch_geometric.data import Data
import win32pipe, win32file, pywintypes

from actor import actor_network

# Disable "..." summarisation so large state tensors/arrays print in full when debugging.
torch.set_printoptions(threshold=10000)
np.set_printoptions(threshold=sys.maxsize)

# Number of bytes requested per ReadFile call on the named pipe.
BUFFER_SIZE = 200_000

  from .autonotebook import tqdm as notebook_tqdm


# 통신 관련 함수

In [2]:
def sendOmnetMessage(msg):
    """Send ``msg`` to the simulator over the module-level named pipe ``pipe``.

    Args:
        msg (str): text to send, UTF-8 encoded before writing (callers send
            e.g. "ok", "void", "init" or a stringified offloading vector).
    """
    payload = msg.encode('utf-8')
    win32file.WriteFile(pipe, payload)
    
# Win32 status code (winerror.ERROR_MORE_DATA) that ReadFile reports on a
# message-mode pipe when the current message is longer than the read size.
_ERROR_MORE_DATA = 234

def getOmnetMessage():
    """Read one complete message from the simulator's named pipe and decode it.

    The pipe is created in message read mode, so a single ``ReadFile`` call
    returns at most ``BUFFER_SIZE`` bytes of the current message. A longer
    message is reported with ``ERROR_MORE_DATA``, and the rest must be fetched
    with further reads. Previously the status code was ignored, so large state
    messages were silently truncated. All chunks are joined before decoding so
    that a UTF-8 character split across chunk boundaries still decodes.

    Returns:
        str: the full message text.
    """
    chunks = []
    while True:
        status, data = win32file.ReadFile(pipe, BUFFER_SIZE)
        chunks.append(data)
        if status != _ERROR_MORE_DATA:
            break
    return b''.join(chunks).decode('utf-8')

def closePipe():
    """Close the module-level named-pipe handle ``pipe``."""
    handle = pipe
    win32file.CloseHandle(handle)

# 실험 정보 관련 함수

In [3]:
def recordExpInfo(config):
    """Print a summary of the experiment settings and save it to ``info.txt``.

    Args:
        config (dict): must contain
            ``"network_info"``: the dict parsed from the simulator's initial
            JSON message (fields such as ``modelNum``, ``nodeNum`` and
            ``adjacencyList``, the latter a Python-literal string);
            ``"entropy_weight"``; and
            ``"path_name"``: an existing directory that receives ``info.txt``.

    Raises:
        ValueError, SyntaxError: if ``adjacencyList`` is not a plain Python literal.
    """
    networkInfo = config["network_info"]

    modelNum = int(networkInfo['modelNum'])
    availableJobNum = int(networkInfo['availableJobNum'])
    nodeNum = int(networkInfo['nodeNum'])
    jobWaitingLength = int(networkInfo['jobWaitingQueueLength'])
    # adjacencyList arrives over the pipe. Parse it as a literal instead of
    # eval() so that a malformed or hostile message cannot execute code.
    adjacency = ast.literal_eval(networkInfo['adjacencyList'])
    episode_length = int(networkInfo['episode_length'])
    node_capacity = networkInfo['node_capacity']
    job_generate_rate = networkInfo['job_generate_rate']

    node_feature_num = 2 * (modelNum * availableJobNum)
    queue_feature_num = (nodeNum + modelNum) * jobWaitingLength
    reward_weight = 1/modelNum
    entropy_weight = config["entropy_weight"]
    
    info = f"""
    노드 개수 : {nodeNum}
    네트워크 최대 job 개수 : {availableJobNum}
    job 대기 가능 개수 : {jobWaitingLength}
    최대 subtask 개수 : {modelNum}
    인접 리스트 : {adjacency}
    node_feature_num : {node_feature_num}
    queue_feature_num : {queue_feature_num}
    episode_length : {episode_length}
    node_capacity : {node_capacity}
    entropy_weight : {entropy_weight}
    reward_weight : {reward_weight}
    job_generate_rate : {job_generate_rate}
    """
    print(info)

    # Explicit UTF-8: the summary contains Korean labels, and the platform default
    # encoding (e.g. cp949 on Korean Windows) would make the file non-portable.
    with open(f'{config["path_name"]}/info.txt', 'w', encoding='utf-8') as f:
        f.write(info)

In [4]:
def main(config):
    """Serve action requests from the OMNeT++ simulator in an endless loop.

    Each iteration reads one control message from the named pipe and
    dispatches on it:

    * ``"action"``: receive two JSON state messages, build graph/queue
      tensors, log statistics to TensorBoard, sample an offloading decision
      from the policy (either the void action or one node per subtask row)
      and send it back.
    * ``"stop"``: acknowledge and forget the remembered previous states.
    * ``"episode_finish"``: read the episodic result, log it, save
      checkpoints and, when ``config["is_train"]`` is set, train the model.

    There is no ``break``/``return``; the loop never exits on its own.

    Args:
        config (dict): experiment settings. Keys read directly here are
            ``node_num``, ``model_num``, ``lstm_hidden_num``, ``is_train``,
            ``replay_buffer_size`` and ``path_name``; ``recordExpInfo`` and
            ``actor_network`` read more.

    Relies on module-level globals: ``pipe`` (through the messaging helpers),
    ``adjacency`` (rebound here to a LongTensor edge index) and ``writer``
    (a TensorBoard ``SummaryWriter``).
    """
    global adjacency, writer

    recordExpInfo(config)
    
    model = actor_network(config)
    # NOTE(review): reward_history and v_history are never used below.
    reward_history = []
    v_history = []

    # Rebind the global adjacency list as a LongTensor; it is the edge_index of
    # every torch_geometric Data object built below.
    adjacency = torch.tensor(adjacency, dtype=torch.long)

    # step: TensorBoard x-axis for per-step logs, incremented each time an
    # action is taken. episode: x-axis for per-episode logs.
    step = 1
    episode = 1

    # Best episodic reward so far; max_model.pth is written when it is beaten.
    # NOTE(review): starts at 0, so no "max" checkpoint is ever saved if the
    # episodic reward is never positive.
    max_reward = 0

    # Running mean of the per-step reward within the current episode.
    average_reward = 0
    average_reward_num = 0

    # Samples produced by the most recent action; their reward and next state
    # are filled in when a later "action" message is handled.
    temp_history = []

    # NOTE(review): isStop is never used below.
    isStop = False
    # How often each node / the void action was chosen since the last log;
    # logged and reset once step > 1.
    node_selected_num = [0 for i in range(config["node_num"])]
    void_selected_num = 0

    # State dicts from the previous "action" message. An empty dict means there
    # is no previous state (start of the run, or right after a "stop").
    pre_state_1 = {}
    pre_state_2 = {}

    model = model.cuda()
    
    while True:
        # time.sleep(config["cpu_load_balance_time"])
        
        msg = getOmnetMessage()
        
        if msg == "action": # message from OMNeT++: acknowledge, then receive the states
            sendOmnetMessage("ok")
            # Two consecutive state messages follow.
            state_1 = getOmnetMessage()
            state_2 = getOmnetMessage()

            # If the first state message is empty, fall back to the second one.
            if len(state_1) == 0:
                state_1 = state_2
            

            if len(pre_state_1) == 0: # first action since start / last "stop"
                state_1 = json.loads(state_1) # state received, so an action can be taken.
                state_2 = json.loads(state_2) # state received, so an action can be taken.
                
            else:
                # Later actions: only 'jobWaiting' and 'sojournTime' are taken from
                # the new messages. Every other field (nodeState, reward, isAction,
                # jobIndex, ...) keeps its value from the previous state dict, which
                # is mutated in place. NOTE(review): confirm the simulator only
                # refreshes these two fields after the first action.
                state_1 = json.loads(state_1)
                pre_state_1['jobWaiting'] = state_1['jobWaiting']
                pre_state_1['sojournTime'] = state_1['sojournTime']
                state_1 = pre_state_1

                state_2 = json.loads(state_2)
                pre_state_2['jobWaiting'] = state_2['jobWaiting']
                pre_state_2['sojournTime'] = state_2['sojournTime']
                state_2 = pre_state_2
            
            sendOmnetMessage("ok") # reply

            # NOTE(review): eval() on strings received over the pipe can execute
            # arbitrary code; ast.literal_eval is safer if these are plain literals.
            node_waiting_state_1 = np.array(eval(str(state_1['nodeState'])))
            node_processing_state_1 = np.array(eval(state_1['nodeProcessing']))
            link_state_1 = np.array(eval(state_1['linkWaiting']))
            job_waiting_state_1 = np.array(eval(state_1['jobWaiting']))
            activated_job_list_1 = eval(state_1['activatedJobList'])
            isAction_1 = int(state_1['isAction'])
            reward_1 = float(state_1['reward'])
            averageLatency_1 = float(state_1['averageLatency'])
            completeJobNum_1 = int(state_1['completeJobNum'])
            sojournTime_1 = float(state_1['sojournTime'])

            node_waiting_state_2 = np.array(eval(str(state_2['nodeState'])))
            node_processing_state_2 = np.array(eval(state_2['nodeProcessing']))
            link_state_2 = np.array(eval(state_2['linkWaiting']))
            job_waiting_state_2 = np.array(eval(state_2['jobWaiting']))
            activated_job_list_2 = eval(state_2['activatedJobList'])
            isAction_2 = int(state_2['isAction'])
            reward_2 = float(state_2['reward'])
            averageLatency_2 = float(state_2['averageLatency'])
            completeJobNum_2 = int(state_2['completeJobNum'])
            sojournTime_2 = float(state_2['sojournTime'])
            

        

            writer.add_scalar("completeJobNum/train", completeJobNum_2 ,step)
            

            # Apply the same reward to every sample generated in this timestep.
            # The reward read from the message is replaced by completeJobNum_2,
            # or by 0 when averageLatency_2 is -1 (presumably "no job finished yet").
            if averageLatency_2 == -1:
                reward_2 = 0
            else:
                reward_2 = completeJobNum_2
            
            writer.add_scalar("Reward/train", reward_2, step)
                
            # NOTE(review): first_sample is never used below.
            first_sample = True
            # Move the previous action's samples into the model's buffers, stamping
            # each with this step's reward (index 3) and next state (indices 4, 5).
            # NOTE(review): `len(pre_state_1) == 0` holds only for the first action
            # after start or a "stop"; on every other step temp_history is discarded
            # by `temp_history = []` below without being stored. Also,
            # `network_state`/`job_waiting_state` have not been rebuilt for this step
            # yet (that happens further down), so the stored "next state" is the
            # state the sample was taken in. Confirm both are intended.
            if config["is_train"] and len(pre_state_1) == 0:
                for history in temp_history:
                    history[3] = reward_2
                    history[4] = network_state
                    history[5] = job_waiting_state
                    model.history.append(history)
                    model.put_data(history)

            # Keep only the newest replay_buffer_size samples.
            model.history = model.history[-config["replay_buffer_size"]:]
            

            temp_history = []

            job_index = int(state_2['jobIndex'])

            #print('sojourn time :', sojournTime)


            # Node features: waiting and processing states side by side (axis=1).
            node_state_1 = np.concatenate((node_waiting_state_1,node_processing_state_1) ,axis = 1)
            node_state_1 = torch.tensor(node_state_1, dtype=torch.float)

            node_state_2 = np.concatenate((node_waiting_state_2,node_processing_state_2) ,axis = 1)
            node_state_2 = torch.tensor(node_state_2, dtype=torch.float)

            #print(reward)

            link_state_1 = torch.tensor(link_state_1, dtype=torch.float)
            link_state_2 = torch.tensor(link_state_2, dtype=torch.float)

            # Collect the non-empty rows of the second waiting-job table.
            job_waiting_num = 0
            job_waiting_queue = collections.deque()
            for job in job_waiting_state_2:
                if any(job): # a row with any non-zero entry holds a job.
                    job_waiting_num += 1
                    job_waiting_queue.append(job)
            
            # Flatten each waiting-job table into a single row of shape (1, N).
            job_waiting_state_1 = torch.tensor(job_waiting_state_1, dtype=torch.float).view(1, -1)
            job_waiting_state_2 = torch.tensor(job_waiting_state_2, dtype=torch.float).view(1, -1)
            # print(job_waiting_state)

            network_state_1 = Data(x=node_state_1, edge_attr=link_state_1, edge_index=adjacency)
            network_state_2 = Data(x=node_state_2, edge_attr=link_state_2, edge_index=adjacency)

            # The model consumes the two states as pairs, moved to the GPU.
            network_state = [network_state_1.cuda(), network_state_2.cuda()]
            job_waiting_state = [job_waiting_state_1.cuda(), job_waiting_state_2.cuda()]

            pre_state_1 = state_1
            pre_state_2 = state_2
            
            # Incremental mean of reward_2 over this episode's steps.
            if average_reward_num == 0:
                average_reward = reward_2
                average_reward_num = 1
            else:
                average_reward = average_reward + (reward_2 - average_reward)/(average_reward_num + 1)
                average_reward_num += 1
                
            if step > 1:
                # Log the selection counts accumulated since the last log, then reset.
                for i in range(config["node_num"]):
                    node_tag = "node/" + str(i) + "/train"
                    writer.add_scalar(node_tag, node_selected_num[i], step)

                writer.add_scalar("node/void/train", void_selected_num, step)
                    
                node_selected_num = [0 for i in range(config["node_num"])] # reset the node selection counts
                void_selected_num = 0

                # Log the critic's mean value estimate for the current state.
                if reward_2 != 0:
                    with torch.no_grad():
                        state = model.gnn([network_state, job_waiting_state])
                        writer.add_scalar("Value/train", torch.mean(model.v(state)), step)
                

                writer.flush()

            
            
            # print(job_waiting_queue)
            # Nothing to schedule when no job is waiting.
            if job_waiting_num == 0:
                isAction_2 = False
                

            if isAction_2:
                # NOTE(review): job_idx is never used below.
                job_idx = job_index
                job = job_waiting_queue.popleft()
                # In a job row's first node_num entries, -1 marks the source node
                # and 1 marks the destination node.
                # NOTE(review): dst defaults to 1 (not -1), and src/dst are not used
                # after this point.
                src = -1
                dst = 1
                for i in range(config["node_num"]):
                    if job[i] == -1:
                        src = i
                    if job[i] == 1:
                        dst = i

                if src == -1:
                    src = dst
                    
                #print(f"src : {src}, dst : {dst}")
                #print(job)
                # NOTE(review): subtasks and temp_data are never used below.
                subtasks = job[config["node_num"]:]
                offloading_vector = []
                temp_data = []
                scheduling_start = False
                # print(subtasks)
                step += 1

                # Policy forward pass: the graph feature is repeated once per model_num
                # row, with a zero initial recurrent state of shape
                # (model_num, lstm_hidden_num) (presumably the LSTM (h, c) pair).
                with torch.no_grad():
                    feature = model.gnn([network_state, job_waiting_state])
                    new_feature = feature.repeat(config["model_num"], 1)
                    prob, entropy, output = model.pi([new_feature, (torch.zeros(config["model_num"], config["lstm_hidden_num"]).cuda(), torch.zeros(config["model_num"], config["lstm_hidden_num"]).cuda())])

                writer.add_scalar("Entropy/train", entropy[0], step)
                #print(f'prob : {prob}')

                prob = prob.to("cpu")
                entropy = entropy.to("cpu")
                output = output.to("cpu")

                # isVoid = F.sigmoid(dists[modelNum].sample())

                # Only the first row's sample decides between void and scheduling.
                m = Categorical(prob) 
                nodes = m.sample()

                node = nodes[0].item()

                #print(f'node : {node}')
                
                # for void-action experiments
                # node = nodeNum 
                
                
                # Index node_num is the void action. If it is sampled, record a void
                # sample only. NOTE(review): scheduling_start is always False here.
                if node == config["node_num"] and not scheduling_start: 
                    # Store a one-hot probability on the void action for this sample.
                    prob[0] = torch.Tensor([0] * config["node_num"] + [1.0])
                    # Mask: 1 at the void index, 0 for every node.
                    action_mask = [int(not scheduling_start) if i == node else int(scheduling_start) for i in range(config["node_num"] + 1)]

                    # Sample layout: [state pair, queue pair, action, reward (set on
                    # flush), next state pair, next queue pair, action prob, 0
                    # (meaning defined by actor_network; TODO confirm), action_mask,
                    # row index].
                    temp_history.append([
                        [network_state[0], network_state[1]], 
                        [job_waiting_state[0], job_waiting_state[1]], 
                        node, 0, 
                        [network_state[0], network_state[1]], 
                        [job_waiting_state[0], job_waiting_state[1]],
                        prob[0][node].item(), 
                        0, action_mask, 0]
                    )

                    sendOmnetMessage("void")

                    #print("action finish.")
                    
                    if getOmnetMessage() == "ok":
                        void_selected_num += 1

                else:
                    scheduling_start = True

                if scheduling_start:

                    # NOTE(review): repeats the forward pass above on the same inputs;
                    # likely redundant unless the networks are stochastic.
                    with torch.no_grad():
                        feature = model.gnn([network_state, job_waiting_state])
                        new_feature = feature.repeat(config["model_num"], 1)
                        prob, entropy, output = model.pi([new_feature, (torch.zeros(config["model_num"], config["lstm_hidden_num"]).cuda(), torch.zeros(config["model_num"], config["lstm_hidden_num"]).cuda())])

                    # Drop the last column (presumably the void action's logit).
                    output = output[:, 0:-1]

                    prob = prob.to("cpu")
                    entropy = entropy.to("cpu")
                    output = output.to("cpu")

                    # NOTE(review): softmax over dim=0 normalises each column across the
                    # model_num rows, not across nodes within a row. Categorical then
                    # renormalises each row, so the stored prob[index][node] below is not
                    # the probability the action was actually sampled with. Confirm
                    # whether dim=1 was intended.
                    prob = F.softmax(output, dim=0)
                    # Append a zero-probability void column so void cannot be sampled.
                    prob = torch.concat([prob, torch.zeros(config["model_num"], 1)], dim =1)

                    m = Categorical(prob)
                    nodes = m.sample()
                    # Mask: every node allowed, void disallowed.
                    action_mask = [1] * config["node_num"] + [0]

                    # One sampled node per row. Reward and next state are placeholders
                    # (-1) until the flush at the top of a later "action" message.
                    for index in range(config["model_num"]):

                        node = nodes[index].item()

                        temp_history.append([
                        [network_state[0], network_state[1]], 
                        [job_waiting_state[0], job_waiting_state[1]], 
                        node, -1, 
                        [-1, -1], 
                        [-1, -1],
                        prob[index][node].item(), 
                        0, action_mask, index]
                        )

                        offloading_vector.append(node)
                        node_selected_num[node] += 1
                    
                if len(offloading_vector) != 0: # non-empty -> the scheduling branch ran (void was not sampled)
                    # print(offloading_vector)
                    msg = str(offloading_vector)
                    sendOmnetMessage(msg)
                    
                    #print("action finish.")
                    # The acknowledgement is read but not checked.
                    if(getOmnetMessage() == "ok"):
                        pass

        elif msg == "stop":
            
            # Forget the previous states so the next action starts fresh.
            sendOmnetMessage("ok")
            pre_state_1 = {}
            pre_state_2 = {}
            
        elif msg == "episode_finish":
            sendOmnetMessage("ok")

            # Episodic result arrives as JSON.
            episodic_reward = getOmnetMessage()
            episodic_reward = json.loads(episodic_reward)
            
            finish_num = float(episodic_reward['reward'])
            # Key spelled 'completNum'; presumably it matches what the simulator
            # sends, so do not rename it on this side alone.
            complete_num = int(episodic_reward['completNum'])
            average_latency = float(episodic_reward['averageLatency'])

            normalized_finish_num = model.return_normalize_reward(finish_num)
            
            writer.add_scalar("EpisodicReward/train", finish_num, episode)
            writer.add_scalar("NormalizedEpisodicReward/train", normalized_finish_num, episode)
            writer.add_scalar("CompleteNum/train", complete_num, episode)
            writer.add_scalar("averageLatency/train", average_latency ,episode)

            episode += 1
            sendOmnetMessage("ok")

            # Save a checkpoint whenever the best episodic reward is beaten.
            if finish_num > max_reward:
                modelPathName = config["path_name"] + "/max_model.pth"
                torch.save(model.state_dict(), modelPathName)
                max_reward = finish_num

            # Log this episode's mean step reward, then reset the running mean.
            writer.add_scalar("AverageReward/train", average_reward, step)
            average_reward = 0
            average_reward_num = 0

            if config["is_train"]:
            
                # Train on the collected data (train_net), then on the replay buffer
                # (train_net_history), with timestamps printed around each.
                tm = localtime(time.time())
                time_string = strftime('%Y-%m-%d %I:%M:%S %p', tm)
                print(f"[{time_string}] training....")
                model.train_net()
                tm = localtime(time.time())
                time_string = strftime('%Y-%m-%d %I:%M:%S %p', tm)
                print(f"[{time_string}] training complete")

                tm = localtime(time.time())
                time_string = strftime('%Y-%m-%d %I:%M:%S %p', tm)
                print(f"[{time_string}] training replay buffer....")
                model.train_net_history()
                tm = localtime(time.time())
                time_string = strftime('%Y-%m-%d %I:%M:%S %p', tm)
                print(f"[{time_string}] training complete")

                # Numbered checkpoint every 100 episodes.
                # NOTE(review): the purpose of the 10 s pause is not evident here.
                if episode % 100 == 0:
                    modelPathName = config["path_name"] + f"/model_{episode}.pth"
                    torch.save(model.state_dict(), modelPathName)

                    time.sleep(10)
                
                # Always overwrite the latest checkpoint.
                modelPathName = config["path_name"] + "/model.pth"
                torch.save(model.state_dict(), modelPathName)

                
                
                
                



# 통신 관련 초기화

In [5]:
PIPE_NAME = "\\\\.\\pipe\\worker1"
BUFFER_SIZE = 200000

# Create the single-instance, message-mode duplex pipe that the simulator
# connects to. When this cell is re-run in the same kernel, creation can fail
# because the handle from the earlier run still exists. In that case the
# existing `pipe` is reused. Any other situation (no earlier handle) is a real
# error and is re-raised, instead of surfacing later as a confusing NameError.
try:
    pipe = win32pipe.CreateNamedPipe(
        PIPE_NAME,
        win32pipe.PIPE_ACCESS_DUPLEX,
        win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
        1,            # max instances
        BUFFER_SIZE,  # output buffer size
        BUFFER_SIZE,  # input buffer size
        0,            # default timeout
        None          # default security attributes
    )
except pywintypes.error as exc:
    if "pipe" not in globals():
        raise
    print(f"CreateNamedPipe failed ({exc}); reusing the existing pipe handle")

# Block until the simulator connects.
win32pipe.ConnectNamedPipe(pipe, None)



0

In [6]:
# The first message from the simulator is a JSON object describing the network;
# the derived sizes below are module-level globals used by later cells.
initial_message = getOmnetMessage()
networkInfo = json.loads(initial_message)

modelNum = int(networkInfo['modelNum'])
availableJobNum = int(networkInfo['availableJobNum'])
nodeNum = int(networkInfo['nodeNum'])
jobWaitingLength = int(networkInfo['jobWaitingQueueLength'])
# adjacencyList is a literal string received over the pipe (e.g. a nested list
# of ints). Parse it with ast.literal_eval rather than eval() so that message
# content can never execute code.
adjacency = ast.literal_eval(networkInfo['adjacencyList'])
episode_length = int(networkInfo['episode_length'])
node_capacity = networkInfo['node_capacity']
job_generate_rate = networkInfo['job_generate_rate']

node_feature_num = 2 * (modelNum * availableJobNum)
queue_feature_num = (nodeNum + modelNum) * jobWaitingLength
hidden_feature_num = 10*(node_feature_num + queue_feature_num)
reward_weight = 1/modelNum

In [7]:
# Root folder holding one "historyN" sub-folder per run. Set the
# SUBTASK_EXPERIMENT_DIR environment variable to use another location.
EXPERIMENT_DIR = os.environ.get(
    "SUBTASK_EXPERIMENT_DIR",
    "C:/Users/user/Desktop/suhwan/connection_test/python_agent/experiment/subtask_test",
)
os.chdir(EXPERIMENT_DIR)
folderList = glob.glob("history*")

# Start from the number of existing entries (the original numbering), but skip
# any index that is already taken. A gap in the numbering (e.g. a deleted run
# folder) would otherwise make os.mkdir fail with FileExistsError.
run_index = len(folderList)
while os.path.exists("history" + str(run_index)):
    run_index += 1
pathName = "history" + str(run_index)

print(pathName)

os.mkdir(pathName)

writer = SummaryWriter(pathName)

history1


In [8]:
if __name__ == '__main__':
    # Tell the simulator that the agent has finished initialising.
    sendOmnetMessage("init") # 입력 끝나면 omnet에 전송
    print("네트워크 초기화 완료")

    # Learning hyperparameters.
    # NOTE(review): "job_generate_rate" here is not read in this notebook's code;
    # recordExpInfo logs the simulator-reported rate from network_info instead.
    training_settings = {
        "learning_rate"         : 0.0001,
        "gamma"                 : 0.97,
        "entropy_weight"        : 0.1,
        "lambda"                : 0.8,
        "eps_clip"              : 0.2,
        "batch_size"            : 128,
        "loss_coef"             : 0.5,
        "job_generate_rate"     : 0.003,
        "is_train"              : True,
        "replay_buffer_size"    : 10000,
        "history_learning_time" : 3,
        "current_learning_time" : 0,
    }
    # Sizes derived from the simulator's network description.
    topology_settings = {
        "node_feature_num"   : 2 * (modelNum * availableJobNum),
        "queue_feature_num"  : (nodeNum + modelNum) * jobWaitingLength,
        "hidden_feature_num" : 10 * (node_feature_num + queue_feature_num),
        "reward_weight"      : 1.0 / modelNum,
        "node_num"           : nodeNum,
        "model_num"          : modelNum,
    }
    # Remaining model/runtime settings.
    runtime_settings = {
        "lstm_hidden_num"       : 10,
        "cpu_load_balance_time" : 0.1,
        "network_info"          : networkInfo,
        "path_name"             : pathName,
    }
    # Merge in this order so the key order matches the original single literal.
    config = {**training_settings, **topology_settings, **runtime_settings}

    main(config)

네트워크 초기화 완료

    노드 개수 : 10
    네트워크 최대 job 개수 : 5
    job 대기 가능 개수 : 15
    최대 subtask 개수 : 3
    인접 리스트 : [[0, 1, 0, 2, 0, 3, 0, 6, 0, 8, 1, 3, 1, 4, 1, 5, 1, 7, 1, 8, 3, 4, 4, 5, 4, 6, 4, 7, 5, 9, 7, 9], [1, 0, 2, 0, 3, 0, 6, 0, 8, 0, 3, 1, 4, 1, 5, 1, 7, 1, 8, 1, 4, 3, 5, 4, 6, 4, 7, 4, 9, 5, 9, 7]]
    node_feature_num : 30
    queue_feature_num : 195
    episode_length : 100
    node_capacity : 0.030000, 0.090000
    entropy_weight : 0.1
    reward_weight : 0.3333333333333333
    job_generate_rate : 30
    
[2023-01-18 03:32:44 PM] training....
[2023-01-18 03:32:44 PM] training complete
[2023-01-18 03:32:44 PM] training replay buffer....
[2023-01-18 03:32:45 PM] training complete
[2023-01-18 03:32:52 PM] training....
[2023-01-18 03:32:52 PM] training complete
[2023-01-18 03:32:52 PM] training replay buffer....
[2023-01-18 03:32:54 PM] training complete
[2023-01-18 03:33:01 PM] training....
[2023-01-18 03:33:01 PM] training complete
[2023-01-18 03:33:01 PM] training replay buffer.