<a href="https://colab.research.google.com/github/Leonnello/sjf-memory-placement-sim/blob/main/ITS150L_SA2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

ITS150L_FOPI01_1Q2425<br>
Members:<br>
Celina Therese Tabano<br>
Noel Allen Elises

In [1]:
#### Imports
import random
import time

#visualization
import ipywidgets as widgets
from IPython.display import display, clear_output

In [2]:
class Process:
  position = None

  def __init__(self, pid, burst_time, memory_size):
    self.pid = pid
    self.burst_time = burst_time
    self.memory_size = memory_size

  # decrease remaining time
  def run(self):
    self.burst_time -= 1

    if self.burst_time == 0:
      self.position.isFree = True
      self.position = None


In [3]:
class MemoryBlock:
  def __init__(self, start, end, isFree: bool = True):
    self.start = start
    self.end = end
    self.isFree = isFree

  def get_size(self):
    return self.end - self.start + 1

In [4]:
##### GLOBAL VARIABLES
main_memory_size = 0

# counter for pid
pid_count = 1

# array of all process to be scheduled
current_processes = []

# array for all memory blocks
memory_blocks = []

run_time = 0
sc_time = 0
ch_time = 0

In [5]:
def generate_blocks(size : int):
  # init rng
  random.seed()

  # select random num of blocks
  n_blocks = 0
  if size <= 3:
    n_blocks = 1
  elif size > 3 and size <= 500:
    n_blocks = random.randrange(3, 10)
  else:
    n_blocks = random.randrange(4, round(size/100))

  end_pos = random.sample(range(1,size), n_blocks)
  end_pos.sort()

  print(end_pos)

  # divide and create blocks
  for i in range(len(end_pos)):
    if i == 0:
      memory_blocks.append(MemoryBlock(0, end_pos[i]))
    elif i == len(end_pos) - 1:
      memory_blocks.append(MemoryBlock(memory_blocks[i-1].end + 1, size - 1))
    else:
      memory_blocks.append(MemoryBlock(memory_blocks[i-1].end + 1, end_pos[i]))


In [6]:
# function to input new jobs to queue
bt_input = widgets.BoundedIntText(min=1, max=500, description=f"Burst Time")
size_input = widgets.BoundedIntText(min=1, max=500, description=f"Memory Size")
def new_job(e):
  global pid_count
  global bt_input, size_input

  burst_time = bt_input.value
  memory_size = size_input.value

  current_processes.append(Process(pid_count,burst_time, memory_size))
  print(f"Successfully added Process#{pid_count} {burst_time}ms {memory_size}kb")
  pid_count += 1

In [7]:
def pass_time(processes):
  global run_time, ch_time, sc_time

  # pause interval and increase run_time
  time.sleep(0.1)
  run_time += 1
  msg = ""

  if run_time != 0 and run_time % ch_time == 0:
    coalesce()
    repaint()
    display(gui)
    msg = f"CH at time: {run_time}"

  if run_time != 0 and run_time % sc_time == 0:
    compact()
    repaint()
    display(gui)
    msg += f"\nSC at time: {run_time}"

  # pass time for each process that is already fitted in memory block
  for n in range(len(processes)):
    if processes[n] == None:
      continue

    if processes[n].position != None:
      processes[n].run()

    if processes[n].burst_time == 0:
      repaint()
      display(gui)
      msg += f"\nProcess#{processes[n].pid} completed"
      processes[n] = None

  if msg != "":
    print(msg)
    # pause so print won't zoom
    time.sleep(0.5)


In [8]:
# function to send current_processes for sjf scheduling
# run this whenever all the processes in a sjf batch is complete?
def sjf_scheduling(processes):
  global run_time, ch_time, sc_time
  # clear array for next batch
  global current_processes
  current_processes.clear()

  # sort processes based on Burst Time
  processes = sorted(processes, key=lambda process: process.burst_time)

  # fit jobs in memory blocks
  for i in range(len(processes)):
    if processes[i] == None:
      continue
    # pause interval
    time.sleep(0.3)
    print(f"PID {processes[i].pid}: {processes[i].burst_time}s {processes[i].memory_size}Kb")

    while not best_fit(processes[i]):
    # if can't fit, pass time till there is available hole
      pass_time(processes)

    # pass time every successful fit
    pass_time(processes)

  # pass time until all jobs are done
  for i in range(len(processes)):
    while processes[i] != None:
      pass_time(processes)

  print("Done executing")
  print(f"Time completed: {run_time}s")


In [9]:
def best_fit(job_to_fit) -> bool:
  global memory_blocks
  difference = None
  block_index = -1

  for i in range(len(memory_blocks)):
    if memory_blocks[i].isFree and memory_blocks[i].get_size() >= job_to_fit.memory_size:
      current_diff = memory_blocks[i].get_size() - job_to_fit.memory_size
      if difference == None or current_diff < difference:
        difference = current_diff
        block_index = i

  # if fit is found
  if block_index != -1:
    # place job in the memory_block
    selected_block = memory_blocks[block_index]
    job_to_fit.position = selected_block
    selected_block.isFree = False

    end_pos = selected_block.end
    selected_block.end = selected_block.start + job_to_fit.memory_size - 1

    # create block for the remaining hole
    if difference != None and difference > 0:
      memory_blocks.append(MemoryBlock(selected_block.end + 1, end_pos))

    return True

  return False


