Skip to content

Commit

Permalink
Fetching Distribution and Metadata for plugins (#1108)
Browse files Browse the repository at this point in the history
* Adding plugin ETL base

* Clean up

* Handle pypi data source fetch

* clean up

* Adding tests for pypi_adapter

* Adding tests for dynamo

* Updating models

* refactor funsies

* Refactoring type hints

* Adding metadata fetch and formatting

* Adding github tokens for data-workflow

* Making tests more environment agnostic

* Clean up and optimizations

* Apply suggestions from code review

Co-authored-by: Ashley Anderson <aganders3@gmail.com>

* Refactoring

* Refactoring logging for info

* Updating requirements

* Adding more tests

* bug fixes

* Handling labels.ontology None

* Making visibility value consistent

* Clean up

* Removing aws_request_id from logging

---------

Co-authored-by: Ashley Anderson <aganders3@gmail.com>
  • Loading branch information
manasaV3 and aganders3 committed Jun 14, 2023
1 parent 9d7843e commit 8562260
Show file tree
Hide file tree
Showing 51 changed files with 1,973 additions and 108 deletions.
32 changes: 22 additions & 10 deletions .happy/terraform/modules/ecs-stack/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -336,16 +336,19 @@ module data_workflows_lambda {
}

environment = {
"SNOWFLAKE_PASSWORD" = local.snowflake_password
"SNOWFLAKE_USER" = local.snowflake_user
"STACK_NAME" = local.custom_stack_name
"BUCKET" = local.data_bucket_name
"BUCKET_PATH" = var.env == "dev" ? local.custom_stack_name : ""
"SNOWFLAKE_PASSWORD" = local.snowflake_password
"SNOWFLAKE_USER" = local.snowflake_user
"STACK_NAME" = local.custom_stack_name
"BUCKET" = local.data_bucket_name
"BUCKET_PATH" = var.env == "dev" ? local.custom_stack_name : ""
"GITHUB_CLIENT_ID" = local.github_client_id
"GITHUB_CLIENT_SECRET" = local.github_client_secret
"PLUGINS_LAMBDA_NAME" = local.plugins_function_name
}

log_retention_in_days = local.log_retention_period
timeout = 300
memory_size = 256
memory_size = 512
ephemeral_storage_size = 512
}

