In [None]:
import docker
from time import sleep
from typing import List, Dict, Any
import time
from os import getpid

import psutil

from datetime import datetime

# Stamp the notebook run: capture the current local time once and echo it
# in ISO-8601 form so the captured output records when the run started.
now = datetime.now()
formatted = datetime.isoformat(now)
print(formatted)


In [2]:
# Docker SDK client created via docker.from_env().
# NOTE(review): the scheduler cell below builds its own `client` the same way,
# so this handle appears unused in the visible cells - confirm before removing.
docker_client = docker.from_env()

In [3]:
# Benchmark name -> Docker image tag used to launch it.
# Insertion order matters: the job table later zips per-job parameters
# against these keys positionally.
image_dict = dict(
    blackscholes="anakli/cca:parsec_blackscholes",
    canneal="anakli/cca:parsec_canneal",
    dedup="anakli/cca:parsec_dedup",
    ferret="anakli/cca:parsec_ferret",
    freqmine="anakli/cca:parsec_freqmine",
    radix="anakli/cca:splash2x_radix",
    vips="anakli/cca:parsec_vips",
)

In [4]:
# Pin this notebook's own process (looked up by its PID) to CPU 0 so the
# controller loop does not run on the cores handed out to jobs ("1", "2", "3").
p = psutil.Process(getpid())
p.cpu_affinity([0])

In [5]:
import numpy as np

poll_interval = 0.1  # seconds
change_interval = 8 # seconds


class DummyMemcachedStats:
    """Synthetic stand-in for a memcached load reader.

    The reported value starts at 0 and is replaced by a new uniformly random
    integer in ``[0, max_qps)`` whenever more than the hold interval has
    elapsed since the previous draw. Between draws the value is constant.

    Args:
        interval: Seconds a value is held before a new one is drawn. ``None``
            (default) uses the module-level ``change_interval``, read on every
            call so later changes to that global still take effect.
        max_qps: Exclusive upper bound of the simulated load; must be > 0.
        seed: Optional integer seed. When given, the instance draws from its
            own ``numpy.random.RandomState(seed)`` so a run can be reproduced;
            when ``None`` the global ``numpy.random`` state is used, exactly
            as before.
    """

    def __init__(self, interval=None, max_qps=180000, seed=None):
        self.measurement = 0
        self.measurement_time = time.time()
        self._interval = interval
        self._max_qps = max_qps
        # np.random (the module) and RandomState both expose randint(low, high).
        self._rng = np.random if seed is None else np.random.RandomState(seed)

    def read(self):
        """Draw a new simulated value if the current one has been held long enough."""
        hold = change_interval if self._interval is None else self._interval
        now = time.time()
        if now - self.measurement_time > hold:
            self.measurement = self._rng.randint(0, self._max_qps)
            self.measurement_time = now

    def last_measurements(self, count=10):
        """Return the current simulated load value.

        ``count`` is accepted only for signature compatibility; this dummy
        ignores it and always returns the single current value.
        """
        self.read()
        return self.measurement

    def qps(self):
        """Return the current simulated load (same value as ``last_measurements``)."""
        return self.last_measurements(int(1/poll_interval))


In [6]:
# tmp = DummyMemcachedStats()
# for i in range(20):
#    print(tmp.last_measurements())
#    sleep(1)

In [7]:
# Per-job parameters, aligned POSITIONALLY with the insertion order of
# image_dict (blackscholes, canneal, dedup, ferret, freqmine, radix, vips).
# NOTE(review): units are not stated here - `duration` looks like seconds and
# `scaling` like a multi-core speedup factor; confirm against the measurements.
scaling = [1.70, 1.70, 1.70, 1.95, 1.95, 1.95, 1.95]
duration = [100, 220, 16, 288, 394, 43, 82]
interference = [9, 9, 10, 11, 11, 8, 11]

# zip() silently truncates to the shortest input, which would drop jobs without
# any error if one list is edited and the others are not - fail loudly instead.
job_names = list(image_dict.keys())
if not (len(job_names) == len(scaling) == len(duration) == len(interference)):
    raise ValueError(
        "image_dict, scaling, duration and interference must all have the same length"
    )

jobs = list(zip(job_names, scaling, duration, interference))

# Start order: lowest interference first, then longest duration, then lowest scaling.
start_queue = sorted(jobs, key=lambda x: (x[3], -x[2], x[1]), reverse=False)
print("Starting queue:")
for job in start_queue:
    print(f"  {job[0]}: {job[1]}, {job[2]} {job[3]}")

Starting queue:
  radix: 1.95, 43 8
  canneal: 1.7, 220 9
  blackscholes: 1.7, 100 9
  dedup: 1.7, 16 10
  freqmine: 1.95, 394 11
  ferret: 1.95, 288 11
  vips: 1.95, 82 11


