Skip to content

Commit

Permalink
Recomputing plugin with plugin_blocked trigger (#1184)
Browse files Browse the repository at this point in the history
* Enable streaming for plugins_blocked

* Setting plugins_blocked to trigger aggregation

* Adding processing for plugins without version

* Fixing formatting

* Adding aggregator tests
  • Loading branch information
manasaV3 committed Aug 2, 2023
1 parent d7d5e5a commit 8242248
Show file tree
Hide file tree
Showing 11 changed files with 704 additions and 222 deletions.
7 changes: 7 additions & 0 deletions .happy/terraform/modules/ecs-stack/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,12 @@ resource aws_lambda_event_source_mapping data_workflow_plugin_metadata_event_sou
maximum_batching_window_in_seconds = 60
}

# Subscribes the data-workflows Lambda to the stream of the plugin_blocked
# DynamoDB table, so stream records from that table are delivered to the function.
resource aws_lambda_event_source_mapping data_workflow_plugin_blocked_event_source_mapping {
event_source_arn = module.plugin_blocked_dynamodb_table.stream_arn
function_name = module.data_workflows_lambda.function_name
# LATEST: begin reading at the newest stream records rather than replaying older ones.
starting_position = "LATEST"
}

module api_gateway_proxy_stage {
source = "../api-gateway-proxy-stage"
lambda_function_name = local.backend_function_name
Expand Down Expand Up @@ -585,6 +591,7 @@ data aws_iam_policy_document data_workflows_policy {
"dynamodb:ListStreams",
]
resources = [
module.plugin_blocked_dynamodb_table.stream_arn,
module.plugin_metadata_dynamodb_table.stream_arn,
]
}
Expand Down
19 changes: 12 additions & 7 deletions data-workflows/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@
)
logger = logging.getLogger(__name__)


def _categories_processor(event: dict) -> None:
    """Run the S3 categories seeding workflow for the given event.

    :param event: decoded event payload; its ``version`` and
        ``categories_path`` entries are forwarded to the workflow, each as
        ``None`` when the key is absent.
    """
    categories.processor.seed_s3_categories_workflow(
        event.get("version"), event.get("categories_path")
    )


# Processor callable per event type name. Each callable takes the decoded
# event dict; the "activity" and "plugin" processors ignore it.
# NOTE(review): presumably looked up with the lower-cased ``type`` field of an
# incoming message -- confirm against the message handler.
EVENT_TYPE_BY_PROCESSOR = {
    "activity": lambda event: activity.processor.update_activity(),
    "seed-s3-categories": _categories_processor,
    "plugin": lambda event: plugin.processor.update_plugin(),
}


def handle_sqs_message(body: str) -> None:
def _handle_sqs_message(body: str) -> None:
logger.info(f"Received message with body: {body}")
event = json.loads(body)
event_type = event.get("type", "").lower()
Expand All @@ -38,15 +43,15 @@ def _get_plugin_version(dynamodb_dict: dict) -> tuple:
keys = dynamodb_dict.get("Keys", {})
name = keys.get("name", {}).get("S")
version_type = keys.get("version_type", {}).get("S")
version = version_type[0:version_type.rfind(":")]
version = version_type[0 : version_type.rfind(":")] if version_type else None
return name, version


def handle(event: dict, context) -> None:
def handle(event: dict, _) -> None:
updated_plugins = set()
for record in event.get("Records", []):
if "body" in record:
handle_sqs_message(record.get("body"))
_handle_sqs_message(record.get("body"))
elif "dynamodb" in record:
dynamodb = record.get("dynamodb")
updated_plugins.add(_get_plugin_version(dynamodb))
Expand Down
85 changes: 48 additions & 37 deletions data-workflows/plugin/aggregator.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,100 @@
import logging
from typing import Any, Optional

from nhcommons.models import plugin_metadata
from nhcommons.models.plugin import put_plugin
from nhcommons.models.plugin_utils import PluginMetadataType, PluginVisibility
from nhcommons.models.plugins_blocked import get_all_blocked_plugins
from nhcommons.models import plugins_blocked, plugin, plugin_metadata
from plugin.manifest import get_formatted_manifest

# Aggregate keys that are additionally copied onto the top level of the
# stored plugin record when they carry a truthy value.
PLUGIN_FIELDS = {
    "authors",
    "code_repository",
    "display_name",
    "first_released",
    "release_date",
    "summary",
}

logger = logging.getLogger(__name__)

