adding sleep to ease memory usage (#582)
* adding sleep to ease memory usage
* preparing for release
eamonnfaherty committed Oct 25, 2022
1 parent ba9d300 commit 1244e2a
Showing 11 changed files with 80 additions and 37 deletions.
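The substantive change is a new time.sleep() call after each task is pushed onto the scheduler's task queue (see the two topological_generations diffs below): pacing dispatch keeps the queue short, so fewer pending task payloads sit in memory at once. A minimal sketch of the pattern, using a plain queue and hypothetical task names rather than the project's actual task references:

import queue
import time

def throttled_dispatch(task_queue: queue.Queue, tasks: list) -> None:
    # Pause after each put so workers can drain the queue instead of
    # letting pending payloads accumulate in memory.
    for task in tasks:
        task_queue.put(task)
        time.sleep(1)  # the commit uses 1s for processes, 0.1s for threads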
1 change: 1 addition & 0 deletions .gitignore
@@ -31,3 +31,4 @@ coverage.xml
 ignored.html
 *.html
 state1.json
+.fleet
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -3,7 +3,7 @@
 
 [tool.poetry]
 name = "aws-service-catalog-puppet"
-version = "0.200.0"
+version = "0.201.0"
 description = "Making it easier to deploy ServiceCatalog products"
 classifiers = ["Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", "Programming Language :: Python :: 3", "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", "Natural Language :: English"]
 homepage = "https://service-catalog-tools-workshop.com/"
21 changes: 18 additions & 3 deletions servicecatalog_puppet/cli.py
@@ -621,9 +621,24 @@ def deploy_from_task_reference(
 @click.option("--single-account", default=None)
 @click.option("--home-region", default=None)
 @click.option("--regions", default="")
-@click.option("--should-collect-cloudformation-events", default=None, type=bool, envvar="SHOULD_COLLECT_CLOUDFORMATION_EVENTS")
-@click.option("--should-forward-events-to-eventbridge", default=None, type=bool, envvar="SHOULD_FORWARD_EVENTS_TO_EVENTBRIDGE")
-@click.option("--should-forward-failures-to-opscenter", default=None, type=bool, envvar="SHOULD_FORWARD_FAILURES_TO_OPSCENTER")
+@click.option(
+    "--should-collect-cloudformation-events",
+    default=None,
+    type=bool,
+    envvar="SHOULD_COLLECT_CLOUDFORMATION_EVENTS",
+)
+@click.option(
+    "--should-forward-events-to-eventbridge",
+    default=None,
+    type=bool,
+    envvar="SHOULD_FORWARD_EVENTS_TO_EVENTBRIDGE",
+)
+@click.option(
+    "--should-forward-failures-to-opscenter",
+    default=None,
+    type=bool,
+    envvar="SHOULD_FORWARD_FAILURES_TO_OPSCENTER",
+)
 @click.option(
     "--output-cache-starting-point",
     default="",
25 changes: 21 additions & 4 deletions servicecatalog_puppet/config.py
@@ -179,11 +179,21 @@ def get_executor_account_id():
 
 
 def get_should_use_eventbridge():
-    return os.environ.get(environmental_variables.SHOULD_FORWARD_EVENTS_TO_EVENTBRIDGE, "FALSE").upper() == "TRUE"
+    return (
+        os.environ.get(
+            environmental_variables.SHOULD_FORWARD_EVENTS_TO_EVENTBRIDGE, "FALSE"
+        ).upper()
+        == "TRUE"
+    )
 
 
 def get_should_forward_failures_to_opscenter():
-    return os.environ.get(environmental_variables.SHOULD_FORWARD_FAILURES_TO_OPSCENTER, "FALSE").upper() == "TRUE"
+    return (
+        os.environ.get(
+            environmental_variables.SHOULD_FORWARD_FAILURES_TO_OPSCENTER, "FALSE"
+        ).upper()
+        == "TRUE"
+    )
 
 
 def get_regions():
@@ -227,7 +237,7 @@ def get_spoke_execution_mode_deploy_env():
 
 
 def get_should_use_sns():
-    return os.environ.get(environmental_variables.SHOULD_USE_SNS, "FALSE").upper() == "TRUE"
+    return (
+        os.environ.get(environmental_variables.SHOULD_USE_SNS, "FALSE").upper()
+        == "TRUE"
+    )
 
 
 def get_scheduler_threads_or_processes():
-    return os.environ.get(environmental_variables.SCHEDULER_THREADS_OR_PROCESSES, constants.SCHEDULER_THREADS_OR_PROCESSES_DEFAULT)
+    return os.environ.get(
+        environmental_variables.SCHEDULER_THREADS_OR_PROCESSES,
+        constants.SCHEDULER_THREADS_OR_PROCESSES_DEFAULT,
+    )
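The boolean getters here all follow one pattern: read an environment variable, default to "FALSE", and compare case-insensitively against "TRUE"; get_scheduler_threads_or_processes simply falls back to the "threads" default. A quick illustrative session, assuming the environmental_variables constant resolves to the same name the CLI's envvar option uses:

import os

# Any casing is accepted because of .upper(); unset means False.
os.environ["SHOULD_FORWARD_EVENTS_TO_EVENTBRIDGE"] = "true"

from servicecatalog_puppet import config

assert config.get_should_use_eventbridge() is True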
2 changes: 1 addition & 1 deletion servicecatalog_puppet/constants.py
@@ -371,4 +371,4 @@
   name: DeployInSpokeProject
 """
 
-SCHEDULER_THREADS_OR_PROCESSES_DEFAULT = "threads"
+SCHEDULER_THREADS_OR_PROCESSES_DEFAULT = "threads"
19 changes: 5 additions & 14 deletions servicecatalog_puppet/waluigi/processes/topological_generations.py
@@ -230,11 +230,7 @@ def worker_task(
 
 
 def scheduler_task(
-    num_workers,
-    task_queue,
-    results_queue,
-    control_queue,
-    tasks_to_run,
+    num_workers, task_queue, results_queue, control_queue, tasks_to_run,
 ):
     number_of_target_tasks_in_flight = num_workers
     should_be_running = True
@@ -264,6 +260,7 @@ def scheduler_task(
             task_to_run_reference = current_generation.pop()
             logger.info(f"sending: {task_to_run_reference}")
             task_queue.put(task_to_run_reference)
+            time.sleep(1)
 
         # now handle a complete jobs from the workers
         task_reference, result = results_queue.get()
@@ -299,7 +296,7 @@ def on_task_processing_time(task_processing_time_queue, complete_event):
         "cloudwatch-puppethub",
     ) as cloudwatch:
         while not complete_event.is_set():
-        # while True:
+            # while True:
             time.sleep(0.1)
             try:
                 duration, task_type, task_params = task_processing_time_queue.get(
@@ -353,7 +350,7 @@ def on_task_trace(task_trace_queue, complete_event, puppet_account_id):
         "s3", config.get_puppet_role_arn(config.get_executor_account_id()), "s3",
     ) as s3:
         while not complete_event.is_set():
-        # while True:
+            # while True:
             time.sleep(0.1)
             try:
                 t, task_type, task_params, is_start, thread_name = task_trace_queue.get(
@@ -430,13 +427,7 @@ def run(
     scheduler_thread = multiprocessing.Process(
         name="scheduler",
         target=scheduler_task,
-        args=(
-            num_workers,
-            task_queue,
-            results_queue,
-            control_queue,
-            tasks_to_run,
-        ),
+        args=(num_workers, task_queue, results_queue, control_queue, tasks_to_run,),
     )
     on_task_processing_time_thread = multiprocessing.Process(
         name="on_task_processing_time",
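For context, scheduler_task runs as one multiprocessing.Process among several, communicating over shared queues, and the new time.sleep(1) sits in its dispatch loop. A stripped-down sketch of that wiring, with simplified names and toy tasks rather than the module's real run() implementation:

import multiprocessing
import time


def scheduler_task(num_workers, task_queue, results_queue):
    for task in ["task-a", "task-b", "task-c"]:
        task_queue.put(task)
        time.sleep(1)  # pace dispatch so the queue stays small
    for _ in range(num_workers):
        task_queue.put(None)  # one poison pill per worker


def worker_task(task_queue, results_queue):
    while (task := task_queue.get()) is not None:
        results_queue.put((task, "done"))


if __name__ == "__main__":
    task_queue = multiprocessing.Queue()
    results_queue = multiprocessing.Queue()
    scheduler = multiprocessing.Process(
        name="scheduler", target=scheduler_task, args=(2, task_queue, results_queue)
    )
    workers = [
        multiprocessing.Process(target=worker_task, args=(task_queue, results_queue))
        for _ in range(2)
    ]
    scheduler.start()
    for w in workers:
        w.start()
    for _ in range(3):
        print(results_queue.get())
    scheduler.join()
    for w in workers:
        w.join()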
12 changes: 8 additions & 4 deletions servicecatalog_puppet/waluigi/scheduler_factory.py
@@ -1,13 +1,17 @@
-from servicecatalog_puppet.waluigi.threads import topological_generations as threads_topological_generations
-from servicecatalog_puppet.waluigi.processes import topological_generations as processes_topological_generations
+from servicecatalog_puppet.waluigi.threads import (
+    topological_generations as threads_topological_generations,
+)
+from servicecatalog_puppet.waluigi.processes import (
+    topological_generations as processes_topological_generations,
+)
 
 
-def get_scheduler(threads_or_processes, algorithm):
+def get_scheduler(threads_or_processes: str, algorithm: str):
     name = f"{threads_or_processes}.{algorithm}"
     print(f"Using scheduler: {name}")
     if name == "threads.topological_generations":
         return threads_topological_generations
     if name == "processes.topological_generations":
         return processes_topological_generations
 
-    raise Exception(f"Unsupported scheduler: {name}")
+    raise Exception(f"Unsupported scheduler: {name}")
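Note that get_scheduler returns a module rather than an object: both topological_generations modules expose a compatible run(...) entry point, so callers can treat them interchangeably. Usage, mirroring the runner.py change further down (trailing arguments elided):

from servicecatalog_puppet.waluigi import scheduler_factory

# "threads" or "processes", taken from the SCHEDULER_THREADS_OR_PROCESSES env var
scheduler = scheduler_factory.get_scheduler("threads", "topological_generations")
# scheduler.run(num_workers, tasks_to_run_filtered, ...) as in workflow/runner.py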
14 changes: 9 additions & 5 deletions servicecatalog_puppet/waluigi/threads/topological_generations.py
@@ -32,7 +32,7 @@
 RESOURCES_REQUIRED = "resources_required"
 
 
-def build_the_dag(tasks_to_run):
+def build_the_dag(tasks_to_run: dict):
     g = nx.DiGraph()
     print("-- BUILDING THE DAG!!!")
     for uid, task in tasks_to_run.items():
@@ -71,7 +71,7 @@ def build_the_dag(tasks_to_run):
     return g
 
 
-def are_resources_are_free_for_task(task_parameters, resources_file_path):
+def are_resources_are_free_for_task(task_parameters: dict, resources_file_path: str):
     with open(resources_file_path, "rb") as f:
         resources_in_use = serialisation_utils.json_loads(f.read())
     return (
@@ -84,7 +84,10 @@ def are_resources_are_free_for_task(task_parameters, resources_file_path):
 
 
 def lock_resources_for_task(
-    task_reference, task_parameters, resources_in_use, resources_file_path
+    task_reference: str,
+    task_parameters: dict,
+    resources_in_use: dict,
+    resources_file_path: str,
 ):
     print(f"Worker locking {task_reference}")
     for r in task_parameters.get(RESOURCES_REQUIRED, []):
@@ -93,7 +96,7 @@ def lock_resources_for_task(
         f.write(serialisation_utils.json_dumps(resources_in_use))
 
 
-def unlock_resources_for_task(task_parameters, resources_file_path):
+def unlock_resources_for_task(task_parameters: dict, resources_file_path: str):
     with open(resources_file_path, "rb") as f:
         resources_in_use = serialisation_utils.json_loads(f.read())
     for r in task_parameters.get(RESOURCES_REQUIRED, []):
@@ -237,7 +240,7 @@ def scheduler_task(
         control_event.set()
         return
 
-    current_generation = list(generations[-1])  # may need to make list
+    current_generation = list(generations[-1])
     number_of_tasks_in_flight = 0
     number_of_tasks_processed = 0
     number_of_tasks_in_generation = len(current_generation)
@@ -255,6 +258,7 @@ def scheduler_task(
             task_to_run_reference = current_generation.pop()
             logger.info(f"sending: {task_to_run_reference}")
             task_queue.put(task_to_run_reference)
+            time.sleep(0.1)
 
         # now handle a complete jobs from the workers
         task_reference, result = results_queue.get()
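The "topological_generations" in these module names comes from networkx: build_the_dag produces a DiGraph, which the scheduler slices into generations so that each batch only depends on tasks that have already completed. A toy illustration of the underlying primitive; the edge direction here is an assumption for the example, not necessarily how the puppet task graph is wired:

import networkx as nx

g = nx.DiGraph()
# Edges point from a dependency to the task that needs it.
g.add_edge("create-bucket", "deploy-app")
g.add_edge("create-role", "deploy-app")

for generation in nx.topological_generations(g):
    print(sorted(generation))
# ['create-bucket', 'create-role']
# ['deploy-app']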
@@ -51,7 +51,11 @@ def run(self):
                     "type": "PLAINTEXT",
                 },
                 {"name": "HOME_REGION", "value": home_region, "type": "PLAINTEXT",},
-                {"name": "REGIONS", "value": ",".join(config.get_regions()), "type": "PLAINTEXT",},
+                {
+                    "name": "REGIONS",
+                    "value": ",".join(config.get_regions()),
+                    "type": "PLAINTEXT",
+                },
                 {
                     "name": "SHOULD_COLLECT_CLOUDFORMATION_EVENTS",
                     "value": str(config.get_should_use_sns()),
13 changes: 10 additions & 3 deletions servicecatalog_puppet/workflow/runner.py
@@ -109,7 +109,9 @@ def run_tasks(
     shutil.unpack_archive("GetSSMParamTask.zip", ".", "zip")
 
     threads_or_processes = config.get_scheduler_threads_or_processes()
-    scheduler = scheduler_factory.get_scheduler(threads_or_processes, "topological_generations")
+    scheduler = scheduler_factory.get_scheduler(
+        threads_or_processes, "topological_generations"
+    )
     scheduler.run(
         num_workers,
         tasks_to_run_filtered,
@@ -363,11 +365,16 @@ def run_tasks(
                         Priority=1,
                         Source=constants.SERVICE_CATALOG_PUPPET_OPS_CENTER_SOURCE,
                         Tags=[
-                            {"Key": "ServiceCatalogPuppet:Actor", "Value": "ops-item"},
+                            {
+                                "Key": "ServiceCatalogPuppet:Actor",
+                                "Value": "ops-item",
+                            },
                         ],
                     )
                 except ssm_client.exceptions.OpsItemLimitExceededException:
-                    logging.error(f"OpsItem: {title} creation failed due to OpsItemLimitExceededException. OperationalData: {operational_data}")
+                    logging.error(
+                        f"OpsItem: {title} creation failed due to OpsItemLimitExceededException. OperationalData: {operational_data}"
+                    )
                 if "RunDeployInSpoke" in filename:
                     click.echo(
                         colorclass.Color(
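The except clause uses botocore's generated exception classes, reachable through the client's .exceptions attribute. A minimal sketch of the same guard with placeholder values; create_ops_item and OpsItemLimitExceededException are real SSM client members, but the argument values here are illustrative, not the runner's:

import logging

import boto3

ssm_client = boto3.client("ssm")
try:
    ssm_client.create_ops_item(
        Title="example title",
        Description="example description",
        Priority=1,
        Source="example-source",
    )
except ssm_client.exceptions.OpsItemLimitExceededException:
    # Accounts cap the number of open OpsItems; log and move on
    # rather than failing the whole run.
    logging.error("OpsItem creation failed: open OpsItem limit reached")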
2 changes: 1 addition & 1 deletion setup.py
@@ -66,7 +66,7 @@
 
 setup_kwargs = {
     'name': 'aws-service-catalog-puppet',
-    'version': '0.200.0',
+    'version': '0.201.0',
     'description': 'Making it easier to deploy ServiceCatalog products',
     'long_description': '# aws-service-catalog-puppet\n\n![logo](./docs/logo.png) \n\n## Badges\n\n[![codecov](https://codecov.io/gh/awslabs/aws-service-catalog-puppet/branch/master/graph/badge.svg?token=e8M7mdsmy0)](https://codecov.io/gh/awslabs/aws-service-catalog-puppet)\n\n\n## What is it?\nThis is a python3 framework that makes it easier to share multi region AWS Service Catalog portfolios and makes it \npossible to provision products into accounts declaratively using a metadata based rules engine.\n\nWith this framework you define your accounts in a YAML file. You give each account a set of tags, a default region and \na set of enabled regions.\n\nOnce you have done this you can define portfolios should be shared with each set of accounts using the tags and you \ncan specify which regions the shares occur in.\n\nIn addition to this, you can also define products that should be provisioned into accounts using the same tag based \napproach. The framework will assume role into the target account and provision the product on your behalf.\n\n\n## Getting started\n\nYou can read the [installation how to](https://service-catalog-tools-workshop.com/30-how-tos/10-installation/30-service-catalog-puppet.html)\nor you can read through the [every day use](https://service-catalog-tools-workshop.com/30-how-tos/50-every-day-use.html)\nguides.\n\nYou can read the [documentation](https://aws-service-catalog-puppet.readthedocs.io/en/latest/) to understand the inner \nworkings. \n\n\n## Going further\n\nThe framework is one of a pair. The other is [aws-service-catalog-factory](https://github.com/awslabs/aws-service-catalog-factory).\nWith Service Catalog Factory you can create pipelines that deploy multi region portfolios very easily. \n\n## License\n\nThis library is licensed under the Apache 2.0 License. \n \n',
     'author': 'Eamonn Faherty',
