From 0666202c223c8ecc1318ee03bee4fe719c9bc4b1 Mon Sep 17 00:00:00 2001 From: Zac Schulwolf Date: Sat, 12 Dec 2020 21:54:06 +0000 Subject: [PATCH] Fix formatting and Add support for colored msg_prefix --- src/popper/log.py | 11 +- src/popper/runner.py | 4 +- src/popper/runner_host.py | 33 ++- src/popper/runner_kubernetes.py | 495 -------------------------------- src/popper/runner_slurm.py | 8 +- 5 files changed, 36 insertions(+), 515 deletions(-) mode change 100644 => 100755 src/popper/log.py mode change 100644 => 100755 src/popper/runner.py mode change 100644 => 100755 src/popper/runner_host.py delete mode 100644 src/popper/runner_kubernetes.py mode change 100644 => 100755 src/popper/runner_slurm.py diff --git a/src/popper/log.py b/src/popper/log.py old mode 100644 new mode 100755 index e248e8d5c..4db5b5e50 --- a/src/popper/log.py +++ b/src/popper/log.py @@ -55,6 +55,7 @@ class PopperFormatter(logging.Formatter): def __init__(self, colors=True): super(PopperFormatter, self).__init__(fmt="%(levelname)s: %(msg)s") self.log_fmt = self.log_format if colors else self.log_format_no_colors + self.colors = colors def format(self, record): """ @@ -72,7 +73,11 @@ def format(self, record): self._fmt = fmt else: self._style._fmt = fmt - result = f"{msg_prefix}{logging.Formatter.format(self, record)}" + + if self.colors: + result = f"{self.BOLD_CYAN}{msg_prefix}{self.RESET}{logging.Formatter.format(self, record)}" + else: + result = f"{msg_prefix}{logging.Formatter.format(self, record)}" return result @@ -183,8 +188,8 @@ def filter(self, record): bool : True/False according to values of pass levels and level number of the record. """ - if not hasattr(record, 'pretag'): - record.pretag = "" + if not hasattr(record, "pretag"): + record.pretag = "" if self.reject: return record.levelno not in self.passlevels else: diff --git a/src/popper/runner.py b/src/popper/runner.py old mode 100644 new mode 100755 index 87eb58864..31a26a67d --- a/src/popper/runner.py +++ b/src/popper/runner.py @@ -126,12 +126,12 @@ def _clone_repos(self, wf): continue if not infoed: - log.info("Cloning step repositories", extra={'pretag':"[popper]"}) + log.info("Cloning step repositories", extra={"pretag": "[popper]"}) infoed = True if f"{user}/{repo}" in cloned: continue - log.info(f"- {url}/{user}/{repo}@{version}", extra={'pretag':"[popper]"}) + log.info(f"- {url}/{user}/{repo}@{version}", extra={"pretag": "[popper]"}) scm.clone(url, user, repo, repo_dir, version) cloned.add(f"{user}/{repo}") diff --git a/src/popper/runner_host.py b/src/popper/runner_host.py old mode 100644 new mode 100755 index 612ac1eef..27d477189 --- a/src/popper/runner_host.py +++ b/src/popper/runner_host.py @@ -40,7 +40,7 @@ def run(self, step): raise AttributeError("Expecting 'runs' attribute in step.") cmd = step.runs + tuple(step.args) - log.info(f"{cmd}", extra={'pretag':f"[{step.id}]"}) + log.info(f"{cmd}", extra={"pretag": f"[{step.id}]"}) if self._config.dry_run: return 0 @@ -146,7 +146,7 @@ def run(self, step): if not container and not self._config.reuse: container = self._create_container(cid, step) - log.info("docker start", extra={'pretag':f"[{step.id}]"}) + log.info("docker start", extra={"pretag": f"[{step.id}]"}) if self._config.dry_run: return 0 @@ -177,7 +177,10 @@ def _create_container(self, cid, step): build, _, img, tag, build_ctx_path = self._get_build_info(step) if build: - log.info(f"docker build {img}:{tag} {build_ctx_path}", extra={'pretag':f"[{step.id}]"}) + log.info( + f"docker build {img}:{tag} {build_ctx_path}", + extra={"pretag": 
f"[{step.id}]"}, + ) if not self._config.dry_run: streamer = self._d.api.build( decode=True, path=build_ctx_path, tag=f"{img}:{tag}", rm=True, @@ -191,7 +194,7 @@ def _create_container(self, cid, step): log.step_info(line.strip()) elif not self._config.skip_pull and not step.skip_pull: - log.info(f"docker pull {img}:{tag}", extra={'pretag':f"[{step.id}]"}) + log.info(f"docker pull {img}:{tag}", extra={"pretag": f"[{step.id}]"}) if not self._config.dry_run: self._d.images.pull(repository=f"{img}:{tag}") @@ -215,7 +218,7 @@ def _create_container(self, cid, step): msg += f' entrypoint={container_args["entrypoint"]}' if container_args["command"]: msg += f' command={container_args["command"]}' - log.info(msg, extra={'pretag':f"[{step.id}]"}) + log.info(msg, extra={"pretag": f"[{step.id}]"}) container = self._d.containers.create(**container_args) @@ -272,7 +275,7 @@ def run(self, step): if not container and not self._config.reuse: container = self._create_container(cid, step) - log.info("podman start", extra={'pretag':f"[{step.id}]"}) + log.info("podman start", extra={"pretag": f"[{step.id}]"}) if self._config.dry_run: return 0 @@ -309,7 +312,10 @@ def _create_container(self, cid, step): build, _, img, tag, build_ctx_path = self._get_build_info(step) if build: - log.info(f"podman build {img}:{tag} {build_ctx_path}", extra={'pretag':f"[{step.id}]"}) + log.info( + f"podman build {img}:{tag} {build_ctx_path}", + extra={"pretag": f"[{step.id}]"}, + ) if not self._config.dry_run: cmd = [ "podman", @@ -322,7 +328,7 @@ def _create_container(self, cid, step): ] HostRunner._exec_cmd(cmd) elif not self._config.skip_pull and not step.skip_pull: - log.info(f"podman pull {img}:{tag}", extra={'pretag':f"[{step.id}]"}) + log.info(f"podman pull {img}:{tag}", extra={"pretag": f"[{step.id}]"}) if not self._config.dry_run: cmd = ["podman", "pull", f"{img}:{tag}"] HostRunner._exec_cmd(cmd, logging=False) @@ -517,11 +523,16 @@ def _create_container(self, step, cid): build_ctx_path = None if build: - log.info(f"singularity build {cid} {build_ctx_path}", extra={'pretag':f"[{step.id}]"}) + log.info( + f"singularity build {cid} {build_ctx_path}", + extra={"pretag": f"[{step.id}]"}, + ) if not self._config.dry_run: self._build_from_recipe(build_ctx_path, self._singularity_cache, cid) elif not self._config.skip_pull and not step.skip_pull: - log.info(f"singularity pull {cid} {image}", extra={'pretag':f"[{step.id}]"}) + log.info( + f"singularity pull {cid} {image}", extra={"pretag": f"[{step.id}]"} + ) if not self._config.dry_run: self._s.pull(image=image, name=cid, pull_folder=self._singularity_cache) @@ -545,7 +556,7 @@ def _singularity_start(self, step, cid): commands = args start_fn = self._s.run - log.info(info, extra={'pretag':f"[{step.id}]"}) + log.info(info, extra={"pretag": f"[{step.id}]"}) if self._config.dry_run: return 0 diff --git a/src/popper/runner_kubernetes.py b/src/popper/runner_kubernetes.py deleted file mode 100644 index d6054bef4..000000000 --- a/src/popper/runner_kubernetes.py +++ /dev/null @@ -1,495 +0,0 @@ -import os -import base64 -import time -import tarfile - -from kubernetes import config, client -from kubernetes.client import Configuration, V1DeleteOptions -from kubernetes.client.rest import ApiException -from kubernetes.client.api import core_v1_api -from kubernetes.stream import stream - -from popper import utils as pu -from popper.cli import log as log -from popper.runner import StepRunner as StepRunner -from popper.runner_host import DockerRunner as HostDockerRunner - - -class 
KubernetesRunner(StepRunner): - """Runs steps on a kubernetes cluster.""" - - def __init__(self, **kw): - super(KubernetesRunner, self).__init__(**kw) - - config.load_kube_config() - - c = Configuration() - c.assert_hostname = False - Configuration.set_default(c) - self._kclient = core_v1_api.CoreV1Api() - - config.list_kube_config_contexts() - - self._namespace = self._config.resman_opts.get("namespace", "default") - - self._base_pod_name = pu.sanitized_name(f"pod", self._config.wid) - self._base_pod_name = self._base_pod_name.replace("_", "-") - - self._init_pod_name = pu.sanitized_name("init-pod", self._config.wid) - self._init_pod_name = self._init_pod_name.replace("_", "-") - - self._vol_claim_name = f"{self._base_pod_name}-pvc" - self._vol_size = self._config.resman_opts.get("volume_size", "500Mi") - - self._init_pod_created = False - self._vol_claim_created = False - - def __exit__(self, exc_type, exc_value, exc_traceback): - self._kclient.api_client.rest_client.pool_manager.clear() - self._kclient.api_client.close() - super(KubernetesRunner, self).__exit__(exc_type, exc_value, exc_traceback) - return True - - def run(self, step): - """Execute a step in a kubernetes cluster.""" - self._pod_name = self._base_pod_name + f"-{step.id}" - - needs_build, _, img, tag, _ = self._get_build_info(step) - - if needs_build: - log.fail(f"Cannot build ") - - image = f"{img}:{tag}" - - m = f"[{step.id}] kubernetes run {self._namespace}.{self._pod_name}" - log.info(m) - - if self._config.dry_run: - return 0 - - ecode = 1 - try: - if not self._vol_claim_created: - if not self._vol_claim_exists(): - self._vol_claim_create() - self._vol_claim_created = True - - if not self._init_pod_created: - e, self._pod_host_node = self._init_pod_schedule() - if e: - raise Exception("None of the nodes are schedulable.") - self._copy_ctx() - self._init_pod_delete() - self._init_pod_created = True - - self._pod_create(step, image, self._pod_host_node) - self._pod_read_log() - ecode = self._pod_exit_code() - except Exception as e: - log.fail(e) - finally: - self._pod_delete() - - log.debug(f"returning with {ecode}") - return ecode - - def stop_running_tasks(self): - """Delete the Pod and then the PersistentVolumeClaim upon receiving SIGINT. - """ - log.debug("received SIGINT. deleting pod and volume claim") - self._pod_delete() - - def _init_pod_schedule(self): - """If a node selector is not provided, select a node randomly - and stick to it.""" - e = 0 - pod_host_node = None - - if self._config.resman_opts.get("pod_host_node", None): - e = self._init_pod_create(self._config.resman_opts.pod_host_node) - pod_host_node = self._config.resman_opts.pod_host_node - - elif not self._config.resman_opts.get("persistent_volume_name", None): - nodes = [ - node.metadata.labels["kubernetes.io/hostname"] - for node in self._kclient.list_node().items - ] - for node in nodes: - log.debug(f"trying to schedule init pod on {node}") - e = self._init_pod_create(node) - if not e: - pod_host_node = node - break - else: - self._init_pod_delete() - else: - e = self._init_pod_create() - pod_host_node = None - - return e, pod_host_node - - def _copy_ctx(self): - """Tar up the workspace context and copy the tar file into - the PersistentVolume in the Pod. 
- """ - source_dir = os.path.join(self._config.cache_dir, "kubernetes") - if not os.path.exists(source_dir): - os.makedirs(source_dir) - - target_file = "ctx_" + str(self._config.wid) + ".tar.gz" - source_file = os.path.join(source_dir, target_file) - destination_file = "/workspace/" + target_file - - files = os.listdir(self._config.workspace_dir) - - with tarfile.open(source_file, mode="w:gz") as archive: - for f in files: - archive.add(f) - - exec_command = ["/bin/sh"] - response = stream( - self._kclient.connect_get_namespaced_pod_exec, - self._init_pod_name, - self._namespace, - command=exec_command, - stderr=True, - stdin=True, - stdout=True, - tty=False, - _preload_content=False, - ) - - with open(source_file, "rb") as image_file: - encoded_string = base64.b64encode(image_file.read()) - - commands = [ - f"echo {encoded_string.decode('utf-8')} | base64 --decode > {destination_file}" - ] - - while response.is_open(): - response.update(timeout=1) - if response.peek_stdout(): - log.debug(f"stdout: {response.read_stdout()}") - if response.peek_stderr(): - log.debug(f"stderr: {response.read_stderr()}") - if commands: - c = commands.pop(0) - log.debug(f"running command... {c}") - response.write_stdin(c) - else: - break - response.close() - - # extract the archive inside the pod - exec_command = ["tar", "-zxvf", destination_file] - response = stream( - self._kclient.connect_get_namespaced_pod_exec, - self._init_pod_name, - self._namespace, - command=exec_command, - stderr=True, - stdin=False, - stdout=True, - tty=False, - ) - - log.debug(response) - - def _init_pod_create(self, pod_host_node=None): - """Create a init Pod mounted on a volume with alpine image so that - the `tar` utility is available by default and the workflow context - can be copied from the local machine into the volume. - """ - ws_vol_mount = f"{self._init_pod_name}-ws" - init_pod_conf = { - "apiVersion": "v1", - "kind": "Pod", - "metadata": {"name": self._init_pod_name}, - "spec": { - "restartPolicy": "Never", - "containers": [ - { - "image": "debian:stable", - "name": self._init_pod_name, - "workingDir": "/workspace", - "command": ["sleep", "infinity"], - "volumeMounts": [ - {"name": ws_vol_mount, "mountPath": "/workspace",} - ], - } - ], - "volumes": [ - { - "name": ws_vol_mount, - "persistentVolumeClaim": {"claimName": self._vol_claim_name,}, - } - ], - }, - } - - if pod_host_node: - init_pod_conf["spec"]["nodeSelector"] = { - "kubernetes.io/hostname": pod_host_node - } - - self._kclient.create_namespaced_pod( - body=init_pod_conf, namespace=self._namespace - ) - - # loop and wait for the init pod to come up - counter = 1 - while True: - response = self._kclient.read_namespaced_pod( - self._init_pod_name, namespace=self._namespace - ) - if response.status.phase != "Pending": - break - - log.debug(f"init pod {self._init_pod_name} not started yet") - - if counter == self._config.resman_opts.get("pod_retry_limit", 60): - return 1 - - time.sleep(1) - counter += 1 - - return 0 - - def _init_pod_delete(self): - """Teardown the init Pod after the context has been copied - into the volume. - """ - log.debug(f"deleting init pod {self._init_pod_name}") - self._kclient.delete_namespaced_pod( - self._init_pod_name, namespace=self._namespace, body=V1DeleteOptions() - ) - - def _vol_create(self, volume_name): - """Create a default PersistentVolume of hostPath type. 
- """ - hostpathvol_path = "/tmp" - hostpathvol_size = "1Gi" - if self._config.resman_opts.get("hostpathvol_path", None): - hostpathvol_path = self._config.resman_opts.hostpathvol_path - if self._config.resman_opts.get("hostpathvol_size", None): - hostpathvol_size = self._config.resman_opts.hostpathvol_size - vol_conf = { - "kind": "PersistentVolume", - "apiVersion": "v1", - "metadata": {"name": volume_name, "labels": {"type": "host"}}, - "spec": { - "persistentVolumeReclaimPolicy": "Recycle", - "storageClassName": "manual", - "capacity": {"storage": hostpathvol_size,}, - "accessModes": ["ReadWriteMany"], - "hostPath": {"path": hostpathvol_path}, - }, - } - - self._kclient.create_persistent_volume(body=vol_conf) - - counter = 1 - while True: - response = self._kclient.read_persistent_volume(volume_name) - if response.status.phase != "Pending": - break - - log.debug(f"volume {volume_name} not created yet") - - if counter == 60: - raise Exception("Timed out waiting for PersistentVolume creation") - - time.sleep(1) - counter += 1 - - def _vol_claim_create(self): - """Create a PersistentVolumeClaim to claim usable storage space - from a previously created PersistentVolume. - """ - if self._config.resman_opts.get("persistent_volume_name", None): - volume_name = self._config.resman_opts.persistent_volume_name - else: - volume_name = f"pv-hostpath-popper-{self._config.wid}" - if not self._vol_exists(volume_name): - self._vol_create(volume_name) - - vol_claim_conf = { - "apiVersion": "v1", - "kind": "PersistentVolumeClaim", - "metadata": {"name": self._vol_claim_name}, - "spec": { - "storageClassName": "manual", - "accessModes": ["ReadWriteMany"], - "resources": {"requests": {"storage": self._vol_size}}, - "volumeName": volume_name, - }, - } - - self._kclient.create_namespaced_persistent_volume_claim( - namespace=self._namespace, body=vol_claim_conf - ) - - # wait for the volume claim to go into `Bound` state. - counter = 1 - while True: - response = self._kclient.read_namespaced_persistent_volume_claim( - self._vol_claim_name, namespace=self._namespace - ) - if response.status.phase != "Pending": - break - - log.debug(f"volume claim {self._vol_claim_name} not created yet") - - if counter == 60: - raise Exception("Timed out waiting for PersistentVolumeClaim creation") - - time.sleep(1) - counter += 1 - - def _vol_exists(self, volume_name): - vol_exists = False - try: - self._kclient.read_persistent_volume(volume_name) - vol_exists = True - except ApiException as e: - if e.reason != "Not Found": - raise Exception(e) - - return vol_exists - - def _vol_claim_exists(self): - vol_claim_exists = False - try: - self._kclient.read_namespaced_persistent_volume_claim( - self._vol_claim_name, namespace=self._namespace - ) - vol_claim_exists = True - except ApiException as e: - if e.reason != "Not Found": - raise Exception(e) - - return vol_claim_exists - - def _vol_claim_delete(self): - """Delete the PersistentVolumeClaim. - """ - log.debug(f"deleting volume claim {self._vol_claim_name}") - self._kclient.delete_namespaced_persistent_volume_claim( - self._vol_claim_name, namespace=self._namespace, body=V1DeleteOptions() - ) - - def _pod_create(self, step, image, pod_host_node=None): - """Start a Pod for each step. 
- """ - log.debug(f"trying to start step pod on {pod_host_node}") - env = self._prepare_environment(step) - log.debug(env) - - ws_vol_mount = f"{self._pod_name}-ws" - pod_conf = { - "apiVersion": "v1", - "kind": "Pod", - "metadata": {"name": self._pod_name}, - "spec": { - "restartPolicy": "Never", - "containers": [ - { - "image": image, - "name": f"{step.id}", - "workingDir": "/workspace", - "volumeMounts": [ - {"name": ws_vol_mount, "mountPath": "/workspace",} - ], - } - ], - "volumes": [ - { - "name": ws_vol_mount, - "persistentVolumeClaim": {"claimName": self._vol_claim_name,}, - } - ], - }, - } - - if len(env.keys()) > 0: - pod_conf["spec"]["containers"][0]["env"] = [] - for name, value in env.items(): - pod_conf["spec"]["containers"][0]["env"].append( - {"name": name, "value": value} - ) - - if pod_host_node: - pod_conf["spec"]["nodeSelector"] = {"kubernetes.io/hostname": pod_host_node} - - runs = list(step.runs) if step.runs else None - args = list(step.args) if step.args else None - - if runs: - pod_conf["spec"]["containers"][0]["command"] = runs - - if args: - pod_conf["spec"]["containers"][0]["args"] = args - - self._kclient.create_namespaced_pod(body=pod_conf, namespace=self._namespace) - - counter = 1 - while True: - response = self._kclient.read_namespaced_pod( - self._pod_name, namespace=self._namespace - ) - if response.status.phase != "Pending": - break - - log.debug(f"pod {self._pod_name} not started yet") - - if counter == self._config.resman_opts.get("pod_retry_limit", 60): - raise Exception("Timed out waiting for Pod to start") - - time.sleep(1) - counter += 1 - - def _pod_read_log(self): - """Read logs from the Pod after it moves into `Running` state. - """ - log.debug(f"reading logs from {self._pod_name}") - response = self._kclient.read_namespaced_pod_log( - name=self._pod_name, - namespace=self._namespace, - follow=True, - tail_lines=10, - _preload_content=False, - ) - for line in response: - log.step_info(line.decode().rstrip()) - - def _pod_exit_code(self): - """Read the exit code from the Pod to decide the exit code of the step. - """ - time.sleep(2) - response = self._kclient.read_namespaced_pod( - name=self._pod_name, namespace=self._namespace - ) - log.debug(f"got status {response.status.phase}") - if response.status.phase != "Succeeded": - return 1 - return 0 - - def _pod_delete(self): - """Delete the Pod after it has Completed or Failed. - """ - log.debug(f"deleting pod {self._pod_name}") - self._kclient.delete_namespaced_pod( - self._pod_name, namespace=self._namespace, body=V1DeleteOptions() - ) - - -class DockerRunner(KubernetesRunner, HostDockerRunner): - """Runs steps on kubernetes; builds images locally using docker. 
-    """
-
-    def __init__(self, **kw):
-        super(DockerRunner, self).__init__(**kw)
-
-    def __exit__(self, exc_type, exc_value, exc_traceback):
-        super(DockerRunner, self).__exit__(exc_type, exc_value, exc_traceback)
-        return True
diff --git a/src/popper/runner_slurm.py b/src/popper/runner_slurm.py
old mode 100644
new mode 100755
index 2b0f6146c..60eefae1d
--- a/src/popper/runner_slurm.py
+++ b/src/popper/runner_slurm.py
@@ -167,24 +167,24 @@ def run(self, step):
 
         if build:
             recipefile = self._get_recipe_file(build_ctx_path, cid)
-            log.info(f"srun singularity build {self._container}", extra={'pretag':f"[{step.id}]"})
+            log.info(f"srun singularity build {self._container}", extra={"pretag": f"[{step.id}]"})
             self._exec_srun(
                 ["singularity", "build", "--fakeroot", self._container, recipefile,],
                 step,
                 cwd=os.path.dirname(recipefile),
             )
         else:
-            log.info(f"srun singularity pull {self._container}", extra={'pretag':f"[{step.id}]"})
+            log.info(f"srun singularity pull {self._container}", extra={"pretag": f"[{step.id}]"})
             self._exec_srun(["singularity", "pull", self._container, img], step)
 
         cmd = self._create_cmd(step, cid)
         self._spawned_containers.add(cid)
 
         if self._config.resman_opts.get(step.id, {}).get("mpi", True):
-            log.info(f"sbatch {" ".join(cmd)}", extra={'pretag':f"[{step.id}]"})
+            log.info(f"sbatch {' '.join(cmd)}", extra={"pretag": f"[{step.id}]"})
             ecode = self._exec_mpi(cmd, step)
         else:
-            log.info(f"srun {" ".join(cmd)}", extra={'pretag':f"[{step.id}]"})
+            log.info(f"srun {' '.join(cmd)}", extra={"pretag": f"[{step.id}]"})
             ecode = self._exec_srun(cmd, step, logging=True)
 
         self._spawned_containers.remove(cid)
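
Usage note (not part of the diff): below is a minimal sketch of how the colored msg_prefix added above is expected to surface. It assumes a popper checkout with this patch applied, that PopperFormatter derives msg_prefix from the record's "pretag" attribute (as the filter hunk suggests), and that BOLD_CYAN/RESET are existing class attributes; the logger name and step id are invented for illustration only.

    import logging

    from popper.log import PopperFormatter  # assumes the patched popper is importable

    # Stand-alone demo logger; popper's CLI normally wires this up inside popper.log.
    logger = logging.getLogger("popper-pretag-demo")
    handler = logging.StreamHandler()

    # colors=True should render the "[step-id]" prefix via BOLD_CYAN/RESET,
    # colors=False keeps the plain prefix, per the new branch in format().
    handler.setFormatter(PopperFormatter(colors=True))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    # Mirrors how the runners pass the prefix, e.g. in runner_host.py:
    #   log.info("docker start", extra={"pretag": f"[{step.id}]"})
    logger.info("docker start", extra={"pretag": "[my-step]"})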