Feat/non wave scheduler (#633)
* feat: code complete on task_topological_generations_without_scheduler. Getting 200 tasks completed per minute with 50 workers on a MacBook Pro (roughly 3.3 tasks per second).
* fix: spoke execution mode now uses the reporting role
eamonnfaherty committed Feb 11, 2023
1 parent a283eb0 commit bd2330e
Showing 28 changed files with 1,011 additions and 956 deletions.
3 changes: 3 additions & 0 deletions docs/tips.md
@@ -0,0 +1,3 @@
## Export all env vars from a CodeBuild build ID into your local shell
aws codebuild batch-get-builds --ids servicecatalog-puppet-deploy-in-spoke:0568a289-ebb6-4189-9bc0-5d6a4ff4879d | jq -r '.builds[0].environment.environmentVariables[]| "export \(.name)=\"\(.value)\""'

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -3,7 +3,7 @@

[tool.poetry]
name = "aws-service-catalog-puppet"
version = "0.217.0"
version = "0.218.0"
description = "Making it easier to deploy ServiceCatalog products"
classifiers = ["Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", "Programming Language :: Python :: 3", "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", "Natural Language :: English"]
homepage = "https://service-catalog-tools-workshop.com/"
4 changes: 4 additions & 0 deletions servicecatalog_puppet/cli.py
@@ -556,6 +556,10 @@ def setup_config(
] = remote_config.get_scheduler_threads_or_processes(
puppet_account_id_to_use, home_region
)
if not os.environ.get(environmental_variables.SCHEDULER_ALGORITHM):
os.environ[
environmental_variables.SCHEDULER_ALGORITHM
] = remote_config.get_scheduler_algorithm(puppet_account_id_to_use, home_region)


@cli.command()
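For context, the setup above follows a resolve-once pattern: an explicit SCT_SCHEDULER_ALGORITHM environment variable wins, otherwise the value is fetched from the hub's remote config and pinned in the environment so spawned workers inherit it. A minimal sketch of that precedence, with get_remote_value standing in for the real remote_config.get_scheduler_algorithm call:

import os

SCHEDULER_ALGORITHM = "SCT_SCHEDULER_ALGORITHM"          # env var name from the diff
SCHEDULER_ALGORITHM_DEFAULT = "topological_generations"  # default from constants.py


def get_remote_value():
    # stand-in for remote_config.get_scheduler_algorithm(puppet_account_id, home_region)
    return SCHEDULER_ALGORITHM_DEFAULT


def setup_scheduler_algorithm():
    # only consult the remote config when the env var is unset, then pin
    # the answer in the environment for child processes
    if not os.environ.get(SCHEDULER_ALGORITHM):
        os.environ[SCHEDULER_ALGORITHM] = get_remote_value()
    return os.environ[SCHEDULER_ALGORITHM]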
5 changes: 3 additions & 2 deletions servicecatalog_puppet/commands/misc.py
@@ -34,11 +34,12 @@ def cli(info, info_line_numbers):

if info:
logging.basicConfig(
format="%(levelname)s %(threadName)s %(message)s", level=logging.INFO
format="%(levelname)s %(process)s %(threadName)s %(message)s",
level=logging.INFO,
)
if info_line_numbers:
logging.basicConfig(
format="%(asctime)s %(levelname)s %(pid)s %(threadName)s [%(filename)s:%(lineno)d] %(message)s",
format="%(asctime)s %(levelname)s %(process)s %(threadName)s [%(filename)s:%(lineno)d] %(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
level=logging.INFO,
)
22 changes: 21 additions & 1 deletion servicecatalog_puppet/config.py
@@ -2,7 +2,6 @@
# SPDX-License-Identifier: Apache-2.0

import functools
-import json
from servicecatalog_puppet import serialisation_utils
import logging
import os
@@ -268,5 +267,26 @@ def get_scheduler_threads_or_processes():
)


def get_scheduler_algorithm():
return os.environ.get(
environmental_variables.SCHEDULER_ALGORITHM,
constants.SCHEDULER_ALGORITHM_DEFAULT,
)


def get_reporting_role_arn(puppet_account_id):
return get_role_arn(puppet_account_id, constants.REPORTING_ROLE_NAME)


def get_spoke_scheduler_threads_or_processes():
return os.environ.get(
environmental_variables.SPOKE_SCHEDULER_THREADS_OR_PROCESSES,
constants.SPOKE_SCHEDULER_THREADS_OR_PROCESSES_DEFAULT,
)


def get_spoke_scheduler_algorithm():
return os.environ.get(
environmental_variables.SPOKE_SCHEDULER_ALGORITHM,
constants.SPOKE_SCHEDULER_ALGORITHM_DEFAULT,
)
5 changes: 5 additions & 0 deletions servicecatalog_puppet/constants.py
@@ -385,3 +385,8 @@
SHARE_PRINCIPALS_DEFAULT = False

DESCRIBE_PORTFOLIO_SHARES = "describe-portfolio-shares"


SCHEDULER_ALGORITHM_DEFAULT = "topological_generations"
SPOKE_SCHEDULER_THREADS_OR_PROCESSES_DEFAULT = SCHEDULER_THREADS_OR_PROCESSES_DEFAULT
SPOKE_SCHEDULER_ALGORITHM_DEFAULT = SCHEDULER_ALGORITHM_DEFAULT
3 changes: 3 additions & 0 deletions servicecatalog_puppet/environmental_variables.py
@@ -23,4 +23,7 @@
ON_COMPLETE_URL = "SCT_ON_COMPLETE_URL"
SPOKE_EXECUTION_MODE_DEPLOY_ENV = "SCT_SPOKE_EXECUTION_MODE_DEPLOY_ENV"
SCHEDULER_THREADS_OR_PROCESSES = "SCT_SCHEDULER_THREADS_OR_PROCESSES"
SCHEDULER_ALGORITHM = "SCT_SCHEDULER_ALGORITHM"
SPOKE_SCHEDULER_THREADS_OR_PROCESSES = "SCT_SPOKE_SCHEDULER_THREADS_OR_PROCESSES"
SPOKE_SCHEDULER_ALGORITHM = "SCT_SPOKE_SCHEDULER_ALGORITHM"
GLOBAL_SHARE_PRINCIPALS = "SCT_GLOBAL_SHARE_PRINCIPALS"
9 changes: 9 additions & 0 deletions servicecatalog_puppet/remote_config.py
@@ -199,3 +199,12 @@ def get_scheduler_threads_or_processes(puppet_account_id, default_region):
"scheduler_threads_or_processes",
constants.SCHEDULER_THREADS_OR_PROCESSES_DEFAULT,
)


def get_scheduler_algorithm(puppet_account_id, default_region):
logger.info(
"getting scheduler_algorithm, default_region: {}".format(default_region)
)
return get_config(puppet_account_id, default_region).get(
"scheduler_algorithm", constants.SCHEDULER_ALGORITHM_DEFAULT,
)
8 changes: 8 additions & 0 deletions servicecatalog_puppet/waluigi/constants.py
@@ -0,0 +1,8 @@
COMPLETED = "COMPLETED"
NOT_SET = "NOT_SET"
ERRORED = "ERRORED"
QUEUE_STATUS = "QUEUE_STATUS"
IN_PROGRESS = "IN_PROGRESS"
RESOURCES_REQUIRED = "resources_required"

CONTROL_EVENT__COMPLETE = "CONTROL_EVENT__COMPLETE"
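These constants are the vocabulary shared between workers and the scheduler: QUEUE_STATUS / COMPLETED / ERRORED annotate tasks, while CONTROL_EVENT__COMPLETE is the shutdown message the runner waits for. A toy sketch (not repo code) of that handshake:

import multiprocessing

from servicecatalog_puppet.waluigi.constants import CONTROL_EVENT__COMPLETE


def worker(control_queue):
    # ... execute tasks until nothing is left to schedule, then signal ...
    control_queue.put(CONTROL_EVENT__COMPLETE)


if __name__ == "__main__":
    control_queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=worker, args=(control_queue,))
    process.start()
    while True:
        if control_queue.get() == CONTROL_EVENT__COMPLETE:
            break
    process.terminate()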
64 changes: 64 additions & 0 deletions servicecatalog_puppet/waluigi/dag_utils.py
@@ -0,0 +1,64 @@
import logging

import networkx as nx

from servicecatalog_puppet import constants
from servicecatalog_puppet.waluigi.constants import (
COMPLETED,
NOT_SET,
ERRORED,
QUEUE_STATUS,
)

logger = logging.getLogger(constants.PUPPET_SCHEDULER_LOGGER_NAME)


def build_the_dag(tasks_to_run: dict):
g = nx.DiGraph()
#print("-- BUILDING THE DAG!!!")
for uid, task in tasks_to_run.items():
g.add_nodes_from(
[(uid, task),]
)
for duid in task.get("dependencies_by_reference", []):
if tasks_to_run.get(duid):
g.add_edge(uid, duid)
else:
                logger.debug(
                    f"{duid} is not in the task reference - this is expected in spoke execution mode when the dependency has already been executed within the hub"
                )

for uid, task in tasks_to_run.items():
if task.get(QUEUE_STATUS, NOT_SET) == COMPLETED:
try:
g.remove_node(uid)
            except nx.exception.NetworkXError:
                pass

        elif task.get(QUEUE_STATUS, NOT_SET) == ERRORED:
            # drop the errored task and every task that (transitively) depends on it
for n in nx.ancestors(g, uid):
try:
g.remove_node(n)
                except nx.exception.NetworkXError:
                    pass
try:
g.remove_node(uid)
            except nx.exception.NetworkXError:
                pass

return g


def make_readable_in_codebuild_logs(input):
    # spell out a small integer ("fifty") so worker counts are easy to spot in logs
    numbers = "zero one two three four five six seven eight nine".split()
numbers.extend("ten eleven twelve thirteen fourteen fifteen sixteen".split())
numbers.extend("seventeen eighteen nineteen".split())
numbers.extend(
tens if ones == "zero" else (tens + "-" + ones)
for tens in "twenty thirty forty fifty sixty seventy eighty ninety".split()
for ones in numbers[0:10]
)
return numbers[input]
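A minimal usage sketch (task names invented) of how build_the_dag prunes finished work before nx.topological_generations is applied; since edges point from a task to its dependency, the runner pops generations from the end of the list to queue dependency-free work first:

import networkx as nx

from servicecatalog_puppet.waluigi.constants import COMPLETED, QUEUE_STATUS
from servicecatalog_puppet.waluigi.dag_utils import build_the_dag

tasks_to_run = {
    "create-policies": {"dependencies_by_reference": [], QUEUE_STATUS: COMPLETED},
    "launch-account-a": {"dependencies_by_reference": ["create-policies"]},
    "launch-account-b": {"dependencies_by_reference": ["create-policies"]},
}

dag = build_the_dag(tasks_to_run)
for generation in nx.topological_generations(dag):
    print(sorted(generation))
# the COMPLETED dependency is pruned, leaving a single runnable generation:
# ['launch-account-a', 'launch-account-b']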
Empty file.
46 changes: 46 additions & 0 deletions servicecatalog_puppet/waluigi/locks/external.py
@@ -0,0 +1,46 @@
from servicecatalog_puppet import serialisation_utils
from servicecatalog_puppet.waluigi.constants import RESOURCES_REQUIRED
from servicecatalog_puppet.waluigi.dag_utils import logger


def are_resources_are_free_for_task(task_parameters: dict, resources_file_path: str):
    # returns (all_required_resources_are_free, current_resources_in_use)
    with open(resources_file_path, "rb") as f:
        resources_in_use = serialisation_utils.json_loads(f.read())
return are_resources_are_free_for_task_dict(task_parameters, resources_in_use)


def are_resources_are_free_for_task_dict(task_parameters, resources_in_use):
return (
all(
resources_in_use.get(r, False) is False
for r in task_parameters.get(RESOURCES_REQUIRED, [])
),
resources_in_use,
)


def lock_resources_for_task(
task_reference: str,
task_parameters: dict,
resources_in_use: dict,
resources_file_path: str,
):
#print(f"Worker locking {task_reference}")
for r in task_parameters.get(RESOURCES_REQUIRED, []):
resources_in_use[r] = task_reference
with open(resources_file_path, "wb") as f:
f.write(serialisation_utils.json_dumps(resources_in_use))


def unlock_resources_for_task(task_parameters: dict, resources_file_path: str):
with open(resources_file_path, "rb") as f:
resources_in_use = serialisation_utils.json_loads(f.read())
for r in task_parameters.get(RESOURCES_REQUIRED, []):
try:
del resources_in_use[r]
except KeyError:
            logger.warning(
f"{task_parameters.get('task_reference')} tried to unlock {r} but it wasn't present"
)
with open(resources_file_path, "wb") as f:
f.write(serialisation_utils.json_dumps(resources_in_use))
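A hedged round-trip of the file-based lock protocol above (paths and task names are invented): check availability, record this task as the holder of each resource, and always unlock, even when the task fails:

from servicecatalog_puppet.waluigi.locks.external import (
    are_resources_are_free_for_task,
    lock_resources_for_task,
    unlock_resources_for_task,
)

resources_file_path = "/tmp/resources.json"  # runner.py seeds this file with "{}"
with open(resources_file_path, "w") as f:
    f.write("{}")

task = {
    "task_reference": "launch-account-a",
    "resources_required": ["CLOUDFORMATION_CREATE_eu-west-1_012345678901"],
}

is_free, resources_in_use = are_resources_are_free_for_task(task, resources_file_path)
if is_free:
    lock_resources_for_task(
        task["task_reference"], task, resources_in_use, resources_file_path
    )
    try:
        pass  # run the task while holding its resources
    finally:
        unlock_resources_for_task(task, resources_file_path)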
167 changes: 167 additions & 0 deletions servicecatalog_puppet/waluigi/processes/runner.py
@@ -0,0 +1,167 @@
import multiprocessing
import os
import time

import networkx as nx

from servicecatalog_puppet.waluigi.constants import CONTROL_EVENT__COMPLETE
from servicecatalog_puppet.waluigi.dag_utils import (
logger,
build_the_dag,
make_readable_in_codebuild_logs,
)
from servicecatalog_puppet.waluigi.shared_tasks import task_processing_time
from servicecatalog_puppet.waluigi.shared_tasks import task_trace
from servicecatalog_puppet.waluigi.shared_tasks.task_topological_generations_with_scheduler import (
scheduler_task as task_topological_generations_with_scheduler_scheduler_task,
)
from servicecatalog_puppet.waluigi.shared_tasks.workers.worker_requiring_scheduler import (
worker_task as worker_requiring_scheduler_worker_task,
)
from servicecatalog_puppet.waluigi.shared_tasks.task_topological_generations_without_scheduler import (
worker_task as task_topological_generations_without_scheduler_worker_task,
)

QUEUE_REFILL_SLEEP_DURATION = 1


def get_tasks(scheduling_algorithm):
if scheduling_algorithm == "topological_generations":
return (
worker_requiring_scheduler_worker_task,
task_topological_generations_with_scheduler_scheduler_task,
)
if scheduling_algorithm == "topological_generations_without_scheduler":
return task_topological_generations_without_scheduler_worker_task, None
raise ValueError(f"Unsupported scheduling_algorithm: {scheduling_algorithm}")


def run(
num_workers,
tasks_reference,
manifest_files_path,
manifest_task_reference_file_path,
puppet_account_id,
execution_mode,
scheduling_algorithm,
):
logger.info(
f"Executing {len(tasks_reference.keys())} tasks with {make_readable_in_codebuild_logs(num_workers)} processes in {execution_mode} with scheduling_algorithm {scheduling_algorithm}!"
)

manager = multiprocessing.Manager()
all_tasks = manager.dict(tasks_reference)
resources = manager.dict()
tasks_to_run = manager.list()

dag = build_the_dag(tasks_reference)
generations = list(nx.topological_generations(dag))
if not generations:
raise ValueError("No tasks to run")
    while generations:
        # pop from the end: the final generation holds the dependency-free tasks
        tasks_to_run.extend(list(generations.pop()))

resources_file_path = f"{manifest_files_path}/resources.json"
start = time.time()
os.environ["SCT_START_TIME"] = str(start)

with open(resources_file_path, "w") as f:
f.write("{}")

    # aliased so threading-based equivalents could be swapped in
    QueueKlass = multiprocessing.Queue
EventKlass = multiprocessing.Event
ExecutorKlass = multiprocessing.Process
LockKlass = multiprocessing.Lock

lock = LockKlass()
task_queue = QueueKlass()
results_queue = QueueKlass()
control_event = None
task_processing_time_queue = QueueKlass()
task_trace_queue = QueueKlass()
control_queue = QueueKlass()
complete_event = EventKlass()

worker_task_args = (
lock,
task_queue,
results_queue,
task_processing_time_queue,
task_trace_queue,
control_queue,
control_event,
manifest_files_path,
manifest_task_reference_file_path,
puppet_account_id,
resources_file_path,
all_tasks,
resources,
tasks_to_run,
)
scheduler_task_args = (
num_workers,
task_queue,
results_queue,
control_queue,
control_event,
QUEUE_REFILL_SLEEP_DURATION,
all_tasks,
tasks_to_run,
)
task_processing_time_args = (
task_processing_time_queue,
complete_event,
execution_mode,
)
task_trace_args = (
task_trace_queue,
complete_event,
puppet_account_id,
execution_mode,
)

worker_task_to_use, scheduler_task_to_use = get_tasks(scheduling_algorithm)

processes = [
ExecutorKlass(
name=f"worker#{i}",
target=worker_task_to_use,
args=(str(i),) + worker_task_args,
)
for i in range(num_workers)
]
scheduler_thread = None

if scheduler_task_to_use:
scheduler_thread = ExecutorKlass(
name="scheduler", target=scheduler_task_to_use, args=scheduler_task_args,
)
on_task_processing_time_thread = ExecutorKlass(
name="on_task_processing_time",
target=task_processing_time.on_task_processing_time_task,
args=task_processing_time_args,
)
on_task_trace_thread = ExecutorKlass(
name="on_task_trace", target=task_trace.on_task_trace, args=task_trace_args,
)

on_task_processing_time_thread.start()
on_task_trace_thread.start()

for process in processes:
process.start()
if scheduler_thread:
scheduler_thread.start()
while True:
logger.info("Waiting for shutdown message")
message = control_queue.get()
if message == CONTROL_EVENT__COMPLETE:
logger.info(f"Got the {message}, starting shutdown process")
break

for process in processes:
process.terminate()
time.sleep(10)
on_task_processing_time_thread.terminate()
on_task_trace_thread.terminate()
logger.info(f"Time taken = {time.time() - start:.10f}")