Expand Down Expand Up @@ -530,10 +533,7 @@ data aws_iam_policy_document backend_policy {

data aws_iam_policy_document data_workflows_policy {
statement {
actions = [
"s3:GetObject",
]

actions = ["s3:GetObject",]
resources = ["${local.data_bucket_arn}/*"]
}
statement {
Expand All @@ -552,6 +552,14 @@ data aws_iam_policy_document data_workflows_policy {
"${module.plugin_dynamodb_table.table_arn}/index/*",
]
}
statement {
actions = ["dynamodb:PutItem"]
resources = [module.plugin_metadata_dynamodb_table.table_arn]
}
statement {
actions = ["dynamodb:Scan"]
resources = [module.plugin_dynamodb_table.table_arn]
}
statement {
actions = [
"ssm:GetParameter",
Expand All @@ -567,6 +575,10 @@ data aws_iam_policy_document data_workflows_policy {
]
resources = [aws_sqs_queue.data_workflows_queue.arn]
}
statement {
actions = ["lambda:InvokeFunction"]
resources = [module.plugins_lambda.function_arn]
}
}

data aws_iam_policy_document plugins_policy {
Expand Down
6 changes: 3 additions & 3 deletions data-workflows/activity/github_activity_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute

from utils.utils import get_current_timestamp, date_to_utc_timestamp_in_millis, datetime_to_utc_timestamp_in_millis
from nhcommons.utils.time import get_current_timestamp
from utils.utils import date_to_utc_timestamp_in_millis, datetime_to_utc_timestamp_in_millis
from plugin.helpers import _get_repo_to_plugin_dict


LOGGER = logging.getLogger()
LOGGER = logging.getLogger(__name__)
TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')"


Expand Down
6 changes: 3 additions & 3 deletions data-workflows/activity/install_activity_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute
from nhcommons.utils.time import get_current_timestamp
from utils.utils import datetime_to_utc_timestamp_in_millis

from utils.utils import get_current_timestamp, datetime_to_utc_timestamp_in_millis

LOGGER = logging.getLogger()
LOGGER = logging.getLogger(__name__)


class InstallActivityType(Enum):
Expand Down
2 changes: 1 addition & 1 deletion data-workflows/activity/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from utils.utils import ParameterStoreAdapter
import nhcommons

LOGGER = logging.getLogger()
LOGGER = logging.getLogger(__name__)


def _fetch_install_data_and_write_to_dynamo(
Expand Down
2 changes: 1 addition & 1 deletion data-workflows/activity/snowflake_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from activity.install_activity_model import InstallActivityType
from activity.github_activity_model import GitHubActivityType

LOGGER = logging.getLogger()
LOGGER = logging.getLogger(__name__)
TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')"


Expand Down
1 change: 0 additions & 1 deletion data-workflows/activity/tests/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ def test_update_install_activity_with_no_new_updates(self, monkeypatch):
self._setup_snowflake_response(monkeypatch, [])

from activity.processor import update_activity

update_activity()

assert self._install_transform_and_write_mock.call_count == 0
Expand Down
4 changes: 2 additions & 2 deletions data-workflows/categories/category_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

from pynamodb.attributes import ListAttribute, NumberAttribute, UnicodeAttribute
from pynamodb.models import Model
from utils.utils import get_current_timestamp
from nhcommons.utils.time import get_current_timestamp

STACK_NAME = os.getenv("STACK_NAME", "local")

LOGGER = logging.getLogger()
LOGGER = logging.getLogger(__name__)


class CategoryModel(Model):
Expand Down
2 changes: 1 addition & 1 deletion data-workflows/categories/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

STACK_NAME = os.getenv("STACK_NAME", "local")

LOGGER = logging.getLogger()
LOGGER = logging.getLogger(__name__)


def seed_s3_categories_workflow(version: str, categories_path: str):
Expand Down
14 changes: 7 additions & 7 deletions data-workflows/conftest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@

import boto3
import os
import pytest

from pynamodb.models import Model


@pytest.fixture(scope="module")
def aws_credentials():
"""Mocked AWS Credentials for moto."""
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
monkeypatch = pytest.MonkeyPatch()
monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing")
monkeypatch.setenv("AWS_SESSION_TOKEN", "testing")
monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1")


def create_dynamo_table(pynamo_ddb_model: Model, table_name: str):
Expand Down
33 changes: 20 additions & 13 deletions data-workflows/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,23 @@

import activity.processor
import categories.processor
import plugin.processor

logging.basicConfig(
level="INFO",
style="{",
format="[{levelname}] {asctime} {threadName} {name}.{funcName} {message}",
force=True,
)
logger = logging.getLogger(__name__)

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
EVENT_TYPE_BY_PROCESSOR = {
"activity": lambda event: activity.processor.update_activity(),
"seed-s3-categories": lambda event: categories.processor.seed_s3_categories_workflow(
event.get("version"), event.get("categories_path")
),
"plugin": lambda event: plugin.processor.update_plugin(),
}


def handle(event, context) -> None:
Expand All @@ -15,17 +28,11 @@ def handle(event, context) -> None:
continue

body = record.get("body")
LOGGER.info(f"Received message with body: {body}")
logger.info(f"Received message with body: {body}")
event = json.loads(body)
event_type = event.get("type", "").lower()

# TODO: Create a dict for event_type by method to be called
if event_type == "activity":
activity.processor.update_activity()
LOGGER.info(f"Update successful for type={event_type}")
elif event_type == "seed-s3-categories":
version = event.get("version")
categories_path = event.get("categories_path")

categories.processor.seed_s3_categories_workflow(version, categories_path)
LOGGER.info(f"Update successful for type={event_type}")
processor = EVENT_TYPE_BY_PROCESSOR.get(event_type)
if processor:
processor(event)
logger.info(f"Update successful for type={event_type}")
2 changes: 1 addition & 1 deletion data-workflows/plugin/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import boto3
from botocore.exceptions import ClientError

LOGGER = logging.getLogger()
LOGGER = logging.getLogger(__name__)


def _get_cache(key: str) -> Dict:
Expand Down
28 changes: 28 additions & 0 deletions data-workflows/plugin/lambda_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import json
import os

import boto3


class LambdaAdapter:
    """Thin wrapper over the boto3 Lambda client used to trigger the
    plugins lambda asynchronously."""

    # Shared across all instances; created lazily on first construction.
    _client = None

    @classmethod
    def _init_client(cls):
        # Region/credentials are resolved from the ambient AWS environment.
        cls._client = boto3.client("lambda")

    def __init__(self):
        if self._client is None:
            self._init_client()

    def invoke(self, plugin: str, version: str) -> None:
        """
        Invoke plugins lambda to generate manifest & write to cache.
        :param plugin: name of the plugin to fetch manifest
        :param version: plugin version to fetch manifest
        """
        payload = json.dumps({"plugin": plugin, "version": version})
        # InvocationType=Event -> fire-and-forget async invocation.
        self._client.invoke(
            FunctionName=os.environ.get("PLUGINS_LAMBDA_NAME"),
            InvocationType="Event",
            Payload=payload,
        )
63 changes: 63 additions & 0 deletions data-workflows/plugin/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import logging
from collections import defaultdict
from typing import Optional, Tuple

from nhcommons.models.category import get_category
from nhcommons.utils.custom_parser import render_description
from nhcommons.utils.github_adapter import (
    get_github_metadata, is_valid_repo_url
)
from nhcommons.utils.pypi_adapter import get_plugin_pypi_metadata

logger = logging.getLogger(__name__)


def get_formatted_metadata(plugin: str, version: str) -> Optional[dict]:
    """Fetch pypi (and, when available, github) metadata for a plugin
    version and return it formatted for storage.

    :param plugin: name of the plugin
    :param version: version of the plugin
    :return: formatted metadata dict, or None when pypi has no record
    """
    pypi_metadata = get_plugin_pypi_metadata(plugin, version)
    if pypi_metadata:
        return _format_metadata(_generate_metadata(pypi_metadata))
    return None


def _format_metadata(metadata: dict) -> dict:
if "description" in metadata:
description = metadata.get("description")
metadata["description_text"] = render_description(description)
if "labels" in metadata:
labels = metadata["labels"]
version = labels.get("ontology")

if version:
cat, cat_hierarchy = _process_for_categories(labels, version)
metadata["category"] = cat
metadata["category_hierarchy"] = cat_hierarchy
else:
logger.warning(f"Invalid version in label labels={labels}")

del metadata["labels"]

return metadata


def _generate_metadata(pypi_metadata: dict) -> dict:
    """Merge github metadata into pypi metadata when the plugin links a
    valid github repository; github values win on key collisions.
    """
    repo_url = pypi_metadata.get("code_repository")
    if not is_valid_repo_url(repo_url):
        return pypi_metadata
    return {**pypi_metadata, **get_github_metadata(repo_url)}


def _process_for_categories(labels: dict, version: str) -> (dict, dict):
categories = defaultdict(list)
category_hierarchy = defaultdict(list)
for label_term in labels.get("terms", []):
for category in get_category(label_term, version):
dimension = category.get("dimension")
label = category.get("label")
if label not in categories[dimension]:
categories[dimension].append(label)
category.get("hierarchy")[0] = label
category_hierarchy[dimension].append(category.get("hierarchy"))
return dict(categories), dict(category_hierarchy)
Loading

0 comments on commit 8562260

Please sign in to comment.