In [8]:
class Job:
    """Bookkeeping for one batch job and the Docker container that runs it.

    Holds the job's static parameters, the container handle once started, the
    comma-separated cpuset string currently assigned to it, and the wall-clock
    time at which the container was attached.
    """

    def __init__(self, name, scaling, duration, inteference):
        # The misspelled parameter name `inteference` is kept so existing
        # keyword callers keep working; it is stored as `self.interference`.
        self.name = name
        self.scaling = scaling
        self.duration = duration
        self.interference = inteference
        self.image_name = image_dict[name]  # raises KeyError for unknown names
        self.container = None
        self.cpuset_cpus = ""
        self.start_time = None  # set by set_container()

    def __repr__(self):
        return f"Job({self.name}, {self.scaling}, {self.duration}, {self.interference})"

    def is_scaling_job(self):
        """Return True when both scaling (> 1.9) and interference (> 10) are high."""
        return self.scaling > 1.9 and self.interference > 10

    def set_container(self, container):
        """Attach the started container and record the start time."""
        self.container = container
        self.start_time = time.time()

    def update_cpusets_cpu(self, additional_cpus):
        """Add one or more CPUs (comma-separated) to this job's cpuset.

        CPUs already present are not added twice. The new cpuset is pushed to
        the container only when a container is attached and, after a reload,
        reports status 'running'; the local bookkeeping is updated regardless.
        """
        cpus = [cpu for cpu in self.cpuset_cpus.split(",") if cpu]
        for cpu in str(additional_cpus).split(","):
            if cpu and cpu not in cpus:
                cpus.append(cpu)
        self.cpuset_cpus = ",".join(cpus)

        if self.container is None:
            return
        self.container.reload()
        if self.container.status == 'running':
            self.container.update(cpuset_cpus=self.cpuset_cpus)

    def remove_cpu(self, cpu):
        """Remove a single CPU from the cpuset and push the change to the container.

        Does nothing when the CPU is not currently part of the cpuset.
        """
        cpu_list = self.cpuset_cpus.split(",")
        cpu = str(cpu)
        if cpu in cpu_list:
            cpu_list.remove(cpu)
            self.cpuset_cpus = ",".join(cpu_list)
            if self.container is not None:
                self.container.update(cpuset_cpus=self.cpuset_cpus)

    def runtime(self):
        """Return seconds elapsed since set_container(), or 0 if never started."""
        if self.start_time is not None:
            return time.time() - self.start_time
        return 0

    def is_finished(self):
        """Return True once the container has exited or can no longer be queried.

        A job without a container has not started and is reported as not finished.
        """
        if not self.container:
            return False
        try:
            self.container.reload()
            return self.container.status == 'exited'
        except Exception:
            # Container might be removed already. Only ordinary errors are
            # treated this way; KeyboardInterrupt/SystemExit still propagate.
            return True

In [9]:
# Wrap the (name, scaling, duration, interference) tuples built by the job-table
# cell in Job objects.
# NOTE: this rebinds `start_queue`, so this cell can only be re-run after
# re-running the cell that builds the tuple queue.
start_queue = [Job(*job) for job in start_queue]
curr_jobs: List[Job] = []

# CPUs (as cpuset strings) that are currently free to hand to a job.
# CPU "1" is added/removed dynamically depending on the measured load.
avail_cpus = ["2", "3"]


cpu_1_used = False  # True while CPU 1 is lent out to batch jobs
cpu_1_job = None    # the job currently holding CPU 1, if any

mem_cached_measurments = DummyMemcachedStats()

memcached_id = 0#[p.pid for p in psutil.process_iter(['name']) if p.info['name'] == 'memcached'][0]
#memcached = psutil.Process(memcached_id)

client = docker.from_env()

polling_interval = 0.1
prev_qps = 0

QPS_THRESHOLD = 100000  # below this load CPU 1 is lent to batch jobs
QPS_LOG_DELTA = 10000   # minimum change in QPS before it is logged again

