In [1]:
class Job:
    '''
    A job that is to be assigned to a node.

    A job is described by three quantities: its CPU request, its memory
    request and the size of the file it sends to the master. The CPU and
    memory requests are the maximum amount of each resource the job will
    use on the node.
    '''

    def __init__(self, id_job, cpu_request, memory_request, file_size):
        # Plain attribute storage; no validation is applied to any value.
        self.id_job = id_job
        self.cpu_request = cpu_request
        self.memory_request = memory_request
        self.file_size = file_size

    # ---- accessors -------------------------------------------------------

    def get_id_job(self):
        '''Return the identifier given to the job at construction.'''
        return self.id_job

    def get_cpu_request(self):
        '''Return the CPU request of the job.'''
        return self.cpu_request

    def get_memory_request(self):
        '''Return the memory request of the job.'''
        return self.memory_request

    def get_file_size(self):
        '''Return the file size of the job.'''
        return self.file_size

    # ---- mutators --------------------------------------------------------

    def set_cpu_request(self, cpu_request):
        '''Replace the CPU request of the job.'''
        self.cpu_request = cpu_request

    def set_memory_request(self, memory_request):
        '''Replace the memory request of the job.'''
        self.memory_request = memory_request

    def set_file_size(self, file_size):
        '''
        Replace the file size of the job.

        For now the file size is expressed in plain integer units; the
        bandwidth of class Node() is expressed in the same units per time step.
        '''
        self.file_size = file_size

In [2]:
class Node():
    '''
    This class represents a computing node of a cluster.

    The basic properties of a node are CPU, memory and bandwidth (BW).
    BW is the data-transmission capacity between the node and a hypothetical
    node to which data is sent.

    The node keeps, for CPU and memory, a total capacity, the amount used by
    the allocated jobs and the amount still available
    (available = capacity - used). The allocated jobs are kept in ``jobs``.
    '''

    def __init__(self, cpu_capacity, memory_capacity, bw):
        self.cpu_capacity = cpu_capacity
        self.memory_capacity = memory_capacity
        self.bw = bw
        # A new node has no jobs, so everything is available and nothing is used.
        self.cpu_available = cpu_capacity
        self.memory_available = memory_capacity
        self.cpu_used = 0
        self.memory_used = 0
        self.jobs = []

    def set_cpu_capacity(self, cpu_capacity):
        '''
        This method sets the total CPU capacity of the node.
        The CPU units are in millicores: 1000m = 1 core.

        The new capacity is only applied if it covers the CPU currently in
        use; otherwise a message is printed and the node is left unchanged.
        Allocated jobs are never released by this method.
        '''
        if cpu_capacity - self.cpu_used < 0:
            print('CPU resources cannot be reduced')
        else:
            self.cpu_capacity = cpu_capacity
            self.update_available_resources()

    def set_memory_capacity(self, memory_capacity):
        '''
        This method sets the total memory capacity of the node.
        The memory units are MB: 1GB = 1000MB = 1000000KB.

        The new capacity is only applied if it covers the memory currently in
        use; otherwise a message is printed and the node is left unchanged.
        Allocated jobs are never released by this method.
        '''
        if memory_capacity - self.memory_used < 0:
            print('Memory resources cannot be reduced')
        else:
            self.memory_capacity = memory_capacity
            self.update_available_resources()

    def set_bw(self, bw):
        '''
        This method sets the total bandwidth of the node's link.
        For now the bandwidth is represented as the number of file_size units
        the node is capable of consuming in one time step.
        '''
        self.bw = bw

    def get_cpu_capacity(self):
        '''This method returns the CPU capacity of the node'''
        return self.cpu_capacity

    def get_memory_capacity(self):
        '''This method returns the memory capacity of the node'''
        return self.memory_capacity

    def get_bw(self):
        '''This method returns the bandwidth of the node'''
        return self.bw

    def get_cpu_used(self):
        '''This method returns the used CPU'''
        return self.cpu_used

    def get_memory_used(self):
        '''This method returns the used memory'''
        return self.memory_used

    def get_cpu_available(self):
        '''This method returns the available CPU'''
        return self.cpu_available

    def get_memory_available(self):
        '''This method returns the available memory'''
        return self.memory_available

    def update_available_resources(self):
        '''This method recomputes the available CPU and memory as capacity minus used'''
        self.cpu_available = self.cpu_capacity - self.cpu_used
        self.memory_available = self.memory_capacity - self.memory_used

    def consume_resources(self, job):
        '''
        This method adds the job's CPU and memory requests to the used totals
        and refreshes the available CPU and memory accordingly.
        '''
        self.cpu_used += job.get_cpu_request()
        self.memory_used += job.get_memory_request()
        self.update_available_resources()

    def release_resources(self, job):
        '''
        This method subtracts the job's CPU and memory requests from the used
        totals and refreshes the available CPU and memory accordingly.

        The values subtracted are the job's requests at the time of the call,
        so changing a job's requests while it is allocated leaves the node's
        accounting out of sync.
        '''
        self.cpu_used -= job.get_cpu_request()
        self.memory_used -= job.get_memory_request()
        self.update_available_resources()

    def transfer_duration(self, job):
        '''
        This method calculates and returns the file transfer time
        given the file size of the job and the bw of the node.

        Raises ZeroDivisionError if the bw of the node is 0.
        '''
        return job.get_file_size() / self.bw

    def check_resources(self, job):
        '''
        This method returns False if the node does not have enough available
        CPU or memory to allocate the job. Returns True otherwise.
        A request exactly equal to the available amount is accepted.
        '''
        cpu_short = job.get_cpu_request() > self.get_cpu_available()
        memory_short = job.get_memory_request() > self.get_memory_available()
        return not (cpu_short or memory_short)

    def append_job(self, job):
        '''
        This method appends the job passed as an argument to the node.
        The available resources in the node are updated.
        The transfer time of the file attached to the job is calculated and returned.

        If the node does not have enough available resources, a message is
        printed, the node is left unchanged and None is returned.

        Raises ZeroDivisionError if the bw of the node is 0; in that case the
        job is NOT allocated and the node is left unchanged.
        '''
        if not self.check_resources(job):
            print('The job cannot be allocated. There is not enough available resoruces in the node.')
            return None

        # Compute the duration BEFORE touching any state: if it raises (bw == 0)
        # the node must not be left with a job the caller believes was rejected.
        total_transfer_duration = self.transfer_duration(job)
        self.jobs.append(job)
        self.consume_resources(job)
        return total_transfer_duration

    def terminate_job(self, job):
        '''
        This method releases the specified job running in the node.
        The available resources in the node are updated.

        Raises ValueError (from list.remove) if the job is not allocated to
        this node; in that case no resources are released.
        '''
        self.jobs.remove(job)
        self.release_resources(job)
        

