In [None]:
import logging, uuid, time, csv, os

from kubernetes import client, config, watch

logging.basicConfig(level=logging.INFO)

# Kubeconfig used by every API client created in this notebook. Set the
# EXPERIMENT_KUBECONFIG environment variable to point at another file; the
# default is the location the notebook was originally written against.
KUBECONFIG_PATH = os.environ.get(
    "EXPERIMENT_KUBECONFIG", "/home/goughes/k8s/configs/erikdev-admin.yaml"
)
config.load_kube_config(KUBECONFIG_PATH)

# Module-level API clients; several methods below reference these names directly.
core_api = client.CoreV1Api()
batch_api = client.BatchV1Api()
apps_api = client.AppsV1Api()

# Create the results directory structure: results/<experiment id>/
os.makedirs("results", exist_ok=True)
experiments = ["E1", "E2", "E3", "E4", "E5"]
for experiment_id in experiments:
    os.makedirs(os.path.join("results", experiment_id), exist_ok=True)

## Kubernetes API class

In [None]:
# based on https://medium.com/@aris.david/how-to-create-a-job-using-kubernetes-python-client-ed00ac2b791d
class Kubernetes:
    def __init__(self):

        # Init Kubernetes
        self.core_api = client.CoreV1Api()
        self.batch_api = client.BatchV1Api()

    def get_all_namespaces(self):
        namespaces = self.core_api.list_namespace()
        all_namespaces = []
        for ns in namespaces.items:
            all_namespaces.append(ns.metadata.name)
        return all_namespaces
            
    def create_namespace(self, namespace):

        all_namespaces = self.get_all_namespaces()

        if namespace in all_namespaces:
            logging.info(f"Namespace {namespace} already exists. Reusing.")
        else:
            namespace_metadata = client.V1ObjectMeta(name=namespace)
            self.core_api.create_namespace(
                client.V1Namespace(metadata=namespace_metadata)
            )
            logging.info(f"Created namespace {namespace}.")

        return namespace
    
    def delete_namespace(self, namespace):
        all_namespaces = self.get_all_namespaces()
        
        if namespace not in all_namespaces:
            logging.info(f"Namespace {namespace} does not exist.")
        else:
            self.core_api.delete_namespace(name=namespace)
            logging.info(f"Deleted namespace {namespace}.")

    @staticmethod
    def create_container(image, name, pull_policy, cpu_limit, mem_limit, sleep_time):

        resources = client.V1ResourceRequirements(
            requests={"cpu": cpu_limit, "memory": mem_limit},
            limits={"cpu": cpu_limit, "memory": mem_limit}
        )
            
        container = client.V1Container(
            image=image,
            name=name,
            resources=resources,
            image_pull_policy=pull_policy,
            args=[sleep_time],
            command=["sleep"],
        )

        logging.info(
            f"Created sleep container with name: {container.name}, "
            f"image: {container.image} and args: {container.args}"
        )

        return container

    @staticmethod
    def create_pod_template(namespace, pod_name, container, scheduler_name):
        labels={"pod_name": pod_name}
        # add queue and appid labels required for yunikorn 
        if scheduler_name == "yunikorn":
            labels["applicationId"] = pod_name.split("-")[-1]
            labels["queue"] = "root.tenants." + namespace
        if namespace == "backfill":
            pod_template = client.V1PodTemplateSpec(
                spec=client.V1PodSpec(restart_policy="Never", containers=[container], scheduler_name=scheduler_name, priority_class_name="backfill"),
                metadata=client.V1ObjectMeta(name=pod_name, labels=labels),
            )
        else:
            pod_template = client.V1PodTemplateSpec(
                spec=client.V1PodSpec(restart_policy="Never", containers=[container], scheduler_name=scheduler_name),
                metadata=client.V1ObjectMeta(name=pod_name, labels=labels),
            )

        return pod_template

    @staticmethod
    def create_job(job_name, pod_template):
        metadata = client.V1ObjectMeta(name=job_name, labels={"job_name": job_name})

        job = client.V1Job(
            api_version="batch/v1",
            kind="Job",
            metadata=metadata,
            #spec=client.V1JobSpec(backoff_limit=6, template=pod_template),
            spec=client.V1JobSpec(template=pod_template),
        )

        return job
    
    @staticmethod
    def get_all_pods(namespace):
        pods = core_api.list_namespaced_pod(namespace, pretty=True, timeout_seconds=60)
        print("number of pods: " + str(len(pods.items)))
        return pods
    
    @staticmethod
    def get_all_jobs(namespace):
        jobs = batch_api.list_namespaced_job(namespace, pretty=True, timeout_seconds=60)
        print("number of jobs: " + str(len(jobs.items)))
        return jobs
    
    @staticmethod
    def delete_all_jobs(namespace):
        jobs = batch_api.list_namespaced_job(namespace, pretty=True, timeout_seconds=60)
        deleteoptions = client.V1DeleteOptions()
        for job in jobs.items:
            print("Deleting job " + job.metadata.name)
            jobname = job.metadata.name
            api_response = batch_api.delete_namespaced_job(jobname,
                                                           namespace,
                                                           grace_period_seconds=0, 
                                                           propagation_policy='Background')
            logging.debug(api_response)
    
    """
        interval: time to wait/sleep between each job submission
    """
    @staticmethod
    def submit_burst(namespace, scheduler_name, cpu_limit, mem_limit, total_jobs, sleep_time):
        try:
            image = "busybox:1.36"
            container_name = "burst-sleep-" + namespace
            pull_policy = "Never"
            print("bursting", total_jobs, "sleep", sleep_time)
            burst_submitted = 0
            while burst_submitted < total_jobs:
                container = k8s.create_container(image, container_name, pull_policy, cpu_limit, mem_limit, sleep_time)

                pod_id = uuid.uuid4()
                job_id = pod_id
                # create template
                _pod_name = f"{namespace}-burst-pod-{pod_id}"
                if namespace == "backfill":
                        _pod_name = "tenant-" + _pod_name
                _pod_spec = k8s.create_pod_template(namespace, _pod_name, container, scheduler_name)

                # create job
                _job_name = f"{namespace}-burst-{job_id}"
                if namespace == "backfill":
                        _job_name = "tenant-" + _job_name
                _job = k8s.create_job(_job_name, _pod_spec)

                # execute job
                batch_api = client.BatchV1Api()
                batch_api.create_namespaced_job(namespace, _job)
                burst_submitted = burst_submitted + 1
                print("burst jobs", burst_submitted)
        except Exception as e:
            print(e)
            
    @staticmethod
    def submit_workflow(namespace, scheduler_name, cpu_limit, mem_limit, total_jobs, sleep_time, bursts, interval, experiment):
        try:
            image = "busybox:1.36"
            container_name = "sleep-" + namespace
            pull_policy = "Never"
            execution_time = 0
            jobs_submitted = 0
            # some logic to delay the start of tenant3 jobs
            if namespace == "tenant3":
                time.sleep(60)
                execution_time = 60
                if experiment == "E5":
                    time.sleep(30)
                    execution_time += 30
            while jobs_submitted < total_jobs:
                for burst in bursts:
                    if execution_time == burst[0]:
                        burst_cpu_limit = burst[1]
                        burst_mem_limit = burst[2]
                        burst_total_jobs = burst[3]
                        burst_sleep_time = burst[4]
                        print("Submitting burst at " + str(execution_time))
                        k8s.submit_burst(namespace, scheduler_name, burst_cpu_limit, burst_mem_limit, burst_total_jobs, burst_sleep_time)

                if int(cpu_limit) > 0:
                    container = k8s.create_container(image, container_name, pull_policy, cpu_limit, mem_limit, sleep_time)

                    pod_id = uuid.uuid4()
                    job_id = pod_id
                    # create template
                    _pod_name = f"{namespace}-pod-{pod_id}"
                    if namespace == "backfill":
                        _pod_name = "tenant-" + _pod_name
                    _pod_spec = k8s.create_pod_template(namespace, _pod_name, container, scheduler_name)

                    # create job
                    _job_name = f"{namespace}-{job_id}"
                    if namespace == "backfill":
                        _job_name = "tenant-" + _job_name
                    _job = k8s.create_job(_job_name, _pod_spec)

                    # execute job
                    batch_api = client.BatchV1Api()
                    batch_api.create_namespaced_job(namespace, _job)
                jobs_submitted = jobs_submitted + 1
                execution_time = execution_time + interval
                time.sleep(interval)
        except Exception as e:
            print(e)

