<a href="https://colab.research.google.com/github/Zahra-FallahMMA/DRL_Offload_Allocation/blob/main/DRL_Offload_Allocation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import libraries

In [11]:
import xml.etree.ElementTree as ET
from io import StringIO
import os
import re
import numpy as np
from collections import deque, defaultdict
import random
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.optimizers import Adam
from keras.models import load_model
from itertools import product
import tensorflow as tf
!pip install simpy
import simpy

# Set TensorFlow logging level to suppress detailed logs
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
tf.get_logger().setLevel('ERROR')

Collecting simpy
  Downloading simpy-4.1.1-py3-none-any.whl.metadata (6.1 kB)
Downloading simpy-4.1.1-py3-none-any.whl (27 kB)
Installing collected packages: simpy
Successfully installed simpy-4.1.1


# Class ReplayBuffer

In [2]:
class ReplayBuffer:
    """Fixed-size circular experience-replay memory.

    Transitions are written round-robin: once ``max_size`` transitions have
    been stored, the oldest slot is overwritten.

    Args:
        max_size: Number of transitions kept.
        input_shape: Length of a flattened state vector.
        n_actions: Width of the stored action vector.
        discrete: If True, ``store_transition`` receives an integer action index,
            which is stored one-hot encoded (int8). Otherwise the action vector
            is stored as given (float32).
    """

    def __init__(self, max_size, input_shape, n_actions, discrete=False):
        self.mem_size = max_size
        self.mem_cntr = 0  # total transitions ever stored (not capped at mem_size)
        self.discrete = discrete
        self.state_memory = np.zeros((self.mem_size, input_shape))
        self.new_state_memory = np.zeros((self.mem_size, input_shape))
        dtype = np.int8 if self.discrete else np.float32
        self.action_memory = np.zeros((self.mem_size, n_actions), dtype=dtype)
        self.reward_memory = np.zeros(self.mem_size)
        # Holds 1 - done: 1.0 for non-terminal transitions and 0.0 for terminal
        # ones, i.e. a mask for the bootstrapped next-state term.
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.float32)

    def store_transition(self, state, action, reward, state_, done):
        """Record one transition, overwriting the oldest slot when full.

        Args:
            state, state_: Current / next state; any shape holding
                ``input_shape`` values (e.g. ``(n,)`` or ``(1, n)``).
            action: Integer index when ``discrete``, else an action vector.
            reward: Scalar reward.
            done: Whether the transition ended the episode.
        """
        index = self.mem_cntr % self.mem_size
        # Flatten so callers may pass either (n,) or (1, n) shaped states.
        self.state_memory[index] = np.ravel(state)
        self.new_state_memory[index] = np.ravel(state_)
        if self.discrete:
            one_hot = np.zeros(self.action_memory.shape[1])
            one_hot[action] = 1.0
            self.action_memory[index] = one_hot
        else:
            self.action_memory[index] = action
        self.reward_memory[index] = reward
        self.terminal_memory[index] = 1 - done
        self.mem_cntr += 1

    def sample_buffer(self, batch_size):
        """Draw a random minibatch of stored transitions.

        Returns:
            ``(states, actions, rewards, next_states, not_done)``, where
            ``not_done`` is ``1 - done`` for each sampled transition.

        Raises:
            ValueError: if nothing has been stored yet.
        """
        max_mem = min(self.mem_cntr, self.mem_size)
        if max_mem == 0:
            raise ValueError("cannot sample from an empty replay buffer")
        # Sample without replacement so a minibatch holds distinct transitions.
        # Fall back to replacement only if fewer than batch_size are stored.
        batch = np.random.choice(max_mem, batch_size, replace=max_mem < batch_size)
        states = self.state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        states_ = self.new_state_memory[batch]
        terminal = self.terminal_memory[batch]
        return states, actions, rewards, states_, terminal


# Class DQNAgent

