<a href="https://colab.research.google.com/github/Zahra-FallahMMA/BookStore/blob/master/Random_agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# import Libraries

In [11]:
import xml.etree.ElementTree as ET
from io import StringIO
import os
import re
import numpy as np
from collections import deque, defaultdict
import random
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.optimizers import Adam
from keras.models import load_model
from itertools import product
import tensorflow as tf
!pip install simpy
import simpy

# Set TensorFlow logging level to suppress detailed logs
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
tf.get_logger().setLevel('ERROR')



# class RandomAgent

In [12]:
class RandomAgent:

  def __init__(self, action_size):
      self.action_size = action_size

  def choose_action(self):
        return random.randrange(self.action_size)



# class Task

In [13]:
class Task:
    def __init__(self, id, instructions, workflow_id):
        self.id = id
        self.instructions = instructions  # Execution time or computational instructions
        self.children = []  # List of tasks that depend on this task
        self.parents = []  # List of tasks this task depends on
        self.executed = False  # Status of execution
        self.executed_on = None  # Node this task was executed on
        self.execution_time = 0  # Time taken to execute the task
        self.cost = 0  # Cost of executing the task
        self.comm_delay = 0  # Communication delay in seconds
        self.workflow_id = workflow_id  # Workflow identifier to which this task belongs


# class Workflow

In [14]:
class Workflow:
    def __init__(self, id):
        self.id = id  # Workflow identifier
        self.tasks = {}  # Dictionary of tasks in the workflow

    def add_task(self, task_id, instructions, parent_ids=[]):
        if task_id not in self.tasks:
            self.tasks[task_id] = Task(task_id, instructions, self.id)
        task = self.tasks[task_id]
        for parent_id in parent_ids:
            if parent_id not in self.tasks:
                self.tasks[parent_id] = Task(parent_id, 0, self.id)
            parent_task = self.tasks[parent_id]
            parent_task.children.append(task)
            task.parents.append(parent_task)

# class parse_dax

In [15]:
def parse_dax(file_path, workflow_id):
    tree = ET.parse(file_path)
    root = tree.getroot()

    workflow_id = workflow_id
    workflow = Workflow(workflow_id)

    # Parse jobs
    jobs = {job.attrib['id']: job for job in root.findall('{http://pegasus.isi.edu/schema/DAX}job')}

    # Add jobs to workflow
    for job_id, job in jobs.items():
        instructions = float(job.attrib.get('runtime', 0))
        workflow.add_task(job_id, instructions)

    # Parse dependencies
    for child in root.findall('{http://pegasus.isi.edu/schema/DAX}child'):
        child_id = child.attrib['ref']
        parent_ids = [parent.attrib['ref'] for parent in child.findall('{http://pegasus.isi.edu/schema/DAX}parent')]
        workflow.add_task(child_id, 0, parent_ids)  # Adds a child node with its parent nodes, setting instructions to 0 to avoid overwrite

    return workflow


def ensemble_of_workflows(name, size=10, distribution='constant', dax_path=''):
    ws = []
    ensemble = []
    directory_path = dax_path  # Directory containing DAX files

    # List and filter files in directory
    files = os.listdir(directory_path)
    filtered_files = [file for file in files if name in file]

    if distribution == 'constant':
        pattern = r'100(?!\d)'
        for s in filtered_files:
            if re.search(pattern, s):
                ensemble = [s] * size  # Replicate the matched file 'size' times
                break
    else:
        numbers = np.random.randint(0, len(filtered_files), size)
        ensemble = [filtered_files[i] for i in numbers]  # Select random files based on uniform distribution
    w_id = 0
    for name in ensemble:
        ws.append(parse_dax(dax_path+name,w_id))
        w_id = w_id + 1

    return ws

# Loading dax_file

In [16]:
from google.colab import drive
import glob

drive.mount('/content/drive')
folder_path = '/content/drive/My Drive/Zahra/dax/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# class Device