while len(start_queue) > 0 or len(curr_jobs) > 0:

    # Check if any job has finished.
    # Iterate over a snapshot: removing from curr_jobs while iterating over it
    # would skip the element that follows each removed job.
    for job in list(curr_jobs):
        if job.is_finished():
            print(f"Job {job.name} completed after {job.runtime():.2f} seconds")

            # Free up the CPUs that were allocated to this job
            for cpu in job.cpuset_cpus.split(","):
                if cpu and cpu not in avail_cpus:
                    avail_cpus.append(cpu)

            curr_jobs.remove(job)
            if job is cpu_1_job:
                cpu_1_job = None

            print(f"Released CPUs: {job.cpuset_cpus}. Available CPUs: {avail_cpus}")

    # Get the current CPU and QPS
    cpu_cores_usage = psutil.cpu_percent(interval=None, percpu=True)
    qps = mem_cached_measurments.last_measurements()

    if abs(prev_qps - qps) > QPS_LOG_DELTA:
        print(f"New QPS: {qps}")
        prev_qps = qps

    if qps < QPS_THRESHOLD and not cpu_1_used and not ("1" in avail_cpus):
        # Low load: lend CPU 1 to the batch jobs.
        cpu_1_used = True

        avail_cpus.insert(0, "1")
        # memcached.cpu_affinity([0])
        print(f"Setting CPU affinity of Memcached to 0 with QPS {qps}")
    elif ("1" in avail_cpus or cpu_1_used) and not (qps < QPS_THRESHOLD):
        # High load: take CPU 1 back from the batch jobs.
        cpu_1_used = False

        if "1" in avail_cpus:
            avail_cpus.remove("1")
        # memcached.cpu_affinity([0, 1])
        print(f"Setting CPU affinity of Memcached to 0, 1 with QPS {qps}")

        if cpu_1_job is None:
            continue

        if len(cpu_1_job.cpuset_cpus.split(",")) == 1:
            # CPU 1 is the job's only core: pause it and put it back at the
            # front of the queue so it resumes on the next free CPU.
            print(f"Pausing job {cpu_1_job.name} on CPU 1 due to high QPS")
            cpu_1_job.container.pause()
            curr_jobs.remove(cpu_1_job)
            start_queue.insert(0, cpu_1_job)
            cpu_1_job.cpuset_cpus = ""
            cpu_1_job = None
        else:
            cpu_1_job.remove_cpu("1")
            print(f"Removing CPU 1 from job {cpu_1_job.name} due to high QPS")
            # The job no longer holds CPU 1; a stale reference could later
            # pause it even though it is running on other cores.
            cpu_1_job = None

    if len(avail_cpus) == 0:
        # print("No available CPUs, waiting...")

        sleep(polling_interval)
        continue

    print(f"Available CPUs: {avail_cpus}")

    avail_cpu = avail_cpus.pop(0)

    # Prefer giving the free CPU to an already-running scaling job. Once the
    # start queue is empty, any running job may receive it.
    if len(start_queue) == 0:
        scaling_jobs = [job for job in curr_jobs if job.is_scaling_job()] or curr_jobs
    else:
        scaling_jobs = [job for job in curr_jobs if job.is_scaling_job()]

    if len(scaling_jobs) > 0:
        scaling_job = scaling_jobs[0]

        scaling_job.update_cpusets_cpu(avail_cpu)

        # Track the holder of CPU 1 so it can be reclaimed on high QPS.
        if avail_cpu == "1":
            cpu_1_job = scaling_job

        print(f"Added CPU {avail_cpu} to {scaling_job.name}")
        continue

    if len(start_queue) == 0:
        # No more jobs to start: return the unused CPU instead of dropping it.
        avail_cpus.insert(0, avail_cpu)
        sleep(polling_interval)
        continue

    job = start_queue.pop(0)

    # Job already started but was paused
    if job.container is not None:
        print(f"Job {job.name} unpaused and assigned to CPU {avail_cpu}")
        curr_jobs.append(job)
        job.container.update(cpuset_cpus=avail_cpu)
        job.cpuset_cpus = avail_cpu
        job.container.unpause()

        if avail_cpu == "1":
            cpu_1_job = job

        continue

    # radix comes from the splash2x suite and always runs single-threaded;
    # parsec jobs get 3 threads when they are scaling jobs, otherwise 1.
    run_command = (
        "./run -a run -S splash2x -p radix -i native -n 1"
        if job.name == "radix"
        else f"./run -a run -S parsec -p {job.name} -i native -n {3 if job.is_scaling_job() else 1}"
    )

    container = client.containers.run(
        image=job.image_name,
        command=run_command,
        detach=True,
        remove=True,
        name="parsec-" + job.name,
        cpuset_cpus=avail_cpu,
    )

    print(f"Started job {job.name} on CPU {avail_cpu}")
    job.set_container(container)
    job.cpuset_cpus = avail_cpu
    curr_jobs.append(job)

    if avail_cpu == "1":
        cpu_1_job = job

Setting CPU affinity of Memcached to 0 with QPS 0
Available CPUs: ['1', '2', '3']
Started job radix on CPU 1
Available CPUs: ['2', '3']
Started job canneal on CPU 2
Available CPUs: ['3']
Started job blackscholes on CPU 3
New QPS: 154750
Setting CPU affinity of Memcached to 0, 1 with QPS 154750
Pausing job radix on CPU 1 due to high QPS
New QPS: 50247
Setting CPU affinity of Memcached to 0 with QPS 50247
Available CPUs: ['1']
Job radix unpaused and assigned to CPU 1
New QPS: 122496
Setting CPU affinity of Memcached to 0, 1 with QPS 122496
Pausing job radix on CPU 1 due to high QPS
New QPS: 112255
New QPS: 138978
New QPS: 54779
Setting CPU affinity of Memcached to 0 with QPS 54779
Available CPUs: ['1']
Job radix unpaused and assigned to CPU 1
New QPS: 141026
Setting CPU affinity of Memcached to 0, 1 with QPS 141026
Pausing job radix on CPU 1 due to high QPS
New QPS: 174348
New QPS: 108063
New QPS: 30501
Setting CPU affinity of Memcached to 0 with QPS 30501
Available CPUs: ['1']
Job radix