## First experiment

In this first experiment the goal is to check that both classes work properly and can interact with each other. The main components will be:

1. Two nodes: each node will have different characteristics

    * Node's CPU
    * Node's memory
    * Node's BW

2. Two types of jobs: Each type will have different characteristics

    * Job's CPU
    * Job's memory
    * Job's file_size
* Jobs will be created and appended to each of the nodes dynamically at different time steps
* Number of jobs created at each time step
* Jobs that do not have space will wait in a buffer
* Maximum number of jobs in the buffer

This process will last for a certain number of time steps. Data from the deployment will be stored for later visualization and performance tracking.


## Tests

In [72]:
# Fixture for the checks below: one node and two jobs with different demands.
node_1 = Node(cpu_capacity=2, memory_capacity=1000, bw=20)
# Job.__init__ requires id_job as its first argument; omitting it raises TypeError.
job_1 = Job(id_job=1, cpu_request=0.5, memory_request=750, file_size=800)
job_2 = Job(id_job=2, cpu_request=1.5, memory_request=100, file_size=1300)

In [73]:
# Re-create job_2 as a fresh object. Job.__init__ requires id_job; omitting it raises TypeError.
job_2 = Job(id_job=2, cpu_request=1.5, memory_request=100, file_size=1300)

In [60]:
# append_job returns the transfer duration (file_size / bw) when the job fits.
print(node_1.append_job(job_1))
# NOTE(review): with the values defined in the fixture cell, job_2 requests exactly
# the 1.5 CPU left after job_1, and check_resources only rejects on a strict '>'.
# This call therefore looks like it succeeds on a fresh run -- confirm against the
# recorded rejection message, which may come from an earlier kernel state.
node_1.append_job(job_2)

40.0
The job cannot be allocated. There is not enough available resoruces in the node.


In [61]:
# Available memory before and after changing the node's memory capacity.
print(node_1.get_memory_available())
# The new capacity is applied only if it still covers the memory in use.
node_1.set_memory_capacity(memory_capacity=900)
print(node_1.get_memory_available())

250
150


In [62]:
# Terminating job_1 removes it from the node and subtracts its requests from the used totals.
node_1.terminate_job(job_1)
print(node_1.get_cpu_used())
print(node_1.get_memory_used())

0.0
0


In [74]:
# Try to allocate job_2; the cell displays append_job's return value (the transfer duration on success).
node_1.append_job(job_2)

65.0

In [75]:
# Total CPU capacity of the node; allocating jobs changes used/available, not capacity.
node_1.get_cpu_capacity()

2

In [76]:
# Resources still free on the node (capacity minus what the allocated jobs use).
print(node_1.get_cpu_available())
print(node_1.get_memory_available())

0.5
900


In [77]:
# set_cpu_capacity refuses (and prints a message) when the new capacity is below the CPU in use.
node_1.set_cpu_capacity(cpu_capacity=1)

CPU resources cannot be reduced


In [69]:
# CPU currently accounted as used by the jobs allocated to node_1.
node_1.get_cpu_used()

1.5