## Experiment class

In [None]:
from multiprocess import Pool
import pandas as pd
import pprint
import csv
import time


class Experiment():
    def __init__(self, experiment_name, experiment_number, namespaces, scheduler_name, params):
        self.experiment_name = experiment_name
        self.experiment_number = experiment_number
        self.namespaces = namespaces
        self.scheduler_name = scheduler_name
        self.params = params
        self.exp_data = {}
        self.k8s = Kubernetes()
    
    def calculate_total_completed(self):
        total = 0
        for param in self.params:
            cores = int(param[1])
            num_jobs = param[3]
            tenant_total = cores * num_jobs
            bursts = param[5]
            burst_total = 0
            for burst in bursts:
                burst_cores = int(burst[1])
                burst_num_jobs = burst[3]
                burst_total = burst_total + (burst_cores * burst_num_jobs)
            total += tenant_total
            total += burst_total
        return total

    def submit_parallel_workflows(self, params):
        try:
            namespace  = params[0]
            cpu_limit  = params[1]
            mem_limit  = params[2]
            num_jobs   = params[3]
            sleep_time = params[4]
            bursts     = params[5]
            interval   = params[6]
            experiment = params[7]
            self.k8s.submit_workflow(namespace, self.scheduler_name, cpu_limit, mem_limit, num_jobs, sleep_time, bursts, interval, experiment)
        except Exception as e:
            print(e)
    
    def start(self):
        # yunikorn needs a restart due to some bug?
        if self.scheduler_name == "yunikorn":
            api_response = apps_api.patch_namespaced_deployment_scale("yunikorn-scheduler", "yunikorn", {"spec": {"replicas": 0}})
            api_response = apps_api.patch_namespaced_deployment_scale("yunikorn-scheduler", "yunikorn", {"spec": {"replicas": 1}})
            time.sleep(2)
        try:
            # submit multithreaded workflows, one for each tenant
            p = Pool(len(self.params))
            result = p.map_async(self.submit_parallel_workflows, self.params)
            return result
        except Exception as e:
            print(e)
    
    def cleanup_all_namespaces(self):
        namespaces = ["tenant1", "tenant2", "tenant3", "tenant4", "backfill"]

        # clean each namespace of any leftover jobs
        for namespace in namespaces:
            #k8s.create_namespace(namespace)
            self.k8s.delete_all_jobs(namespace)
            #k8s.delete_namespace(namespace)

        # wait until all the pods are deleted before starting run
        cleaned_up = False
        while not cleaned_up:
            print("waiting for pod clean up...")
            all_pods = core_api.list_pod_for_all_namespaces()
            all_tenant_pods = [ pod for pod in all_pods.items if "tenant" in pod.metadata.name]
            if not all_tenant_pods:
                print("pods are cleaned up!")
                cleaned_up = True
            time.sleep(1)

    # Returns the total pending/running/completed/preempted cores across all namespaces.
    def get_totals(self, namespaces):
        total_running = 0
        total_pending = 0
        total_completed = 0
        total_preempted = 0
        for namespace in namespaces:
            if not self.exp_data[namespace+'_pending'] or not self.exp_data[namespace+'_running'] or not self.exp_data[namespace+'_completed'] or not self.exp_data[namespace+'_preempted']:
                return 0, 0, 0, 0
            total_pending += self.exp_data[namespace+'_pending'][-1] 
            total_running += self.exp_data[namespace+'_running'][-1]
            total_completed += self.exp_data[namespace+'_completed'][-1]
            total_preempted += self.exp_data[namespace+'_preempted'][-1]

        return total_pending, total_running, total_completed, total_preempted

    def monitor(self):
        """Poll the cluster about once per second and record per-namespace core counts.

        Every sample appends the elapsed seconds to ``self.exp_data['timestamp']`` and,
        for each namespace in ``self.namespaces``, the pending, running, completed and
        preempted core counts to ``self.exp_data['<namespace>_pending']`` etc.
        Polling stops once the summed completed cores reach the value returned by
        ``calculate_total_completed()``; there is no timeout.

        Pods and jobs are attributed to a namespace by substring match on their
        *name* (not on ``metadata.namespace``).

        Returns:
            ``self.exp_data`` on success. If an exception occurs it is printed and
            the method falls through, returning ``None``.
        """
        try:
            completed_target = self.calculate_total_completed()
            # initialize data dictionary
            self.exp_data['timestamp'] = [] # empty array for timestamps

            for namespace in self.namespaces:
                self.exp_data[namespace+'_pending'] = [] # empty array for each namespace's pending jobs
                self.exp_data[namespace+'_running'] = [] # empty array for each namespace's running jobs
                self.exp_data[namespace+'_completed'] = [] # empty array for each namespace's completed jobs
                self.exp_data[namespace+'_preempted'] = [] # empty array for each namespace's preempted jobs

            finished = False
            # wall-clock start of monitoring (epoch seconds); samples are stored relative to it
            timestamp = round(time.time())
            while not finished:
                #if timestamp == 60:
                    #finished = True
                # get pods from all namespaces
                all_pods = core_api.list_pod_for_all_namespaces()
                # get jobs from all namespaces
                all_jobs = batch_api.list_job_for_all_namespaces()

                # filter for pods with "tenant" in the name
                all_tenant_pods = [ pod for pod in all_pods.items if "tenant" in pod.metadata.name]
                all_tenant_jobs = [ job for job in all_jobs.items if "tenant" in job.metadata.name]

                # insert the timestamp (seconds elapsed since monitoring started)
                #exp_data['timestamp'].append(int(time.time())) # epoch time
                _time = round(time.time()) - timestamp
                self.exp_data['timestamp'].append(_time)

                # iterate through namespaces and collect info on pending/running/completed jobs
                for namespace in self.namespaces:
                    # get pods for the tenant of current namespace (substring match on the object name)
                    tenant_pods = [ pod for pod in all_tenant_pods if namespace in pod.metadata.name]
                    tenant_jobs = [ job for job in all_tenant_jobs if namespace in job.metadata.name]

                    running_cores = 0
                    pending_cores = 0
                    completed_cores = 0
                    preempted_cores = 0

                    # loop through pods; cores are read from the first container's cpu limit
                    for pod in tenant_pods:
                        cores = int(pod.spec.containers[0].resources.limits['cpu'])
                        if pod.status.phase == "Pending":
                            pending_cores = pending_cores + cores
                        elif pod.status.phase == "Running":
                            running_cores = running_cores + cores
                        elif pod.status.phase == "Succeeded":
                            completed_cores = completed_cores + cores

                    # check for pending jobs - jobs that are unable to submit a pod due to quota limits
                    for job in tenant_jobs:
                        has_pod = False
                        job_name = job.metadata.name
                        # a pod belongs to the job when the job name is a substring of the pod name
                        for pod in tenant_pods:
                            if job_name in pod.metadata.name:
                                has_pod = True
                        job_cores = int(job.spec.template.spec.containers[0].resources.limits['cpu'])
                        # a job with no pod and no status fields set at all counts as pending
                        if not has_pod and job.status.active is None and job.status.terminating is None and job.status.succeeded is None and job.status.completion_time is None and job.status.ready == 0 and job.status.conditions is None :
                            pending_cores = pending_cores + job_cores
                        # any job reporting failed pods is counted under "preempted"
                        if job.status.failed is not None:
                            preempted_cores += job_cores

                    self.exp_data[namespace+'_pending'].append(pending_cores)
                    self.exp_data[namespace+'_running'].append(running_cores)
                    self.exp_data[namespace+'_completed'].append(completed_cores)
                    self.exp_data[namespace+'_preempted'].append(preempted_cores)


                # check to see if there are still any running or pending jobs
                total_pending, total_running, total_completed, total_preempted = self.get_totals(self.namespaces)
                print("time", round(time.time()), "timestamp", _time, "pending", total_pending, "running", total_running, "completed", total_completed, "preempted", total_preempted)
                #if total_pending == 0 and total_running == 0 and total_completed > 0:
                # stop once all requested cores have been observed as completed
                if total_completed >= completed_target:
                    finished = True
                #timestamp += 1
                time.sleep(1)
            print("all done!")
            return self.exp_data
        except Exception as e:
            print(e)
            
    def graph(self):

        results_location = "results" + os.path.sep + self.experiment_name.split("-")[0] + os.path.sep + self.experiment_name + "-" + self.scheduler_name + "-run" + self.experiment_number
        exp_full_dataframe = pd.DataFrame(self.exp_data)
        exp_full_dataframe.to_csv(results_location + "-rawdata.csv", index=False)

        exp_data_copy = self.exp_data.copy()
        # for graph, remove completed pods
        for namespace in self.namespaces:
            self.exp_data.pop(namespace+'_completed')

        exp_dataframe = pd.DataFrame(self.exp_data)

        color = ["blue", "blue", "blue", "green", "green", "green", "red", "red", "red", "orange", "orange", "orange"]
        style = ["--",   "-",    ".-",   "--",    "-",     ".-",    "--",  "-",   ".-",  "--",     "-",      ".-"]

        plot = exp_dataframe.plot(x="timestamp", figsize=(20,10), color=color, style=style, fontsize="14", lw=2)
        plot.set_xlabel("Run time (seconds)", fontsize=16)
        plot.set_ylabel("Number of Cores", fontsize=16)
        #plot.legend(fontsize=14, bbox_to_anchor=(1.19, 0.6), loc="center right")
        plot.legend(fontsize=14)

        figure = plot.figure
        #figure.tight_layout()
        figure.savefig(results_location + "-graph-preemption.png")

        # for graph, remove completed pods
        for namespace in self.namespaces:
            self.exp_data.pop(namespace+'_preempted')

        exp_dataframe = pd.DataFrame(self.exp_data)

        color = ["blue", "blue", "green", "green", "red", "red", "orange", "orange"]
        style = ["--",   "-",    "--",    "-",     "--",  "-",   "--",     "-"]

        plot = exp_dataframe.plot(x="timestamp", figsize=(20,10), color=color, style=style, fontsize="14", lw=2)
        plot.set_xlabel("Run time (seconds)", fontsize=16)
        plot.set_ylabel("Number of Cores", fontsize=16)
        plot.legend(fontsize=14)
        #plot.legend(fontsize=14, bbox_to_anchor=(1.19, 0.6), loc="center right")

        figure = plot.figure
        figure.savefig(results_location + "-graph.png")
        
    def save_results(self):
        all_pods = core_api.list_pod_for_all_namespaces()
        all_jobs = batch_api.list_job_for_all_namespaces()
        # filter for pods with "tenant" in the name
        all_tenant_pods = [ pod for pod in all_pods.items if "tenant" in pod.metadata.name]
        all_tenant_jobs = [ job for job in all_jobs.items if "tenant" in job.metadata.name]   

        # set up output csv for dependent variables
        results_location = "results" + os.path.sep + self.experiment_name.split("-")[0] + os.path.sep + self.experiment_name + "-" + self.scheduler_name + "-run" + self.experiment_number
        output_csv = results_location + "-depvars.csv"
        output_fields = ["experiment", "run", "tenant", "total cores", "total queue time", "avg queue time", "total job run time", "total queue + job run time", "workflow run time (makespan)"]
        output_rows = []

        for namespace in self.namespaces:
            tenant_pods = [ pod for pod in all_tenant_pods if namespace in pod.metadata.name]
            tenant_jobs = [ job for job in all_tenant_jobs if namespace in job.metadata.name]

            total_cores = 0
            total_queue_time = 0
            total_run_time = 0
            total_time = 0

            # start/end used to calculate makespan
            start = None
            end = None
            makespan = 0
            for pod in tenant_pods:
                cores = int(pod.spec.containers[0].resources.limits['cpu'])
                total_cores += cores
                pod_schedule_time = pod.metadata.creation_timestamp
                if namespace == "backfill":
                    job_name = pod.metadata.name[:-5]
                else:
                    job_name = pod.metadata.name[:-6]
                for job in tenant_jobs:
                    if job.metadata.name == job_name:
                        schedule_time = job.metadata.creation_timestamp
                #pod_start_time = pod.status.start_time
                pod_start_time = pod.status.container_statuses[0].state.terminated.started_at
                pod_end_time = pod.status.container_statuses[0].state.terminated.finished_at
                pod_queue_time = pod_start_time - schedule_time
                pod_run_time = pod_end_time - pod_start_time
                pod_total_time = pod_end_time - schedule_time
                total_queue_time = total_queue_time + int(pod_queue_time.total_seconds())
                total_run_time = total_run_time + int(pod_run_time.total_seconds())
                total_time = total_time + int(pod_total_time.total_seconds())
                # calculate makespan
                if not start or schedule_time < start:
                    start = schedule_time
                if not end or pod_end_time > end:
                    end = pod_end_time
            average_queue_time = round(total_queue_time / len(tenant_pods))
            print(namespace, "Total queue time:", total_queue_time)
            print(namespace, "Average queue time:", average_queue_time)
            print(namespace, "Total run time for all jobs:", total_run_time)
            print(namespace, "Total queue + run time for all jobs:", total_time)
            makespan = round((end - start).total_seconds())
            print(namespace, "Total workflow run time (makespan):", makespan)
            output_rows.append([self.experiment_name, self.experiment_number, namespace, total_cores, total_queue_time, average_queue_time, total_run_time, total_time, makespan])

        with open(output_csv, 'w') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow(output_fields)
            csvwriter.writerows(output_rows)
        
        