In [10]:
def is_in_array(array, element, *args) -> bool:
  for n in range(len(array)):
    match args:
      case ("start","end"):
        if array[n].start == element.start and array[n].end == element.end:
          return True

  return False

In [11]:
# coalesce
def coalesce():
  global memory_blocks
  coalesced_blocks = []

  # sort by start #
  memory_blocks = sorted(memory_blocks, key=lambda block: block.start)

  if len(memory_blocks) == 1:
    coalesced_blocks = memory_blocks
  else:
    prev = None
    for i in range(len(memory_blocks) - 1):
      current = memory_blocks[i]
      next = memory_blocks[i+1]

      if prev is not None:
        # if previous was merged, update current
        if prev.isFree and prev.end == current.end:
          current = prev

      if current.isFree and next.isFree:
        # if prev is current, meaning prev and current will be merged
        # delete duplicate from final array
        if prev is current:
          coalesced_blocks.pop()

        current.end = next.end

      if not is_in_array(coalesced_blocks, current, "start", "end"):
        coalesced_blocks.append(current)

      if i == len(memory_blocks) - 2:
        if not is_in_array(coalesced_blocks, next, "start", "end"):
          coalesced_blocks.append(next)

      prev = current

  memory_blocks = coalesced_blocks.copy()


In [12]:
def compact():
  global memory_blocks

  # #ignore function based on time interval
  # if run_time == 0 or run_time % 20 != 0: return

  memory_blocks = sorted(memory_blocks, key=lambda block: block.start)

  # count non-free blocks
  # process_count = sum(1 for block in memory_blocks if not block.isFree)

  #get all non-free blocks (biggest to smallest) then clear
  process_blocks = sorted([block for block in memory_blocks if not block.isFree], key=lambda block: block.get_size(), reverse = True)
  memory_blocks.clear()

  end = -1 # for first loop

  #change positions then append
  for block in process_blocks:
    start = end + 1
    end = block.get_size() - 1
    memory_blocks.append(MemoryBlock(start, end, False))

  #compacted hole
  start = end + 1
  end = main_memory_size - 1
  memory_blocks.append(MemoryBlock(start, end, True))

In [None]:
#v2 using html - implementation below

import ipywidgets as widgets
from IPython.display import display, HTML

#displays memory in a single frame
def repaint():
  global memory_blocks
  global main_memory_size
  currentPos = 0

  width_ratio = main_memory_size/ 1000
  blocks_html = ''

  for block in memory_blocks:
    p2 = int(block.get_size() * width_ratio)
    fill_color = 'transparent' if block.isFree else '#FFAE00'

    # label_pos = (currentPos[0] + 5, currentPos[1] + 90)
    # get_status = "Free" if block.isFree else "Allocated"
    # drawObj.text(label_pos, f"[{block.start},{block.end}]\n{get_status}", fill="black")

    blocks_html += f'<div style="border: 5px solid #000000; width: {p2}px; height: 100px; background-color: {fill_color}; position: absolute; top: 48px; left: {currentPos}px;"></div>'

    #increment pointer with block size
    currentPos += int(block.get_size() * width_ratio)

    # with visual:
    #visual.clear_output(wait=True)
    clear_output(wait=True)

  # statics
  text_memory = '<h2>Memory</h2>'
  memory_rect = f'<div style="border: 5px solid #000000; width: 1000px; height: 100px;">{blocks_html}</div>'

  html_content = widgets.HTML(text_memory + memory_rect)
  display(html_content)

In [None]:
def on_run(e):
  # global current_processes
  global main_memory_size, sc_time, ch_time

  if len(current_processes) != 0:
    # update values on run
    main_memory_size = mem_input.value
    sc_time = compact_input.value
    ch_time = coalesce_input.value

    generate_blocks(main_memory_size)
    sjf_scheduling(current_processes.copy())

add_job_button = widgets.Button(description="Add Job")
add_job_button.on_click(new_job)
run_button = widgets.Button(description="Run SJF")
run_button.on_click(on_run)

In [None]:
repaint()

mem_input = widgets.BoundedIntText(value=1000, min=1, max=1000, description="Memory Size(kb):")
compact_input = widgets.BoundedIntText(value=20, min=1, description="Compaction Interval(ms):")
coalesce_input = widgets.BoundedIntText(value=5, min=1, description="Coalesce Interval(ms):")

display(widgets.HBox([mem_input, compact_input, coalesce_input]))

gui = widgets.Output()
with gui:
  display(widgets.HBox([bt_input, size_input, add_job_button]))
  display(widgets.HBox([run_button]))

display(gui)

HTML(value='<h2>Memory</h2><div style="border: 5px solid #000000; width: 1000px; height: 100px;"><div style="b…

Output()

CH at time: 235
Process#3 completed
Done executing
Time completed: 235s