def aggregate_plugins(updated_plugins: set[tuple[str, Optional[str]]]) -> None:
    """Rebuild and store the aggregated record of every updated plugin.

    A plugin is skipped when its version cannot be resolved, when it has no
    METADATA entry for that version, or when the generated aggregate is empty.

    :param updated_plugins: ``(name, version)`` pairs; a falsy version is
        resolved through ``_get_latest_version``.
    """
    # Fetched once up front; the same blocked set applies to every plugin below.
    blocked_plugins = plugins_blocked.get_all_blocked_plugins()
    for name, requested_version in updated_plugins:
        version = _get_latest_version(name, requested_version)
        if not version:
            # The version is necessarily empty here, so only the name is logged.
            logger.warning(f"Unable to resolve version for plugin={name}")
            continue

        metadata_by_type = _get_metadata_by_type(name, version)
        if PluginMetadataType.METADATA not in metadata_by_type:
            continue

        aggregate = _generate_aggregate(metadata_by_type, name, version)
        if not aggregate:
            continue

        record = _generate_record(name, metadata_by_type, aggregate, blocked_plugins)
        plugin.put_plugin(name, version, record)


def _get_latest_version(name: str, version: Optional[str]):
return version if version else plugin.get_latest_version(name)


def _get_data_from_metadata(
metadata_by_type: dict[PluginMetadataType, dict],
plugin_metadata_type: PluginMetadataType,
default_value: Optional[dict]
metadata_by_type: dict[PluginMetadataType, dict],
plugin_metadata_type: PluginMetadataType,
default_value: Optional[dict],
) -> Optional[dict[str, Any]]:
if plugin_metadata_type not in metadata_by_type:
return default_value
return metadata_by_type.get(plugin_metadata_type).get("data")
return metadata_by_type.get(plugin_metadata_type).get("data", default_value)


def _generate_aggregate(
    metadata_by_type: dict[PluginMetadataType, dict], name: str, version: str
) -> dict[str, Any]:
    """Merge a plugin's METADATA payload with its formatted manifest.

    :param metadata_by_type: records of the plugin version keyed by type.
    :param name: plugin name, passed on to the manifest formatter.
    :param version: plugin version, passed on to the manifest formatter.
    :return: one dict holding the METADATA data overlaid with the formatted
        manifest; on a key collision the manifest value wins.
    """
    metadata = _get_data_from_metadata(
        metadata_by_type, PluginMetadataType.METADATA, {}
    )
    # The DISTRIBUTION payload may be absent; the formatter receives None then.
    manifest = _get_data_from_metadata(
        metadata_by_type, PluginMetadataType.DISTRIBUTION, None
    )
    formatted_manifest = get_formatted_manifest(manifest, name, version)
    return {**metadata, **formatted_manifest}


def _get_metadata_by_type(name: str, version: str) -> dict[PluginMetadataType, dict]:
    """Query the plugin_metadata records of a plugin version, keyed by type.

    :param name: plugin name to query.
    :param version: plugin version to query.
    :return: mapping of each record's ``type`` to the record itself; when
        several records share a type, the last one returned by the query wins.
    """
    return {record["type"]: record for record in plugin_metadata.query(name, version)}


def _generate_record(
    name: str,
    metadata_by_type: dict[PluginMetadataType, dict],
    aggregate: dict[str, Any],
    blocked_plugins: set[str],
) -> dict[str, Any]:
    """Build the plugin record that is stored for one plugin version.

    :param name: plugin name, used to determine visibility.
    :param metadata_by_type: records of the plugin version keyed by type; only
        the PYPI record's ``is_latest`` flag is read here.
    :param aggregate: merged metadata/manifest data, stored under ``data``.
    :param blocked_plugins: names of blocked plugins, forwarded to the
        visibility check.
    :return: the record, always containing ``data`` and ``visibility``.
    """
    plugin_record = {"data": aggregate}
    # Promote selected aggregate fields to the top level, skipping empty values.
    for field in PLUGIN_FIELDS:
        value = aggregate.get(field)
        if value:
            plugin_record[field] = value

    visibility = _get_visibility(name, aggregate, blocked_plugins)
    plugin_record["visibility"] = visibility.name

    # "is_latest" is written only when truthy, and as the string "true".
    if metadata_by_type.get(PluginMetadataType.PYPI, {}).get("is_latest"):
        plugin_record["is_latest"] = "true"
    # Any non-public visibility is also recorded under "excluded".
    if visibility != PluginVisibility.PUBLIC:
        plugin_record["excluded"] = visibility.name

    return plugin_record


def _get_visibility(plugin, aggregate, blocked_plugins) -> PluginVisibility:
if plugin in blocked_plugins:
def _get_visibility(name, aggregate, blocked_plugins) -> PluginVisibility:
if name in blocked_plugins:
return PluginVisibility.BLOCKED
elif not aggregate:
return PluginVisibility.INVALID

visibility = aggregate.get("visibility", "").upper()
if visibility in PluginVisibility:
Expand Down
Loading

0 comments on commit 8242248

Please sign in to comment.