In [None]:
# Module-level Kubernetes helper instance, available to later cells as `k8s`.
k8s = Kubernetes()

In [None]:
def run_experiment(experiment_name, namespaces, params, schedulers, runs=5):
    """Run an experiment ``runs`` times for every scheduler and save its results.

    Each run cleans up leftover jobs, starts the tenant workflows, monitors the
    cluster until the requested cores have completed, then writes the graphs and
    the dependent-variable CSV.

    Args:
        experiment_name: Experiment id used for result file names.
        namespaces: Namespaces tracked by the experiment.
        params: One parameter list per tenant workflow.
        schedulers: Scheduler names; the experiment is repeated for each of them.
        runs: Number of repetitions per scheduler, numbered from 1 (default 5).
    """
    for scheduler in schedulers:
        for experiment_number in range(1, runs + 1):
            exp = Experiment(experiment_name, str(experiment_number), namespaces, scheduler, params)
            completed_target = exp.calculate_total_completed()
            logging.info(
                "Starting experiment %s run %s with %s total cores requested using %s",
                experiment_name, experiment_number, completed_target, scheduler,
            )
            exp.cleanup_all_namespaces()
            exp.start()
            exp.monitor()
            exp.graph()
            exp.save_results()

## Section 3.3.1

In [None]:
# Apply the quota manifests for tenant1-tenant3 before running the experiment cell below.
!kubectl apply -f quotas/tenant1-quota.yaml
!kubectl apply -f quotas/tenant2-quota.yaml
!kubectl apply -f quotas/tenant3-quota.yaml

