In [1]:

import torch
import torch.nn.functional as F
import random
import numpy as np
import collections
from tqdm import tqdm
import torch
import copy
# import torch.nn.functional as F
# import matplotlib.pyplot as plt
# import rl_utils  

import sys 
sys.path.append("..") 
from env import ScheduleEnv
from workflow import Task, Edge, Workflow, WorkflowProducer, WorkflowProducerExample

In [2]:
vm_info = {
    't2.small' : {'price' : 0.023, 'factor' : 8.0 },
    't2.medium' : {'price' : 0.0464, 'factor' : 4.0 },
    't2.large' : {'price' : 0.0928, 'factor' : 2.0 },
    't2.xlarge' : {'price' : 0.1856, 'factor' : 1.0 }
}
workflow_list = WorkflowProducer(template_path='../template.yml')
# workflow_list = WorkflowProducerExample()


In [3]:
from algorithm.ROSA import ROSA

sc1 = ROSA(copy.deepcopy(workflow_list), vm_info)
sc1.simulation()

ROSA scheduler is started
algorithm name : ROSA
success ratio : 1.00
totol cost : 9.19
utilization : 0.29
vm count : {'t2.small': 0, 't2.medium': 0, 't2.large': 97, 't2.xlarge': 1}



['ROSA',
 1.0,
 9.19,
 0.29,
 {'t2.small': 0, 't2.medium': 0, 't2.large': 97, 't2.xlarge': 1}]

In [4]:
reward_step = 0
return_step = 0
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter('./log/demo/')

In [5]:
class DQN(torch.nn.Module):
    def __init__(self, state_dim, action_dim) -> None:
        super().__init__()
        
        self.state_dim = state_dim
        self.action_dim = action_dim
        
        self.dropout = 0.4 # 0.3
        
        self.state_model = torch.nn.Sequential(
            torch.nn.Linear(self.state_dim, 128),
            torch.nn.ReLU(inplace=True),
            torch.nn.Dropout(p=self.dropout),
            torch.nn.Linear(128, 512),
            torch.nn.ReLU(inplace=True),
            torch.nn.Dropout(p=self.dropout),
            torch.nn.Linear(512, 128)
        )
        
        self.action_model = torch.nn.Sequential(
            torch.nn.Linear(self.action_dim, 128),
            torch.nn.ReLU(inplace=True),
            torch.nn.Dropout(p=self.dropout),
            torch.nn.Linear(128, 512),
            torch.nn.ReLU(inplace=True),
            torch.nn.Dropout(p=self.dropout),
            torch.nn.Linear(512, 128)
        )
        
    def forward(self, task_state, vm_state, masked = None):
        q = self.state_model(task_state)
        q = q.view(q.shape[0], 1, q.shape[1])
        k = self.action_model(vm_state)
        v = torch.matmul(q, k.permute(0, 2, 1)).view(vm_state.shape[0], vm_state.shape[1])
        if masked is not None:
            v = v.masked_fill(masked, 0)
        return v

class DQN_cat(torch.nn.Module):
    def __init__(self, state_dim, action_dim) -> None:
        super().__init__()
        
        self.state_dim = state_dim
        self.action_dim = action_dim
        
        self.dropout = 0.4 # 0.3
        
        self.state_model = torch.nn.Sequential(
            torch.nn.Linear(self.state_dim, 128),
            torch.nn.ReLU(inplace=True),
            torch.nn.Dropout(p=self.dropout),
            torch.nn.Linear(128, 512),
            torch.nn.ReLU(inplace=True),
            torch.nn.Dropout(p=self.dropout),
            torch.nn.Linear(512, 128)
        )
        
        self.action_model = torch.nn.Sequential(
            torch.nn.Linear(self.action_dim, 128),
            torch.nn.ReLU(inplace=True),
            torch.nn.Dropout(p=self.dropout),
            torch.nn.Linear(128, 512),
            torch.nn.ReLU(inplace=True),
            torch.nn.Dropout(p=self.dropout),
            torch.nn.Linear(512, 128)
        )
        
    def forward(self, task_state, vm_state, masked = None):
        q = self.state_model(task_state)
        q = q.view(q.shape[0], 1, q.shape[1])
        k = self.action_model(vm_state)
        v = torch.matmul(q, k.permute(0, 2, 1)).view(vm_state.shape[0], vm_state.shape[1])
        if masked is not None:
            v = v.masked_fill(masked, 0)
        return v