In [17]:
class Device:
    def __init__(self, id, mips, cost_per_hour, env):
        self.id = id
        self.mips = mips
        self.cost_per_hour = cost_per_hour
        self.queue = deque()
        self.runnig_queue = deque()
        self.resource = simpy.Resource(env, capacity=1)

    def add_task_to_queue(self, task):
        self.queue.append(task)

    def get_next_task(self):
        return self.queue.popleft() if self.queue else None

    def waiting_time(self):
        waiting_time = 0
        for t in self.queue:
            waiting_time += t.instructions / self.mips
        return waiting_time


# class FogEnv

In [22]:
import simpy

class FogEnv:
    def __init__(self, env, iot_devices, fog_nodes, cloud_servers, workflows):
        self.env = env
        self.iot_devices = iot_devices
        self.fog_nodes = fog_nodes
        self.cloud_servers = cloud_servers
        self.cost = 0
        self.completed_workflows = 0
        self.workflows = workflows

    def assign_task(self, task, device):
        with device.resource.request() as req:
            yield req
            instructions = task.instructions
            execution_time = instructions / device.mips
            yield self.env.timeout(execution_time)
            self.cost += execution_time * device.cost_per_hour
            task.executed = True
            task.execution_time = execution_time
            device.queue.popleft()
            # print(f"Task {task.id} of workflow {task.workflow_id} completed on {device.id} at time {self.env.now}")

            # Check if the workflow is completed
            workflow = next(wf for wf in self.workflows if wf.id == task.workflow_id)
            self.check_workflow_completion(workflow)


    def get_device_by_id(self, device_id):
        for device in self.iot_devices + self.fog_nodes + self.cloud_servers:
            if device.id == device_id:
                return device
        return None

    def check_workflow_completion(self, workflow):
        # Check if all tasks in the workflow are executed
        if all(task.executed for task in workflow.tasks.values()):
            self.completed_workflows += 1  # Increment completed workflows counter
            # print(f"Workflow {workflow.id} is completed! Total completed workflows: {self.completed_workflows}")





def process_workflow(env, workflow, fog_env, agent):
      while(True):
        if all([task.executed for task in workflow.tasks.values()]):
          break
        for task in workflow.tasks.values():
            if task.executed:
              continue
            if all([parent.executed for parent in task.parents]) or task.parents == []:
                action = agent.choose_action()
                devices = fog_env.iot_devices + fog_env.fog_nodes + fog_env.cloud_servers
                device = devices[action]
                device.add_task_to_queue(task)
                yield env.process(fog_env.assign_task(task, device))

# class NewSim

In [19]:
class  NewSim:
    def __init__(self, num_iot, num_fog, num_server):
        self.num_iot = num_iot
        self.num_fog = num_fog
        self.num_server = num_server
        self.num_totla_dev = num_iot + num_fog + num_server
        self.env = simpy.Environment()
        self.reset()
        self.run()

    def reset(self):
        self.iot_devices = [Device(f'iot_{i}', 500, 0, self.env) for i in range(self.num_iot)]
        self.fog_devices = [Device(f'fog_{i}', 4000, 1, self.env) for i in range(self.num_fog)]
        self.server_devices = [Device(f'server_{i}', 6000, 8, self.env) for i in range(self.num_server)]
        self.agent = RandomAgent(action_size=self.num_totla_dev)
        self.workflows = ensemble_of_workflows(name = 'CyberShake', size=100, distribution = 'constant', dax_path="/content/drive/My Drive/Zahra/dax/")

    def run(self):
        self.fog_env = FogEnv(self.env, self.iot_devices, self.fog_devices, self.server_devices,self.workflows)
        for workflow in self.workflows:
                self.env.process(process_workflow(self.env, workflow, self.fog_env, self.agent))

        self.env.run()  # Run simulation for a time period


# run simulation