In [None]:
experiment_name = "E2"
namespaces = ["tenant1", "tenant2", "tenant3"]
# Scheduler(s) the experiment is repeated for; alternatives are kept commented out.
schedulers = ["default-scheduler"]
#schedulers = ["scheduler-plugins-scheduler"]
#schedulers = ["yunikorn"]


# Each burst row is [start_time, cpu_limit, mem_limit, num_jobs, sleep_time]:
# start_time is compared against the workflow's nominal clock, and sleep_time is the
# argument given to `sleep` in the container.
# cms
bursts_tenant1 = [
    [0,  "4", "8G", 96, "180"],
    [90, "4", "8G", 96, "180"]
]

# ag
bursts_tenant2 = [
    [30,  "4", "8G", 32, "120"],
    [120, "4", "8G", 32, "120"]
] 

# ds
bursts_tenant3 = [
    [150, "2", "4G", 96, "60"]

]

# Each params row is
# [namespace, cpu_limit, mem_limit, num_jobs, sleep_time, bursts, interval, experiment].
# num_jobs is the number of submission steps and interval the seconds slept after each step.
# use "0" for cpu_limit to not submit jobs, only bursts
params = [
    ["tenant1", "0", "4G", 100, "60", bursts_tenant1, 1, experiment_name],
    ["tenant2", "0", "4G", 130, "60", bursts_tenant2, 1, experiment_name],
    ["tenant3", "2", "4G", 25, "60", bursts_tenant3, 5, experiment_name],
]