In [5]:
class DQNAgent:
    """Deep Q-Network agent with a replay buffer and a periodically synced target network.

    Args:
        state_size: Length of the state vector fed to the network.
        action_size: Number of discrete actions (one Q-value output each).
        learning_rate: Adam learning rate.
        discount_factor: Discount applied to the bootstrapped next-state value.
        exploration_rate: Initial epsilon for epsilon-greedy action selection.
        exploration_decay: Multiplicative epsilon decay applied after each training step.
        exploration_min: Epsilon is not decayed below this value.
        batch_size: Minibatch size sampled from the replay buffer.
        memory_size: Capacity of the replay buffer.
    """

    def __init__(self, state_size, action_size, learning_rate=0.001, discount_factor=0.95, exploration_rate=1.0,
                 exploration_decay=0.995, exploration_min=0.01, batch_size=64, memory_size=2000):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.exploration_decay = exploration_decay
        self.exploration_min = exploration_min
        self.batch_size = batch_size
        self.memory = ReplayBuffer(memory_size, state_size, action_size, discrete=True)
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()
        self.target_update_counter = 0

    def _build_model(self):
        """Build the MLP mapping a state to one Q-value per action."""
        model = Sequential()
        model.add(Dense(64, input_dim=self.state_size, activation='relu'))
        model.add(Dropout(0.5))
        model.add(Dense(64, activation='relu'))
        model.add(Dropout(0.5))
        model.add(Dense(32, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(optimizer=Adam(learning_rate=self.learning_rate), loss='mse')
        return model

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer (action is an integer index)."""
        self.memory.store_transition(state, action, reward, next_state, done)

    def choose_action(self, state):
        """Epsilon-greedy: a random action with probability epsilon, else argmax Q(state)."""
        if np.random.rand() <= self.exploration_rate:
            return random.randrange(self.action_size)
        state = np.array(state).reshape(1, -1)  # the network expects a 2-D batch
        q_values = self.model.predict(state, verbose=0)
        return np.argmax(q_values[0])

    def replay(self):
        """Run one training step on a sampled minibatch.

        Does nothing until at least ``batch_size`` transitions have been stored.
        The replay buffer returns ``1 - done`` per transition, and that value
        masks the bootstrapped term. Terminal transitions therefore learn from
        the reward alone. All others learn from
        reward + discount_factor * max_a' Q_target(s', a').
        """
        if self.memory.mem_cntr < self.batch_size:
            return
        states, actions, rewards, next_states, not_done = self.memory.sample_buffer(self.batch_size)

        targets = self.model.predict(states, verbose=0)
        next_q = self.target_model.predict(next_states, verbose=0)

        # Only the Q-value of the action actually taken gets a new target. The
        # other outputs keep the network's own prediction, so they contribute
        # zero error.
        action_idx = np.argmax(actions, axis=1)
        rows = np.arange(len(action_idx))
        targets[rows, action_idx] = rewards + self.discount_factor * np.max(next_q, axis=1) * not_done

        self.model.fit(states, targets, epochs=1, verbose=0)

        if self.exploration_rate > self.exploration_min:
            # Clamp so the decay never undershoots the configured floor.
            self.exploration_rate = max(self.exploration_min,
                                        self.exploration_rate * self.exploration_decay)

        # Sync the target network every 10 calls to replay() (i.e. training steps).
        self.target_update_counter += 1
        if self.target_update_counter % 10 == 0:
            self.update_target_model()
            self.target_update_counter = 0

    def load_model(self, path):
        """Load the online network from ``path`` and sync the target network to it."""
        # `load_model` below resolves to the module-level keras function, not this method.
        self.model = load_model(path)
        self.update_target_model()

    def save_model(self, path):
        """Save the online network to ``path``."""
        self.model.save(path)

# Class Task

In [6]:
class Task:
    """One node of a workflow DAG.

    Args:
        id: Task identifier (the DAX job id).
        instructions: Amount of work; a device's run time for the task is
            ``instructions / mips``.
        workflow_id: Identifier of the workflow that owns this task.
    """

    def __init__(self, id, instructions, workflow_id):
        # Identity and workload
        self.id = id
        self.workflow_id = workflow_id
        self.instructions = instructions

        # DAG links (filled in by Workflow.add_task)
        self.children = []   # tasks that depend on this one
        self.parents = []    # tasks this one depends on

        # Execution bookkeeping
        self.executed = False
        self.executed_on = None
        self.execution_time = 0
        self.cost = 0
        self.comm_delay = 0  # communication delay, in seconds


# Class Workflow

In [7]:
class Workflow:
    """A workflow DAG: tasks keyed by id and linked through ``parents``/``children``.

    Args:
        id: Workflow identifier, copied onto every Task as ``workflow_id``.
    """

    def __init__(self, id):
        self.id = id  # Workflow identifier
        self.tasks = {}  # task_id -> Task
        # Ids of tasks that were created implicitly because another task named
        # them as a parent. Their real instruction count is not known yet.
        self._placeholder_ids = set()

    def add_task(self, task_id, instructions, parent_ids=None):
        """Add a task (or complete a placeholder) and link it to its parents.

        If ``task_id`` already exists, its instruction count is kept. The one
        exception is a placeholder created by an earlier parent reference: it
        takes ``instructions`` when that value is non-zero. Unknown parent ids
        get placeholder tasks with 0 instructions. Each parent/child edge is
        recorded at most once, even across repeated calls.

        Args:
            task_id: Identifier of the task.
            instructions: Work amount for a newly created task.
            parent_ids: Iterable of ids this task depends on (default: none).
        """
        if task_id not in self.tasks:
            self.tasks[task_id] = Task(task_id, instructions, self.id)
        elif task_id in self._placeholder_ids and instructions:
            self.tasks[task_id].instructions = instructions
            self._placeholder_ids.discard(task_id)
        task = self.tasks[task_id]

        for parent_id in parent_ids or ():
            parent_task = self.tasks.get(parent_id)
            if parent_task is None:
                parent_task = Task(parent_id, 0, self.id)
                self.tasks[parent_id] = parent_task
                self._placeholder_ids.add(parent_id)
            if parent_task not in task.parents:  # avoid duplicate edges
                parent_task.children.append(task)
                task.parents.append(parent_task)

# Functions parse_dax and ensemble_of_workflows

In [8]:
def parse_dax(file_path, workflow_id):
    """Build a Workflow from a Pegasus DAX (XML) document.

    Args:
        file_path: Path or file object accepted by ``ET.parse``.
        workflow_id: Identifier assigned to the resulting Workflow.

    Returns:
        A Workflow with one task per ``<job>``, plus the parent/child links
        given by the ``<child ref=...><parent ref=.../></child>`` elements.
        Each job's ``runtime`` attribute (0 if absent) becomes ``instructions``.
    """
    root = ET.parse(file_path).getroot()
    # Use the namespace the root element declares (e.g.
    # "{http://pegasus.isi.edu/schema/DAX}"), or none if it is un-namespaced.
    ns = root.tag[:root.tag.index('}') + 1] if root.tag.startswith('{') else ''

    workflow = Workflow(workflow_id)

    # Dict keyed by id, as before: a duplicated job id keeps its last element.
    jobs = {job.attrib['id']: job for job in root.findall(f'{ns}job')}
    for job_id, job in jobs.items():
        # NOTE(review): the DAX `runtime` value is used directly as the
        # instruction count that devices divide by MIPS; confirm that is intended.
        workflow.add_task(job_id, float(job.attrib.get('runtime', 0)))

    for child in root.findall(f'{ns}child'):
        parent_ids = [parent.attrib['ref'] for parent in child.findall(f'{ns}parent')]
        # Passing 0 does not overwrite an existing task's instructions.
        workflow.add_task(child.attrib['ref'], 0, parent_ids)

    return workflow


def ensemble_of_workflows(name, size=10, distribution='constant', dax_path='', constant_task_count=100):
    """Load ``size`` workflows from DAX files in ``dax_path`` whose names contain ``name``.

    Args:
        name: Substring selecting the workflow family (e.g. ``'CyberShake'``).
        size: Number of workflows in the ensemble.
        distribution: ``'constant'`` repeats the single file whose name contains
            ``constant_task_count`` as a whole number. Any other value picks
            ``size`` matching files uniformly at random, with repetition.
        dax_path: Directory holding the DAX files ('' means the current directory).
        constant_task_count: Number looked for in file names in 'constant' mode.

    Returns:
        List of Workflows with ids ``0 .. size-1``.

    Raises:
        FileNotFoundError: if no file matches ``name`` (or, in 'constant' mode,
            none also matches ``constant_task_count``).
    """
    # Sort so the choice does not depend on os.listdir's arbitrary order.
    candidates = sorted(f for f in os.listdir(dax_path or '.') if name in f)
    if not candidates:
        raise FileNotFoundError(f"no DAX files containing {name!r} in {dax_path!r}")

    if distribution == 'constant':
        # Whole-number match: 100 matches "_100." but not "1000" or "2100".
        count = re.escape(str(constant_task_count))
        pattern = rf'(?<!\d){count}(?!\d)'
        chosen = next((f for f in candidates if re.search(pattern, f)), None)
        if chosen is None:
            raise FileNotFoundError(
                f"no {name!r} DAX file with task count {constant_task_count} in {dax_path!r}")
        ensemble = [chosen] * size
    else:
        picks = np.random.randint(0, len(candidates), size)
        ensemble = [candidates[i] for i in picks]

    return [parse_dax(os.path.join(dax_path, file_name), w_id)
            for w_id, file_name in enumerate(ensemble)]

# Mount Google Drive (location of the DAX files)

In [9]:
import glob
from google.colab import drive

# Expose Google Drive under /content/drive so the DAX workflow files stored there are readable.
drive.mount('/content/drive')

# Directory holding the DAX files (the same path NewSim passes as dax_path).
folder_path = '/content/drive/My Drive/Zahra/dax/'

Mounted at /content/drive


# Class Device

In [10]:
class Device:
    """A compute node (IoT device, fog node or cloud server) in the simulation.

    Args:
        id: Identifier such as ``'fog_3'``.
        mips: Processing speed; a task's run time here is ``instructions / mips``.
        cost_per_hour: Price factor charged per unit of execution time.
        env: SimPy environment that owns this device's single-slot resource.
    """

    def __init__(self, id, mips, cost_per_hour, env):
        self.id, self.mips, self.cost_per_hour = id, mips, cost_per_hour
        self.queue = deque()         # tasks assigned to this device and not yet finished
        self.runnig_queue = deque()  # name kept as-is; not referenced elsewhere in the visible code
        # capacity=1: the device executes one task at a time.
        self.resource = simpy.Resource(env, capacity=1)

    def add_task_to_queue(self, task):
        """Append ``task`` to the back of the queue."""
        self.queue.append(task)

    def get_next_task(self):
        """Pop and return the front task, or ``None`` when the queue is empty."""
        if not self.queue:
            return None
        return self.queue.popleft()

    def waiting_time(self):
        """Time needed to process everything queued: sum of instructions / mips."""
        total = 0
        for queued in self.queue:
            total = total + queued.instructions / self.mips
        return total


# Class FogEnv and the process_workflow process

In [62]:
import simpy

class FogEnv:
    """Shared simulation state: the device pools, running cost and workflow progress.

    Args:
        env: SimPy environment driving the simulation.
        iot_devices, fog_nodes, cloud_servers: Lists of Device objects. An agent
            action indexes their concatenation, in this order.
        workflows: Workflows being executed, used to detect completion.
    """

    def __init__(self, env, iot_devices, fog_nodes, cloud_servers, workflows):
        self.env = env
        self.iot_devices = iot_devices
        self.fog_nodes = fog_nodes
        self.cloud_servers = cloud_servers
        # Running total of execution_time * cost_per_hour over completed tasks.
        # NOTE(review): execution_time comes from instructions / mips and is not
        # converted to hours; confirm the intended unit.
        self.cost = 0
        self.completed_workflows = 0
        self.workflows = workflows

    def assign_task(self, task, device):
        """SimPy process: run ``task`` on ``device`` and record the outcome.

        The caller queues ``task`` with ``device.add_task_to_queue`` first. The
        task is removed from that queue once it finishes.

        Raises:
            ValueError: if ``task`` is not in ``device.queue`` or its
                ``workflow_id`` matches no known workflow.
        """
        with device.resource.request() as req:
            yield req  # wait until the single-slot device is free
            execution_time = task.instructions / device.mips
            yield self.env.timeout(execution_time)
            task_cost = execution_time * device.cost_per_hour
            self.cost += task_cost
            task.executed = True
            task.execution_time = execution_time
            task.executed_on = device.id
            task.cost = task_cost
            # Remove this exact task rather than whatever sits at the front.
            device.queue.remove(task)
            print(f"Task {task.id} of workflow {task.workflow_id} completed on {device.id} at time {self.env.now}")

            # Check if the workflow is completed
            workflow = next((wf for wf in self.workflows if wf.id == task.workflow_id), None)
            if workflow is None:
                raise ValueError(f"task {task.id} belongs to unknown workflow {task.workflow_id}")
            self.check_workflow_completion(workflow)

    def get_state(self, task):
        """Return ``[total cost] + [waiting time of each device]``; ``task`` is unused."""
        return [self.cost] + [d.waiting_time() for d in self.iot_devices + self.fog_nodes + self.cloud_servers]

    def get_device_by_id(self, device_id):
        """Return the device whose ``id`` equals ``device_id``, or ``None``."""
        for device in self.iot_devices + self.fog_nodes + self.cloud_servers:
            if device.id == device_id:
                return device
        return None

    def check_workflow_completion(self, workflow):
        """Count ``workflow`` as completed if every one of its tasks has executed."""
        if all(task.executed for task in workflow.tasks.values()):
            self.completed_workflows += 1
            print(f"Workflow {workflow.id} is completed! Total completed workflows: {self.completed_workflows}")





def process_workflow(env, workflow, fog_env, agent):
    """SimPy process that places and runs every task of ``workflow`` using ``agent``.

    The process repeatedly scans the workflow for unexecuted tasks whose
    parents have all executed. For each one it:

    - asks the agent for an action, used as an index into the IoT, fog and
      cloud devices (in that order);
    - runs the task there and waits for it to finish;
    - stores the transition and trains the agent.

    Tasks of one workflow therefore run one at a time. Each workflow is a
    separate process.

    Raises:
        RuntimeError: if a full scan finds no runnable task while some remain
            unexecuted (e.g. a dependency cycle). Without this check the loop
            would spin forever without yielding.
    """
    devices = fog_env.iot_devices + fog_env.fog_nodes + fog_env.cloud_servers
    while not all(task.executed for task in workflow.tasks.values()):
        progressed = False
        for task in workflow.tasks.values():
            # all() over an empty parent list is True, so root tasks are ready.
            if task.executed or not all(parent.executed for parent in task.parents):
                continue
            state = np.array(fog_env.get_state(task)).reshape(1, -1)
            action = agent.choose_action(state)
            device = devices[action]
            device.add_task_to_queue(task)
            yield env.process(fog_env.assign_task(task, device))
            progressed = True
            next_state = np.array(fog_env.get_state(task)).reshape(1, -1)
            # NOTE(review): the reward is minus the *cumulative* cost of all
            # workflows so far, not the cost of this placement; confirm intended.
            agent.remember(state, action, -1 * fog_env.cost, next_state, False)
            agent.replay()
        if not progressed:
            raise RuntimeError(
                f"Workflow {workflow.id} has unexecuted tasks whose parents can never complete")

# Class NewSim



In [64]:
class NewSim:
    """One complete simulation run, executed by the constructor.

    The constructor:

    - builds IoT, fog and cloud devices;
    - builds a DQN agent with one action per device;
    - loads an ensemble of workflows;
    - runs every workflow as a SimPy process until all finish.

    The workflow-selection keywords (``workflow_name``, ``ensemble_size``,
    ``distribution``, ``dax_path``) default to the values this class used
    before they became configurable. After the run, ``self.fog_env`` holds the
    final cost and completed-workflow count.
    """

    def __init__(self, num_iot, num_fog, num_server, learning_rate=0.001, discount_factor=0.95,
                 exploration_rate=1.0, exploration_decay=0.995, exploration_min=0.01, batch_size=64, memory_size=2000,
                 workflow_name='CyberShake', ensemble_size=2, distribution='constant',
                 dax_path="/content/drive/My Drive/Zahra/dax/"):
        self.num_iot = num_iot
        self.num_fog = num_fog
        self.num_server = num_server
        # (sic) attribute name kept unchanged for existing callers.
        self.num_totla_dev = num_iot + num_fog + num_server
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.exploration_decay = exploration_decay
        self.exploration_min = exploration_min
        self.batch_size = batch_size
        self.memory_size = memory_size
        self.workflow_name = workflow_name
        self.ensemble_size = ensemble_size
        self.distribution = distribution
        self.dax_path = dax_path
        self.fog_env = None
        self.reset()
        self.run()

    def reset(self):
        """Create a fresh SimPy environment, devices, agent and workflow ensemble."""
        # New environment on every reset. Devices bind their Resource to it, and
        # reusing the old one would resume from its previous end time.
        self.env = simpy.Environment()
        self.iot_devices = [Device(f'iot_{i}', 500, 0, self.env) for i in range(self.num_iot)]
        self.fog_devices = [Device(f'fog_{i}', 4000, 1, self.env) for i in range(self.num_fog)]
        self.server_devices = [Device(f'server_{i}', 6000, 8, self.env) for i in range(self.num_server)]
        # State = [total cost] + one waiting time per device (see FogEnv.get_state).
        self.agent = DQNAgent(state_size=1 + self.num_totla_dev, action_size=self.num_totla_dev,
                              learning_rate=self.learning_rate, discount_factor=self.discount_factor,
                              exploration_rate=self.exploration_rate, exploration_decay=self.exploration_decay,
                              exploration_min=self.exploration_min, batch_size=self.batch_size,
                              memory_size=self.memory_size)
        self.workflows = ensemble_of_workflows(name=self.workflow_name, size=self.ensemble_size,
                                               distribution=self.distribution, dax_path=self.dax_path)

    def run(self):
        """Start one process per workflow and run until no events remain."""
        fog_env = FogEnv(self.env, self.iot_devices, self.fog_devices, self.server_devices, self.workflows)
        self.fog_env = fog_env
        for workflow in self.workflows:
            self.env.process(process_workflow(self.env, workflow, fog_env, self.agent))
        self.env.run()  # no `until`: run until every process has finished


In [65]:
def run_simulation(num_runs=100):
    """Run ``num_runs`` independent simulations with fixed hyper-parameters.

    Each NewSim builds its own devices, DQN agent and workflows and runs to
    completion inside its constructor. Agents therefore do not share learning
    across runs.

    Args:
        num_runs: Number of simulations to run (previously ignored; now honoured).

    Returns:
        List of the NewSim instances, one per run (empty when ``num_runs <= 0``).
    """
    learning_rate = 0.0001
    discount_factor = 0.99
    exploration_rate = 0.5
    exploration_decay = 0.995
    exploration_min = 0.05

    return [
        NewSim(num_iot=10, num_fog=8, num_server=5, learning_rate=learning_rate,
               discount_factor=discount_factor, exploration_rate=exploration_rate,
               exploration_decay=exploration_decay, exploration_min=exploration_min)
        for _ in range(num_runs)
    ]

# A single run; the trailing ';' keeps the cell from echoing any return value.
run_simulation(num_runs=1);

Task ID00002 of workflow 0 completed on iot_5 at time 0.30889999999999995
Task ID00002 of workflow 1 completed on iot_1 at time 0.30889999999999995
Task ID00003 of workflow 1 completed on fog_7 at time 0.3226225
Task ID00004 of workflow 1 completed on iot_1 at time 0.32482249999999996
Task ID00005 of workflow 1 completed on fog_7 at time 0.33851749999999997
Task ID00006 of workflow 1 completed on iot_9 at time 0.33967749999999997
Task ID00003 of workflow 0 completed on iot_7 at time 0.41867999999999994
Task ID00007 of workflow 1 completed on iot_1 at time 0.43757749999999995
Task ID00008 of workflow 1 completed on iot_0 at time 0.43889749999999994
Task ID00004 of workflow 0 completed on iot_1 at time 0.43977749999999993
Task ID00009 of workflow 1 completed on fog_6 at time 0.45187749999999993
Task ID00010 of workflow 1 completed on fog_2 at time 0.45206749999999996
Task ID00005 of workflow 0 completed on iot_1 at time 0.5493374999999999
Task ID00006 of workflow 0 completed on fog_5 at 