class Agent():
    def __init__(self, state_dim, action_dim, learning_rate, gamma,
                 epsilon, target_update, device) -> None:
        
        self.q_net = DQN(state_dim=state_dim, action_dim=action_dim).to(device)
        self.target_q_net = DQN(state_dim=state_dim, action_dim=action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=learning_rate)
        self.gamma = gamma  # 折扣因子
        self.epsilon = epsilon  # epsilon-贪婪策略
        self.target_update = target_update  # 目标网络更新频率
        self.count = 0  # 计数器,记录更新次数
        self.device = device
        
        self.loss_step = 0
    
    def take_action(self, obs):
        if np.random.random() < self.epsilon:
            action = np.random.randint(len(obs[1]))
        else:
            task_state = torch.tensor([obs[0]], dtype=torch.float).to(self.device)
            vm_state = torch.tensor([obs[1]], dtype=torch.float).to(self.device)
            action = self.q_net(task_state, vm_state).argmax().item()
        return action
    
    def update(self, transition_dict):
        task_states, vm_states, masked =  self.split_states(transition_dict['states'])
        next_task_states, next_vm_states, next_masked =  self.split_states(transition_dict['states'])
        
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)

        # 返回 batch size 个Q值
        q_values = self.q_net(task_states, vm_states, masked).gather(1, actions)  
        # 下个状态的最大Q值
        max_next_q_values = self.target_q_net(next_task_states, next_vm_states, next_masked).max(1)[0].view(-1, 1)
        q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)  # TD误差目标
        dqn_loss = torch.mean(F.mse_loss(q_values, q_targets))  # 均方误差损失函数
        writer.add_scalar('loss', dqn_loss, global_step=self.loss_step, walltime=None)
        self.loss_step += 1
        self.optimizer.zero_grad()  # PyTorch中默认梯度会累积,这里需要显式将梯度置为0
        dqn_loss.backward()  # 反向传播更新参数
        self.optimizer.step()

        if self.count % self.target_update == 0:
            self.target_q_net.load_state_dict(
                self.q_net.state_dict())  # 更新目标网络
        self.count += 1
    
    def split_states(self, state_dict):
        task_states = []
        vm_states = []
        max_len = 0
        for state in state_dict:
            task_states.append(state[0])
            vm_states.append(state[1])
            max_len = max(len(state[1]), max_len)
        
        masked = torch.full((len(state_dict), max_len), False).to(self.device)  
        for i in range(len(vm_states)):
            masked[i,(len(vm_states[i])):max_len] = True
            if max_len != len(vm_states[i]):
                vm_states[i].extend([0,0,0,0, 0,0,0] for _ in range((max_len-len(vm_states[i]))))
        
        task_states = torch.tensor(task_states, dtype=torch.float).to(self.device)
        vm_states = torch.tensor(vm_states, dtype=torch.float).to(self.device)
        return task_states, vm_states, masked

In [6]:
class ReplayBuffer:
    ''' 经验回放池 '''
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)  # 队列,先进先出

    def add(self, state, action, reward, next_state, done):  # 将数据加入buffer
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):  # 从buffer中采样数据,数量为batch_size
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done

    def size(self):  # 目前buffer中数据的数量
        return len(self.buffer)

In [7]:
lr = 1e-3
num_episodes = 200
gamma = 0.95
epsilon = 0.05
target_update = 100
buffer_size = 2000
minimal_size = 700
batch_size = 256
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


env = ScheduleEnv(workflow_list, vm_info)
np.random.seed(0)
torch.manual_seed(0)

agent = Agent(state_dim=3, action_dim=7, 
              learning_rate=lr, gamma=gamma, epsilon=epsilon,
              target_update=target_update, device=device)

In [8]:
return_list = []

replay_buffer = ReplayBuffer(buffer_size)
for i in range(10):
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            episode_return = 0
            state = env.reset()
            done = False
            while not done:
                action = agent.take_action(state)
                next_state, reward, done, _, info = env.step(action)
                writer.add_scalar('reward', reward, global_step=reward_step, walltime=None)
                reward_step += 1
                # print(info)
                replay_buffer.add(state, action, reward, next_state, done)
                state = next_state
                episode_return += reward
                
                # 当buffer数据的数量超过一定值后,才进行Q网络训练
                if replay_buffer.size() > minimal_size:
                    b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)
                    transition_dict = {
                        'states': b_s,
                        'actions': b_a,
                        'next_states': b_ns,
                        'rewards': b_r,
                        'dones': b_d
                    }
                    agent.update(transition_dict)
                    
            return_list.append(episode_return)
            writer.add_scalar('return', episode_return, global_step=return_step, walltime=None)
            return_step += 1
            if (i_episode + 1) % 10 == 0:
                pbar.set_postfix({
                    'episode':
                    '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return':
                    '%.3f' % np.mean(return_list[-10:])
                })
            pbar.update(1)

  return np.array(state), action, reward, np.array(next_state), done
Iteration 0:  55%|█████▌    | 11/20 [03:43<03:02, 20.31s/it, episode=10, return=-356.974]


KeyboardInterrupt: 

In [None]:
# torch.save(agent.q_net.state_dict(),'./model/tmp')
# agent.q_net.load_state_dict(torch.load('./model/tmp'))
agent.q_net.eval()

agent.epsilon = 0

rtn = 0
obs = env.reset()
env.print_res = True
done = False
while not done:
    action = agent.take_action(obs)
    obs, reward, done, _, info = env.step(action)
    rtn += gamma * rtn + reward
    # for i in info:
    #     print(i)
    # print('----------------------')

[26] Scheduling Task ID00002, exe time 199.44
active vm num: 1, avalible vm num: 4, target: 3
    vm id: 0, finish time: 0.4432, cost num: 1, remain:0.5568
    vm id: 1, finish time: 0.2216, cost num: 1, remain:0.7784
    vm id: 2, finish time: 0.1108, cost num: 1, remain:0.8892
    *vm id: 3, finish time: 0.0554, cost num: 1, remain:0.9446
schedule task ID00002 to [new] vm 140052077019248(t2.xlarge), predict cost: 0.1856(1*0.1856)
task start time: 26, caculate time: 199.44, finish time: 225.44, time out: False
vm start time: 26, vm avalible time: 225.44, vm remain time: 3400.56
reward = wf_finish(0) - (punish(0) + cost(0.1856)) = -0.1856
----------------------
next task ID00025 to schedule
[26] Scheduling Task ID00025, exe time 127.34
active vm num: 2, avalible vm num: 5, target: 3
    vm id: 0, finish time: 0.09077222222222221, cost num: 0, remain:0.9092277777777779
    vm id: 1, finish time: 0.2829777777777778, cost num: 1, remain:0.7170222222222221
    vm id: 2, finish time: 0.1414