run_experiment(experiment_name, namespaces, params, schedulers)

In [None]:
# Remove the quota manifests for tenant1-tenant3 again.
!kubectl delete -f quotas/tenant1-quota.yaml
!kubectl delete -f quotas/tenant2-quota.yaml
!kubectl delete -f quotas/tenant3-quota.yaml

## Section 3.3.2

In [None]:
experiment_name = "E4"
namespaces = ["tenant1", "tenant2", "tenant3"]
# Scheduler(s) the experiment is repeated for; alternatives are kept commented out.
schedulers = ["default-scheduler"]
#schedulers = ["scheduler-plugins-scheduler"]
#schedulers = ["yunikorn"]

# Each burst row is [start_time, cpu_limit, mem_limit, num_jobs, sleep_time]:
# start_time is compared against the workflow's nominal clock, and sleep_time is the
# argument given to `sleep` in the container.
# cms
bursts_tenant1 = [
    [0,  "4", "8G", 96, "180"],
    [90, "4", "8G", 96, "180"]
]

# ag
bursts_tenant2 = [
    [30,  "4", "8G", 32, "120"],
    [120, "4", "8G", 32, "120"]
] 

# ds
bursts_tenant3 = [
    [150, "2", "4G", 96, "60"]

]

# Each params row is
# [namespace, cpu_limit, mem_limit, num_jobs, sleep_time, bursts, interval, experiment].
# num_jobs is the number of submission steps and interval the seconds slept after each step.
# use "0" for cpu_limit to not submit jobs, only bursts
params = [
    ["tenant1", "0", "4G", 100, "60", bursts_tenant1, 1, experiment_name],
    ["tenant2", "0", "4G", 130, "60", bursts_tenant2, 1, experiment_name],
    ["tenant3", "2", "4G", 25, "60", bursts_tenant3, 5, experiment_name],
]

run_experiment(experiment_name, namespaces, params, schedulers)