In [20]:
simulation = NewSim(num_iot=10, num_fog=8, num_server=5)
print(simulation.fog_env.cost)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Task ID00052 of workflow 49 completed on server_0 at time 15.170854166666695
Task ID00060 of workflow 36 completed on iot_3 at time 15.171636666666696
Task ID00045 of workflow 33 completed on fog_3 at time 15.176920000000027
Task ID00046 of workflow 33 completed on fog_2 at time 15.177085000000028
Task ID00053 of workflow 49 completed on server_1 at time 15.177644166666695
Task ID00054 of workflow 49 completed on fog_3 at time 15.177936666666696
Task ID00057 of workflow 89 completed on server_2 at time 15.179839166666694
Task ID00058 of workflow 89 completed on fog_3 at time 15.185411666666694
Task ID00059 of workflow 89 completed on fog_6 at time 15.185651666666693
Task ID00070 of workflow 3 completed on iot_6 at time 15.202850833333352
Task ID00080 of workflow 47 completed on iot_6 at time 15.204850833333353
Task ID00071 of workflow 3 completed on server_0 at time 15.207760833333353
Task ID00060 of workflow 57 completed

# Data Collection

In [24]:
def run_simulation_with_results_tracking():

    # Workflow configurations with specific sizes
    workflow_configs = {
        'CyberShake': [30, 50, 100, 1000],
        'Montage': [20, 40, 60, 80, 100, 200, 300],
        'Inspiral': [30, 50, 100, 1000],
        'Sipht': [29, 58, 100, 968]
    }
    workflow_distributions = ['constant', 'uniform']
    dax_path = "/content/drive/My Drive/Zahra/dax/"

    # Dictionary to store results
    results = {}

    # Loop through configurations
    for name, sizes in workflow_configs.items():
        for size in sizes:
            for distribution in workflow_distributions:
                print(f"Running simulation for Workflow: {name}, Size: {size}, Distribution: {distribution}")

                # Set up the simulation with the current parameters
                simulation = NewSim(
                    num_iot=10,
                    num_fog=8,
                    num_server=5,
                )

                # Update the workflow parameters
                simulation.workflows = ensemble_of_workflows(
                    name=name,
                    size=size,
                    distribution=distribution,
                    dax_path=dax_path
                )

                # Run the simulation
                simulation.run()

                # Save results in the dictionary
                key = (name, size, distribution)
                results[key] = {
                    'total_cost': simulation.fog_env.cost,
                    'total_time': simulation.env.now  # Assuming `env.now` tracks simulation time
                }

                # Print results for current run
                print(f"Total cost for Workflow {name} ({distribution}, size={size}): {results[key]['total_cost']}")
                print(f"Total time for Workflow {name} ({distribution}, size={size}): {results[key]['total_time']}")

    # Print summary of results
    print("\nSummary of Results:")
    for key, value in results.items():
        name, size, distribution = key
        print(f"Workflow: {name}, Size: {size}, Distribution: {distribution} -> Cost: {value['total_cost']}, Time: {value['total_time']}")


In [25]:
run_simulation_with_results_tracking()

Running simulation for Workflow: CyberShake, Size: 30, Distribution: constant
Total cost for Workflow CyberShake (constant, size=30): 34.09993416666667
Total time for Workflow CyberShake (constant, size=30): 44.829170833333336
Running simulation for Workflow: CyberShake, Size: 30, Distribution: uniform
Total cost for Workflow CyberShake (uniform, size=30): 69.21489250000015
Total time for Workflow CyberShake (uniform, size=30): 72.51855833333339
Running simulation for Workflow: CyberShake, Size: 50, Distribution: constant
Total cost for Workflow CyberShake (constant, size=50): 60.771981666666605
Total time for Workflow CyberShake (constant, size=50): 55.70866749999991
Running simulation for Workflow: CyberShake, Size: 50, Distribution: uniform
Total cost for Workflow CyberShake (uniform, size=50): 89.72427083333385
Total time for Workflow CyberShake (uniform, size=50): 76.10841333333353
Running simulation for Workflow: CyberShake, Size: 100, Distribution: constant
Total cost for Workfl