Skip to content

Commit

Permalink
trim down ewms_sidecar.py
Browse files: browse the repository at this point in the history
  • Loading branch information
ric-evans committed Dec 19, 2023
1 parent 44335c2 commit 85512e9
Showing 1 changed file with 26 additions and 92 deletions.
118 changes: 26 additions & 92 deletions ewms_sidecar/ewms_sidecar.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""The central module."""
"""The EWMS Sidecar."""


import argparse
Expand All @@ -7,14 +7,14 @@

from wipac_dev_tools import argparse_tools, logging_tools

from . import condor, k8s
from . import condor
from .config import ENV, LOGGER


def main() -> None:
"""Main."""
parser = argparse.ArgumentParser(
description="Manage Skymap Scanner client workers",
description="Handle EWMS requests beside a Skymap Scanner central server",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

Expand All @@ -24,32 +24,8 @@ def main() -> None:
help="the uuid for the cluster",
)

# orchestrator
orch_subparsers = parser.add_subparsers(
required=True,
dest="orchestrator",
help="the resource orchestration tool to use for worker scheduling",
)
OrchestratorArgs.condor(
orch_condor_parser := orch_subparsers.add_parser(
"condor", help="orchestrate with HTCondor"
)
)
OrchestratorArgs.k8s(
orch_k8s_parser := orch_subparsers.add_parser(
"k8s", help="orchestrate with Kubernetes"
)
)

# action -- add sub-parser to each sub-parser (can't add multiple sub-parsers)
for p in [orch_condor_parser, orch_k8s_parser]:
act_subparsers = p.add_subparsers(
required=True,
dest="action",
help="the action to perform on the worker cluster",
)
ActionArgs.starter(act_subparsers.add_parser("start", help="start workers"))
ActionArgs.stopper(act_subparsers.add_parser("stop", help="stop workers"))
SidecarArgs.condor(parser)
SidecarArgs.starter(parser)

# parse args & set up logging
args = parser.parse_args()
Expand All @@ -63,60 +39,27 @@ def main() -> None:
logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING")

# Go!
match args.orchestrator:
case "condor":
condor.act(args)
case "k8s":
k8s.act(args)
case other:
raise RuntimeError(f"Orchestrator not supported: {other}")
condor.act(args)


class OrchestratorArgs:
class SidecarArgs:
@staticmethod
def condor(sub_parser: argparse.ArgumentParser) -> None:
"""Add args to subparser."""
sub_parser.add_argument(
def condor(parser: argparse.ArgumentParser) -> None:
"""Add args to parser."""
parser.add_argument(
"--collector",
default="",
help="the full URL address of the HTCondor collector server. Ex: foo-bar.icecube.wisc.edu",
)
sub_parser.add_argument(
parser.add_argument(
"--schedd",
default="",
help="the full DNS name of the HTCondor Schedd server. Ex: baz.icecube.wisc.edu",
)

@staticmethod
def k8s(sub_parser: argparse.ArgumentParser) -> None:
"""Add args to subparser."""
sub_parser.add_argument(
"--host",
required=True,
help="the host server address to connect to for running workers",
)
sub_parser.add_argument(
"--namespace",
required=True,
help="the k8s namespace to use for running workers",
)
sub_parser.add_argument(
"--cpu-arch",
default="x64",
help="which CPU architecture to use for running workers",
)
sub_parser.add_argument(
"--job-config-stub",
type=Path,
default=Path("resources/worker_k8s_job_stub.json"),
help="worker k8s job config file to dynamically complete, then run (json)",
)


class ActionArgs:
@staticmethod
def starter(sub_parser: argparse.ArgumentParser) -> None:
"""Add args to subparser."""
def starter(parser: argparse.ArgumentParser) -> None:
"""Add args to parser."""

def wait_for_file(waitee: Path, wait_time: int) -> Path:
"""Wait for `waitee` to exist, then return fully-resolved path."""
Expand All @@ -133,58 +76,58 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path:
return waitee.resolve()

# helper args
sub_parser.add_argument(
parser.add_argument(
"--dryrun",
default=False,
action="store_true",
help="does everything except submitting the worker(s)",
)
sub_parser.add_argument(
parser.add_argument(
"--spool",
default=False,
action="store_true",
help="whether to spool (persist) logs -- if not given, logs are not kept",
)

# worker args
sub_parser.add_argument(
parser.add_argument(
"--worker-memory-bytes",
required=True,
type=int,
help="amount of worker memory (bytes)",
)
sub_parser.add_argument(
parser.add_argument(
"--worker-disk-bytes",
required=True,
type=int,
help="amount of worker disk (bytes)",
)
sub_parser.add_argument(
parser.add_argument(
"--n-cores",
default=1,
type=int,
help="number of cores per worker",
)
sub_parser.add_argument(
parser.add_argument(
"--n-workers",
required=True,
type=int,
help="number of worker to start",
)
sub_parser.add_argument(
parser.add_argument(
"--max-worker-runtime",
required=True,
type=int,
help="how long each worker is allowed to run -- condor only", # TODO - set for k8s?
help="how long each worker is allowed to run",
)
sub_parser.add_argument(
parser.add_argument(
"--priority",
required=True,
help="relative priority of this job/jobs -- condor only", # TODO - set for k8s?
help="relative priority of this job/jobs",
)

# client args
sub_parser.add_argument(
parser.add_argument(
"--client-args",
required=False,
nargs="*",
Expand All @@ -195,25 +138,16 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path:
),
help="n 'key:value' pairs containing the python CL arguments to pass to skymap_scanner.client",
)
sub_parser.add_argument(
parser.add_argument(
"--client-startup-json",
help="The 'startup.json' file to startup each client",
type=lambda x: wait_for_file(
Path(x),
ENV.CLIENT_STARTER_WAIT_FOR_STARTUP_JSON,
),
)
sub_parser.add_argument(
parser.add_argument(
"--image",
required=True,
help="a path or url to the workers' image",
)

@staticmethod
def stopper(sub_parser: argparse.ArgumentParser) -> None:
"""Add args to subparser."""
sub_parser.add_argument(
"--cluster-id",
required=True,
help="the cluster id of the workers to be stopped/removed",
)

0 comments on commit 85512e9

Please sign in to comment.