From e34a96b1862a0ccf59a6cbeb5b2cfd3ef7ad3b7d Mon Sep 17 00:00:00 2001 From: Manasa Venkatakrishnan <14958785+manasaV3@users.noreply.github.com> Date: Wed, 6 Sep 2023 10:41:16 -0700 Subject: [PATCH] Updating maintenance metrics workflow to support overwrite (#1222) * Adding expiry for github table * Updating dynamo model to add expiry * Updating github_activity_models * Updating data generation * Removing timestamp criteria for commit query * Fixing datetime -> date for month and refactoring * Updating Tests * Update data-workflows/activity/tests/test_github_activity_model.py Co-authored-by: Ashley Anderson * Minor refactor --------- Co-authored-by: Ashley Anderson --- .happy/terraform/modules/ecs-stack/main.tf | 2 + data-workflows/__init__.py | 0 .../activity/github_activity_model.py | 95 +++--- data-workflows/activity/snowflake_adapter.py | 153 ++++++--- data-workflows/activity/tests/conftest.py | 61 ++++ data-workflows/activity/tests/test_fixture.py | 22 ++ .../tests/test_github_activity_model.py | 138 ++------- .../tests/test_github_activity_type.py | 135 ++++++++ .../tests/test_github_snowflake_adapter.py | 268 ++++++++++++++++ .../activity/tests/test_snowflake_adapter.py | 293 ++++++++++++------ data-workflows/activity/utils.py | 21 ++ data-workflows/utils/utils.py | 7 +- .../src/nhcommons/models/github_activity.py | 8 +- .../tests/models/test_github_activity.py | 39 +-- 14 files changed, 922 insertions(+), 320 deletions(-) delete mode 100644 data-workflows/__init__.py create mode 100644 data-workflows/activity/tests/conftest.py create mode 100644 data-workflows/activity/tests/test_fixture.py create mode 100644 data-workflows/activity/tests/test_github_activity_type.py create mode 100644 data-workflows/activity/tests/test_github_snowflake_adapter.py create mode 100644 data-workflows/activity/utils.py diff --git a/.happy/terraform/modules/ecs-stack/main.tf b/.happy/terraform/modules/ecs-stack/main.tf index 7faf783eb..ec4ad09cb 100644 --- a/.happy/terraform/modules/ecs-stack/main.tf +++ b/.happy/terraform/modules/ecs-stack/main.tf @@ -146,6 +146,8 @@ module github_dynamodb_table { type = "S" } ] + ttl_enabled = true + ttl_attribute_name = "expiry" autoscaling_enabled = var.env == "dev" ? false : true create_table = true tags = var.tags diff --git a/data-workflows/__init__.py b/data-workflows/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/data-workflows/activity/github_activity_model.py b/data-workflows/activity/github_activity_model.py index 40137b46d..e05e19f2b 100644 --- a/data-workflows/activity/github_activity_model.py +++ b/data-workflows/activity/github_activity_model.py @@ -1,72 +1,85 @@ import logging import time -from datetime import datetime +from datetime import date, datetime, time as dt_time from enum import Enum, auto -from typing import Union, Optional +from typing import Callable, Optional -from nhcommons.models.github_activity import batch_write -from utils.utils import ( - date_to_utc_timestamp_in_millis, datetime_to_utc_timestamp_in_millis -) +from dateutil.relativedelta import relativedelta +from nhcommons.models.github_activity import batch_write +from utils.utils import datetime_to_utc_timestamp_in_millis, to_datetime logger = logging.getLogger(__name__) TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')" class GitHubActivityType(Enum): - def __new__(cls, timestamp_formatter, type_id_formatter, projection, sort): + def __new__( + cls, + timestamp_formatter: Callable[[date], Optional[int]], + type_id_formatter: str, + projection: str, + sort: str, + expiry_formatter: Callable[[date], Optional[int]], + ): github_activity_type = object.__new__(cls) github_activity_type._value = auto() github_activity_type.timestamp_formatter = timestamp_formatter github_activity_type.type_identifier_formatter = type_id_formatter github_activity_type.query_projection = projection github_activity_type.query_sorting = sort + github_activity_type.expiry_formatter = expiry_formatter return github_activity_type LATEST = ( datetime_to_utc_timestamp_in_millis, "LATEST:{repo}", "TO_TIMESTAMP(MAX(commit_author_date)) AS latest_commit", - "name" + "name", + lambda timestamp: None, ) MONTH = ( - date_to_utc_timestamp_in_millis, + lambda ts: datetime_to_utc_timestamp_in_millis(to_datetime(ts)), "MONTH:{timestamp:%Y%m}:{repo}", "DATE_TRUNC('month', TO_DATE(commit_author_date)) AS month, " "COUNT(*) AS commit_count", - "name, month" + "name, month", + lambda ts: int((to_datetime(ts) + relativedelta(months=14)).timestamp()), ) TOTAL = ( lambda timestamp: None, "TOTAL:{repo}", "COUNT(*) AS commit_count", - "name" + "name", + lambda timestamp: None, ) - def format_to_timestamp(self, timestamp: datetime) -> Union[int, None]: + def format_to_timestamp(self, timestamp: Optional[date]) -> Optional[int]: return self.timestamp_formatter(timestamp) - def format_to_type_identifier(self, - repo_name: str, - timestamp: Optional[datetime]) -> str: + def format_to_type_identifier( + self, repo_name: str, timestamp: Optional[date] + ) -> str: return self.type_identifier_formatter.format( repo=repo_name, timestamp=timestamp ) - def _create_subquery( - self, plugins_by_earliest_ts: dict[str, datetime] - ) -> str: - if self is GitHubActivityType.MONTH: - return " OR ".join( - [ - f"repo = '{name}' AND TO_TIMESTAMP(commit_author_date) >= " - f"{TIMESTAMP_FORMAT.format(ts.replace(day=1))}" - for name, ts in plugins_by_earliest_ts.items() - ] - ) + def to_expiry(self, timestamp: Optional[date]) -> Optional[int]: + return self.expiry_formatter(timestamp) + + def _create_subquery(self, plugins_by_earliest_ts: dict[str, datetime]) -> str: plugins = [f"'{plugin}'" for plugin in plugins_by_earliest_ts.keys()] - return f"repo IN ({','.join(plugins)})" + plugins_subquery = f"repo IN ({','.join(plugins)})" + + if self is not GitHubActivityType.MONTH: + return plugins_subquery + + earliest_date = (date.today() - relativedelta(months=14)).replace(day=1) + timestamp = datetime.combine(earliest_date, dt_time.min) + return ( + f"{plugins_subquery} AND " + f"TO_TIMESTAMP(commit_author_date) >= {TIMESTAMP_FORMAT.format(timestamp)}" + ) def get_query(self, plugins_by_earliest_ts: dict[str, datetime]) -> str: return f""" @@ -83,14 +96,14 @@ def get_query(self, plugins_by_earliest_ts: dict[str, datetime]) -> str: """ -def transform_and_write_to_dynamo(data: dict[str, list], - activity_type: GitHubActivityType, - plugin_name_by_repo: dict[str, str]) -> None: - """Transforms data generated by get_plugins_commit_count_since_timestamp to - the expected format and then writes the formatted data to the corresponding - github-activity dynamo table in each environment - :param dict[str, list] data: plugin commit data in which the key is plugin - name and the value is GitHub activities +def transform_and_write_to_dynamo( + data: dict[str, list], + activity_type: GitHubActivityType, + plugin_name_by_repo: dict[str, str], +) -> None: + """Transforms data to the json of _GitHubActivity model and then batch writes the + formatted data to github-activity dynamo table + :param dict[str, list] data: plugin commit activities data keyed on plugin name :param GitHubActivityType activity_type: :param dict[str, str] plugin_name_by_repo: dict mapping repo to plugin name """ @@ -105,11 +118,10 @@ def transform_and_write_to_dynamo(data: dict[str, list], if plugin_name is None: logger.warning(f"Unable to find plugin name for repo={repo}") continue + for activity in github_activities: timestamp = activity.get("timestamp") - type_identifier = activity_type.format_to_type_identifier( - repo, timestamp - ) + type_identifier = activity_type.format_to_type_identifier(repo, timestamp) item = { "plugin_name": plugin_name.lower(), "type_identifier": type_identifier, @@ -117,10 +129,13 @@ def transform_and_write_to_dynamo(data: dict[str, list], "timestamp": activity_type.format_to_timestamp(timestamp), "commit_count": activity.get("count"), "repo": repo, + "expiry": activity_type.to_expiry(timestamp), } batch.append(item) batch_write(batch) duration = (time.perf_counter() - start) * 1000 - logger.info(f"Completed processing for github-activity type={granularity} " - f"count={len(batch)} timeTaken={duration}ms") + logger.info( + f"Completed processing for github-activity type={granularity} " + f"count={len(batch)} timeTaken={duration}ms" + ) diff --git a/data-workflows/activity/snowflake_adapter.py b/data-workflows/activity/snowflake_adapter.py index cd6dd00d7..29c8bcc9f 100644 --- a/data-workflows/activity/snowflake_adapter.py +++ b/data-workflows/activity/snowflake_adapter.py @@ -1,3 +1,4 @@ +import copy import logging import time from datetime import datetime @@ -10,15 +11,19 @@ from activity.install_activity_model import InstallActivityType from activity.github_activity_model import GitHubActivityType +from activity.utils import generate_months_default_value LOGGER = logging.getLogger(__name__) TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')" -def get_plugins_with_installs_in_window(start_millis: int, end_millis: int) -> dict[str, datetime]: +def get_plugins_with_installs_in_window( + start_millis: int, end_millis: int +) -> dict[str, datetime]: query = f""" SELECT - LOWER(file_project) AS name, DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp + LOWER(file_project) AS name, + DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp FROM imaging.pypi.labeled_downloads WHERE @@ -29,12 +34,16 @@ def get_plugins_with_installs_in_window(start_millis: int, end_millis: int) -> d GROUP BY name ORDER BY name """ - LOGGER.info(f'Querying for plugins added between start_timestamp={start_millis} end_timestamp={end_millis}') + LOGGER.info( + f"Querying for plugins added between start_timestamp={start_millis} end_timestamp={end_millis}" + ) return _mapped_query_results(query, "PYPI", {}, _cursor_to_timestamp_by_name_mapper) -def get_plugins_install_count_since_timestamp(plugins_by_earliest_ts: dict[str, datetime], - install_activity_type: InstallActivityType) -> dict[str, List]: +def get_plugins_install_count_since_timestamp( + plugins_by_earliest_ts: dict[str, datetime], + install_activity_type: InstallActivityType, +) -> dict[str, List]: query = f""" SELECT LOWER(file_project) AS name, @@ -49,14 +58,19 @@ def get_plugins_install_count_since_timestamp(plugins_by_earliest_ts: dict[str, GROUP BY name, ts ORDER BY name, ts """ - LOGGER.info(f'Fetching data for granularity={install_activity_type.name}') - return _mapped_query_results(query, 'PYPI', {}, _cursor_to_plugin_activity_mapper) + LOGGER.info(f"Fetching data for granularity={install_activity_type.name}") + return _mapped_query_results( + query, "PYPI", {}, _get_cursor_to_plugin_activity_mapper([]) + ) -def get_plugins_with_commits_in_window(start_millis: int, end_millis: int) -> dict[str, datetime]: +def get_plugins_with_commits_in_window( + start_millis: int, end_millis: int +) -> dict[str, datetime]: query = f""" SELECT - repo AS name, DATE_TRUNC('DAY', TO_DATE(min(commit_author_date))) AS earliest_timestamp + repo AS name, + DATE_TRUNC('DAY', TO_DATE(min(commit_author_date))) AS earliest_timestamp FROM imaging.github.commits WHERE @@ -66,27 +80,37 @@ def get_plugins_with_commits_in_window(start_millis: int, end_millis: int) -> di GROUP BY name ORDER BY name """ - LOGGER.info(f'Querying for plugins added between start_timestamp={start_millis} end_timestamp={end_millis}') - return _mapped_query_results(query, 'GITHUB', {}, _cursor_to_timestamp_by_name_mapper) + LOGGER.info( + f"Querying for plugins added between start_timestamp={start_millis} end_timestamp={end_millis}" + ) + return _mapped_query_results( + query, "GITHUB", {}, _cursor_to_timestamp_by_name_mapper + ) -def get_plugins_commit_count_since_timestamp(plugins_by_earliest_ts: dict[str, datetime], - github_activity_type: GitHubActivityType) -> dict[str, List]: +def get_plugins_commit_count_since_timestamp( + plugins_by_earliest_ts: dict[str, datetime], + github_activity_type: GitHubActivityType, +) -> dict[str, List]: """This method gets the commit data since a specific starting point for each plugin. - If GitHubActivityType.LATEST, fetch the latest commit timestamp, so construct query without commit count constraint - If GitHubActivityType.MONTH, fetch the sum of commits from the beginning of the month of the timestamp specified - for each of the plugin. - If GitHubActivityType.TOTAL, fetch the sum of commits over all time, so construct query without timestamp constraint - :param dict[str, datetime] plugins_by_earliest_ts: plugin name by earliest timestamp of commit record added + If GitHubActivityType.LATEST, fetch the latest commit timestamp, so construct query + without commit count constraint + If GitHubActivityType.MONTH, fetch the sum of commits from the beginning of the + month of the timestamp specified for each of the plugin. + If GitHubActivityType.TOTAL, fetch the sum of commits over all time, so construct + query without timestamp constraint + :param dict[str, datetime] plugins_by_earliest_ts: plugin name by earliest timestamp + of commit record added :param GitHubActivityType github_activity_type: """ if github_activity_type is GitHubActivityType.LATEST: accumulator_updater = _cursor_to_plugin_github_activity_latest_mapper elif github_activity_type is GitHubActivityType.MONTH: - accumulator_updater = _cursor_to_plugin_activity_mapper + default_value = generate_months_default_value(14) + accumulator_updater = _get_cursor_to_plugin_activity_mapper(default_value) else: accumulator_updater = _cursor_to_plugin_github_activity_total_mapper - LOGGER.info(f'Fetching data for granularity={github_activity_type.name}') + LOGGER.info(f"Fetching data for granularity={github_activity_type.name}") return _mapped_query_results( query=github_activity_type.get_query(plugins_by_earliest_ts), schema="GITHUB", @@ -95,7 +119,10 @@ def get_plugins_commit_count_since_timestamp(plugins_by_earliest_ts: dict[str, d ) -def _generate_subquery_by_type(plugins_by_timestamp: dict[str, datetime], install_activity_type: InstallActivityType): +def _generate_subquery_by_type( + plugins_by_timestamp: dict[str, datetime], + install_activity_type: InstallActivityType, +): """ Returns subquery clause generated from the plugins_by_timestamp data based on the InstallActivityType. It is used to get the install count since a specific starting point for each plugin. @@ -112,19 +139,28 @@ def _generate_subquery_by_type(plugins_by_timestamp: dict[str, datetime], instal return f"""LOWER(file_project) IN ({','.join([f"'{plugin}'" for plugin in plugins_by_timestamp.keys()])})""" if install_activity_type is InstallActivityType.MONTH: - plugins_by_formatted_timestamp = {plugin: ts.replace(day=1) for plugin, ts in plugins_by_timestamp.items()} + plugins_by_formatted_timestamp = { + plugin: ts.replace(day=1) for plugin, ts in plugins_by_timestamp.items() + } else: plugins_by_formatted_timestamp = plugins_by_timestamp - return ' OR '.join([f"LOWER(file_project) = '{name}' AND timestamp >= "f"{TIMESTAMP_FORMAT.format(ts)}" - for name, ts in plugins_by_formatted_timestamp.items()]) + return " OR ".join( + [ + f"LOWER(file_project) = '{name}' AND timestamp >= " + f"{TIMESTAMP_FORMAT.format(ts)}" + for name, ts in plugins_by_formatted_timestamp.items() + ] + ) def _format_timestamp(timestamp_millis): return TIMESTAMP_FORMAT.format(datetime.utcfromtimestamp(timestamp_millis / 1000.0)) -def _cursor_to_timestamp_by_name_mapper(accumulator: dict[str, datetime], cursor) -> dict[str, datetime]: +def _cursor_to_timestamp_by_name_mapper( + accumulator: dict[str, datetime], cursor +) -> dict[str, datetime]: """ Updates the accumulator with data from the cursor. Timestamp is added to the accumulator keyed on name. The cursor contains the fields name and earliest_timestamp @@ -137,21 +173,36 @@ def _cursor_to_timestamp_by_name_mapper(accumulator: dict[str, datetime], cursor return accumulator -def _cursor_to_plugin_activity_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]: - """ - Updates the accumulator with data from the cursor. Object with timestamp and count attributes are created from the - cursor record and added to the accumulator keyed on name. - The cursor contains the fields name, timestamp, and count - :param dict[str, List] accumulator: Accumulator that will be updated with new data - :param SnowflakeCursor cursor: - :returns: Accumulator after data from cursor has been added - """ - for name, timestamp, count in cursor: - accumulator.setdefault(name, []).append({'timestamp': timestamp, 'count': count}) - return accumulator +def _get_cursor_to_plugin_activity_mapper( + default_val: List, +) -> Callable[[dict[str, List], Any], dict[str, List]]: + def _mapper(accumulator: dict[str, List], cursor: Any) -> dict[str, List]: + """ + Updates the accumulator with data from the cursor. Object with timestamp and + count attributes are created from the cursor record and added to the accumulator + keyed on name. + The cursor contains the fields name, timestamp, and count + :param dict[str, List] accumulator: Accumulator that will be updated with new data + :param SnowflakeCursor cursor: + :returns: Accumulator after data from cursor has been added + """ + for name, timestamp, count in cursor: + entry = accumulator.setdefault(name, copy.deepcopy(default_val)) + match = next( + (item for item in entry if item["timestamp"] == timestamp), None + ) + if match: + match["count"] = count + else: + entry.append({"timestamp": timestamp, "count": count}) + return accumulator + + return _mapper -def _cursor_to_plugin_github_activity_latest_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]: +def _cursor_to_plugin_github_activity_latest_mapper( + accumulator: dict[str, List], cursor +) -> dict[str, List]: """ Updates the accumulator with data from the cursor for GitHubActivityType.LATEST. Object with timestamp is created from the cursor record and added to the accumulator keyed on repo name. @@ -159,13 +210,15 @@ def _cursor_to_plugin_github_activity_latest_mapper(accumulator: dict[str, List] :param dict[str, List] accumulator: Accumulator that will be updated with new data :param SnowflakeCursor cursor: :returns: Accumulator after data from cursor has been added - """ + """ for name, timestamp in cursor: - accumulator.setdefault(name, []).append({'timestamp': timestamp}) + accumulator.setdefault(name, []).append({"timestamp": timestamp}) return accumulator -def _cursor_to_plugin_github_activity_total_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]: +def _cursor_to_plugin_github_activity_total_mapper( + accumulator: dict[str, List], cursor +) -> dict[str, List]: """ Updates the accumulator with data from the cursor for GitHubActivityType.TOTAL. Object with count are created from the cursor record and added to the accumulator keyed on repo name. @@ -173,30 +226,32 @@ def _cursor_to_plugin_github_activity_total_mapper(accumulator: dict[str, List], :param dict[str, List] accumulator: Accumulator that will be updated with new data :param SnowflakeCursor cursor: :returns: Accumulator after data from cursor has been added - """ + """ for name, count in cursor: - accumulator.setdefault(name, []).append({'count': count}) + accumulator.setdefault(name, []).append({"count": count}) return accumulator def _execute_query(schema: str, query: str) -> Iterable[SnowflakeCursor]: connection = snowflake.connector.connect( - user=os.getenv('SNOWFLAKE_USER'), - password=os.getenv('SNOWFLAKE_PASSWORD'), + user=os.getenv("SNOWFLAKE_USER"), + password=os.getenv("SNOWFLAKE_PASSWORD"), account="CZI-IMAGING", warehouse="IMAGING", database="IMAGING", - schema=schema + schema=schema, ) start = time.perf_counter() try: return connection.execute_string(query) except Exception: - LOGGER.exception(f'Exception when executing query={query}') + LOGGER.exception(f"Exception when executing query={query}") finally: duration = time.perf_counter() - start - LOGGER.info(f'Query execution time={duration * 1000}ms') + LOGGER.info(f"Query execution time={duration * 1000}ms") -def _mapped_query_results(query: str, schema: str, accumulator: Any, accumulator_updater: Callable) -> Any: +def _mapped_query_results( + query: str, schema: str, accumulator: Any, accumulator_updater: Callable +) -> Any: return reduce(accumulator_updater, _execute_query(schema, query), accumulator) diff --git a/data-workflows/activity/tests/conftest.py b/data-workflows/activity/tests/conftest.py new file mode 100644 index 000000000..d0968ab3c --- /dev/null +++ b/data-workflows/activity/tests/conftest.py @@ -0,0 +1,61 @@ +from datetime import datetime, timezone +from typing import Callable +from unittest.mock import Mock + +import pytest +import snowflake.connector + + +@pytest.fixture +def snowflake_user(): + return "super-secret-username" + + +@pytest.fixture +def snowflake_password(): + return "a-password-that-cant-be-shared" + + +@pytest.fixture(autouse=True) +def setup_connection(monkeypatch, snowflake_user, snowflake_password): + monkeypatch.setenv("SNOWFLAKE_USER", snowflake_user) + monkeypatch.setenv("SNOWFLAKE_PASSWORD", snowflake_password) + + def _setup_connection(connection_config, expected_cursor_result): + connection_mock = Mock() + + def _get_mock_snowflake_connect(*_, **kwargs): + if connection_config == kwargs: + connection_mock.execute_string.return_value = expected_cursor_result + return connection_mock + return None + + monkeypatch.setattr(snowflake.connector, "connect", _get_mock_snowflake_connect) + return connection_mock + + return _setup_connection + + +@pytest.fixture +def connection_params(snowflake_user, snowflake_password) -> dict[str, str]: + return { + "user": snowflake_user, + "password": snowflake_password, + "account": "CZI-IMAGING", + "warehouse": "IMAGING", + "database": "IMAGING", + } + + +@pytest.fixture +def to_ts() -> Callable[[int], datetime]: + return lambda epoch: datetime.fromtimestamp(epoch, tz=timezone.utc) + + +@pytest.fixture +def plugins_by_earliest_ts(to_ts) -> dict[str, datetime]: + return { + "foo": to_ts(1615680000), + "bar": to_ts(1656979200), + "baz": to_ts(1687737600), + } diff --git a/data-workflows/activity/tests/test_fixture.py b/data-workflows/activity/tests/test_fixture.py new file mode 100644 index 000000000..ee3247d61 --- /dev/null +++ b/data-workflows/activity/tests/test_fixture.py @@ -0,0 +1,22 @@ +class MockSnowflakeCursor: + def __init__(self, data, row_field_count): + self._data = data + self._size = len(data) + self._index = -1 + self._row_field_count = row_field_count + + def __iter__(self): + return self + + def __next__(self): + if self._index < self._size - 1: + self._index += 1 + if self._row_field_count == 3: + return ( + self._data[self._index][0], + self._data[self._index][1], + self._data[self._index][2], + ) + else: + return self._data[self._index][0], self._data[self._index][1] + raise StopIteration diff --git a/data-workflows/activity/tests/test_github_activity_model.py b/data-workflows/activity/tests/test_github_activity_model.py index f9ed27db0..d4e7f456b 100644 --- a/data-workflows/activity/tests/test_github_activity_model.py +++ b/data-workflows/activity/tests/test_github_activity_model.py @@ -1,5 +1,4 @@ -import re -from datetime import datetime, timezone +from datetime import datetime, timezone, time from unittest.mock import Mock import pytest @@ -7,24 +6,23 @@ import activity from activity.github_activity_model import ( - GitHubActivityType, transform_and_write_to_dynamo + transform_and_write_to_dynamo, + GitHubActivityType, ) REPO1 = "demo/FOO" REPO2 = "org2/bar" PLUGIN_BY_REPO = {REPO1: "foo", REPO2: "bar"} -PLUGIN_BY_EARLIEST_TS = { - REPO1: datetime.strptime("05/16/2023 10:24:20", "%m/%d/%Y %H:%M:%S"), - REPO2: datetime.strptime("06/26/2023 20:30:00", "%m/%d/%Y %H:%M:%S"), -} -FORMATTED_PLUGIN_BY_TS = { - repo: ts.replace(day=1) for repo, ts in PLUGIN_BY_EARLIEST_TS.items() -} -def generate_expected(data, granularity, type_id, ts_formatter): +def generate_expected( + data, granularity, type_id, ts_formatter, *, include_expiry=False +): expected = [] + for repo, values in data.items(): + if repo not in PLUGIN_BY_REPO: + continue for val in values: ts = val.get("timestamp") item = { @@ -34,92 +32,31 @@ def generate_expected(data, granularity, type_id, ts_formatter): "timestamp": ts_formatter(ts), "commit_count": val.get("count"), "repo": repo, + "expiry": expiry_format(ts, months=14) if include_expiry else None, } expected.append(item) return expected -def get_relative_timestamp(**args): +def get_relative_datetime(**args): return datetime.now() - relativedelta(**args) -def ts_day_format(timestamp): - date = timestamp.replace(hour=0, minute=0, second=0, microsecond=0) - return ts_format(date) +def ts_day_format(date): + return ts_format(datetime.combine(date, time.min)) -def ts_format(timestamp): - return int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 1000) +def expiry_format(*args, **kwargs): + formatted_date = datetime.combine(args[0], time.min, timezone.utc) + return int((formatted_date + relativedelta(**kwargs)).timestamp()) -def remove_whitespace(formatted_str: str) -> str: - return re.compile(r"[ \t]+").sub(" ", formatted_str).strip() - - -def get_subquery(activity_type) -> str: - if activity_type != GitHubActivityType.MONTH: - filters = [f"'{repo}'" for repo in FORMATTED_PLUGIN_BY_TS.keys()] - return f"repo IN ({','.join(filters)})" - - filters = [ - f"repo = '{repo}' AND TO_TIMESTAMP(commit_author_date) >= " \ - f"TO_TIMESTAMP('{ts}')" - for repo, ts in FORMATTED_PLUGIN_BY_TS.items() - ] - return " OR ".join(filters) - - -@pytest.mark.parametrize( - "activity_type, timestamp, type_id, projection, group_by", [ - ( - GitHubActivityType.LATEST, - 1679394260000, - f"LATEST:{REPO1}", - "TO_TIMESTAMP(MAX(commit_author_date)) AS latest_commit", - "name", - ), - ( - GitHubActivityType.MONTH, - 1679356800000, - f"MONTH:202303:{REPO1}", - "DATE_TRUNC('month', TO_DATE(commit_author_date)) AS month, " - "COUNT(*) AS commit_count", - "name, month", - ), - ( - GitHubActivityType.TOTAL, - None, - f"TOTAL:{REPO1}", - "COUNT(*) AS commit_count", - "name", - ) - ] -) -def test_github_activity_type( - activity_type, timestamp, type_id, projection, group_by -): - input_ts = datetime.strptime("03/21/2023 10:24:20", "%m/%d/%Y %H:%M:%S") - assert activity_type.format_to_timestamp(input_ts) == timestamp - assert activity_type.format_to_type_identifier(REPO1, input_ts) == type_id - expected_query = f""" - SELECT - repo AS name, - {projection} - FROM - imaging.github.commits - WHERE - repo_type = 'plugin' - AND ({get_subquery(activity_type)}) - GROUP BY {group_by} - ORDER BY {group_by} - """ - actual = activity_type.get_query(PLUGIN_BY_EARLIEST_TS) - assert remove_whitespace(actual) == remove_whitespace(expected_query) +def ts_format(timestamp): + return int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 1000) class TestGitHubActivityModels: - @pytest.fixture def mock_batch_write(self, monkeypatch): mock_batch_write = Mock() @@ -130,40 +67,34 @@ def mock_batch_write(self, monkeypatch): def test_transform_to_dynamo_records_for_latest(self, mock_batch_write): data = { - "demo/FOO": [{"timestamp": get_relative_timestamp(days=30)}], - "org1/baz": [{"timestamp": get_relative_timestamp(days=1)}], - "org2/bar": [{"timestamp": get_relative_timestamp(days=23)}], + "demo/FOO": [{"timestamp": get_relative_datetime(days=30)}], + "org1/baz": [{"timestamp": get_relative_datetime(days=1)}], + "org2/bar": [{"timestamp": get_relative_datetime(days=23)}], } - transform_and_write_to_dynamo( - data, GitHubActivityType.LATEST, PLUGIN_BY_REPO - ) + transform_and_write_to_dynamo(data, GitHubActivityType.LATEST, PLUGIN_BY_REPO) - data.pop("org1/baz") expected = generate_expected(data, "LATEST", "LATEST:{repo}", ts_format) mock_batch_write.assert_called_once_with(expected) def test_transform_to_dynamo_records_for_month(self, mock_batch_write): data = { "demo/FOO": [ - {"timestamp": get_relative_timestamp(months=24), "count": 2}, - {"timestamp": get_relative_timestamp(months=13), "count": 3} + {"timestamp": get_relative_datetime(months=24).date(), "count": 2}, + {"timestamp": get_relative_datetime(months=13).date(), "count": 3}, ], "org1/baz": [ - {"timestamp": get_relative_timestamp(months=15), "count": 8}, - {"timestamp": get_relative_timestamp(months=14), "count": 7}, - {"timestamp": get_relative_timestamp(months=12), "count": 8}, - {"timestamp": get_relative_timestamp(months=11), "count": 7} + {"timestamp": get_relative_datetime(months=15).date(), "count": 8}, + {"timestamp": get_relative_datetime(months=14).date(), "count": 7}, + {"timestamp": get_relative_datetime(months=12).date(), "count": 8}, + {"timestamp": get_relative_datetime(months=11).date(), "count": 7}, ], } - transform_and_write_to_dynamo( - data, GitHubActivityType.MONTH, PLUGIN_BY_REPO - ) + transform_and_write_to_dynamo(data, GitHubActivityType.MONTH, PLUGIN_BY_REPO) - data.pop("org1/baz") expected = generate_expected( - data, "MONTH", "MONTH:{ts:%Y%m}:{repo}", ts_day_format + data, "MONTH", "MONTH:{ts:%Y%m}:{repo}", ts_day_format, include_expiry=True ) mock_batch_write.assert_called_once_with(expected) @@ -174,12 +105,7 @@ def test_transform_to_dynamo_records_for_total(self, mock_batch_write): "org2/bar": [{"count": 65}], } - transform_and_write_to_dynamo( - data, GitHubActivityType.TOTAL, PLUGIN_BY_REPO - ) + transform_and_write_to_dynamo(data, GitHubActivityType.TOTAL, PLUGIN_BY_REPO) - data.pop("org1/baz") - expected = generate_expected( - data, "TOTAL", "TOTAL:{repo}", lambda ts: None - ) + expected = generate_expected(data, "TOTAL", "TOTAL:{repo}", lambda ts: None) mock_batch_write.assert_called_once_with(expected) diff --git a/data-workflows/activity/tests/test_github_activity_type.py b/data-workflows/activity/tests/test_github_activity_type.py new file mode 100644 index 000000000..ad335abd6 --- /dev/null +++ b/data-workflows/activity/tests/test_github_activity_type.py @@ -0,0 +1,135 @@ +import re +from datetime import datetime, date, time +from typing import Optional, Callable, Union + +import pytest +from dateutil.relativedelta import relativedelta + +from activity.github_activity_model import GitHubActivityType + +REPO = "demo/FOO" + + +def get_input_ts() -> datetime: + return datetime.strptime("03/21/2023 10:24:20", "%m/%d/%Y %H:%M:%S") + + +REPO1 = "demo/FOO" +REPO2 = "org2/bar" + + +@pytest.fixture +def plugin_by_earliest_ts() -> dict[str, datetime]: + return { + REPO1: datetime.strptime("05/16/2023 10:24:20", "%m/%d/%Y %H:%M:%S"), + REPO2: datetime.strptime("06/26/2023 20:30:00", "%m/%d/%Y %H:%M:%S"), + } + + +@pytest.fixture +def remove_whitespace() -> Callable[[str], str]: + return lambda formatted_str: re.compile(r"[ \t]+").sub(" ", formatted_str).strip() + + +@pytest.fixture +def get_maintenance_subquery( + plugin_by_earliest_ts: dict[str, datetime] +) -> Callable[[GitHubActivityType], str]: + def _get_maintenance_subquery(activity_type: GitHubActivityType) -> str: + filters = [f"'{repo}'" for repo in plugin_by_earliest_ts.keys()] + if activity_type != GitHubActivityType.MONTH: + return f"repo IN ({','.join(filters)})" + + ts = datetime.combine(datetime.now() - relativedelta(months=14), time.min) + return ( + f"repo IN ({','.join(filters)}) AND TO_TIMESTAMP(commit_author_date) >= " + f"TO_TIMESTAMP('{ts.replace(day=1)}')" + ) + + return _get_maintenance_subquery + + +@pytest.mark.parametrize( + "activity_type, projection, group_by", + [ + ( + GitHubActivityType.LATEST, + "TO_TIMESTAMP(MAX(commit_author_date)) AS latest_commit", + "name", + ), + ( + GitHubActivityType.MONTH, + "DATE_TRUNC('month', TO_DATE(commit_author_date)) AS month, " + "COUNT(*) AS commit_count", + "name, month", + ), + ( + GitHubActivityType.TOTAL, + "COUNT(*) AS commit_count", + "name", + ), + ], +) +def test_github_activity_type_query_generation( + activity_type: GitHubActivityType, + projection: str, + group_by: str, + plugin_by_earliest_ts: dict[str, datetime], + get_maintenance_subquery: Callable[[GitHubActivityType], str], + remove_whitespace: Callable[[str], str], +): + expected_query = f""" + SELECT + repo AS name, + {projection} + FROM + imaging.github.commits + WHERE + repo_type = 'plugin' + AND ({get_maintenance_subquery(activity_type)}) + GROUP BY {group_by} + ORDER BY {group_by} + """ + actual = activity_type.get_query(plugin_by_earliest_ts) + assert remove_whitespace(actual) == remove_whitespace(expected_query) + + +@pytest.mark.parametrize( + "activity_type, input_ts, timestamp, type_id, expiry", + [ + ( + GitHubActivityType.LATEST, + get_input_ts(), + 1679394260000, + f"LATEST:{REPO}", + None, + ), + ( + GitHubActivityType.MONTH, + get_input_ts().date(), + 1679356800000, + f"MONTH:202303:{REPO}", + 1716249600, + ), + ( + GitHubActivityType.MONTH, + get_input_ts(), + 1679356800000, + f"MONTH:202303:{REPO}", + 1716249600, + ), + (GitHubActivityType.TOTAL, None, None, f"TOTAL:{REPO}", None), + (GitHubActivityType.TOTAL, get_input_ts().date(), None, f"TOTAL:{REPO}", None), + (GitHubActivityType.TOTAL, get_input_ts(), None, f"TOTAL:{REPO}", None), + ], +) +def test_github_activity_type( + activity_type: GitHubActivityType, + input_ts: Union[date, datetime, None], + timestamp: Optional[int], + type_id: str, + expiry: Optional[int], +): + assert activity_type.format_to_timestamp(input_ts) == timestamp + assert activity_type.format_to_type_identifier(REPO, input_ts) == type_id + assert activity_type.to_expiry(input_ts) == expiry diff --git a/data-workflows/activity/tests/test_github_snowflake_adapter.py b/data-workflows/activity/tests/test_github_snowflake_adapter.py new file mode 100644 index 000000000..1b8c19887 --- /dev/null +++ b/data-workflows/activity/tests/test_github_snowflake_adapter.py @@ -0,0 +1,268 @@ +import pytest +from datetime import datetime, timezone, time +from dateutil.relativedelta import relativedelta + +from activity.github_activity_model import GitHubActivityType +from activity.snowflake_adapter import ( + get_plugins_with_commits_in_window, + get_plugins_commit_count_since_timestamp, +) +from activity.tests.test_fixture import MockSnowflakeCursor + +START_TIME = 1615705553000 +END_TIME = datetime.now().replace(tzinfo=timezone.utc) +END_TIMESTAMP = int(END_TIME.timestamp() * 1000) + + +def to_datetime(epoch) -> datetime: + return datetime.fromtimestamp(epoch, tz=timezone.utc) + + +def relative_datetime(*args, **kwargs) -> datetime: + first_day_of_month = datetime.now().replace( + day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc + ) + return first_day_of_month - relativedelta(**kwargs) + + +def generate_expected(matching_values): + expected = [] + for i in range(13, -1, -1): + ts = relative_datetime(months=i) + expected.append({"timestamp": ts.date(), "count": matching_values.get(ts, 0)}) + return expected + + +class TestGithubSnowflakeAdapter: + @pytest.fixture + def plugins_with_commits_in_window_query(self): + end_time = END_TIME.strftime("%Y-%m-%d %H:%M:%S") + query = f""" + SELECT + repo AS name, + DATE_TRUNC('DAY', TO_DATE(min(commit_author_date))) AS earliest_timestamp + FROM + imaging.github.commits + WHERE + repo_type = 'plugin' + AND TO_TIMESTAMP(ingestion_time) > TO_TIMESTAMP('2021-03-14 07:05:53') + AND TO_TIMESTAMP(ingestion_time) <= TO_TIMESTAMP('{end_time}') + GROUP BY name + ORDER BY name + """ + return query + + @pytest.fixture + def get_plugins_commit_count_since_timestamp_query(self): + return ( + lambda projection, subquery, grouping: f""" + SELECT + repo AS name, + {projection} + FROM + imaging.github.commits + WHERE + repo_type = 'plugin' + AND {subquery} + GROUP BY {grouping} + ORDER BY {grouping} + """ + ) + + @pytest.fixture + def github_connection_params(self, connection_params) -> dict[str, str]: + connection_params["schema"] = "GITHUB" + return connection_params + + def test_get_plugins_with_commits_in_window_no_result( + self, + github_connection_params, + setup_connection, + plugins_with_commits_in_window_query, + ): + connection_mock = setup_connection( + github_connection_params, [MockSnowflakeCursor([], 2)] + ) + + actual = get_plugins_with_commits_in_window(START_TIME, END_TIMESTAMP) + + assert {} == actual + connection_mock.execute_string.assert_called_once_with( + plugins_with_commits_in_window_query + ) + + def test_get_plugins_with_commits_in_window_with_result( + self, + github_connection_params, + setup_connection, + to_ts, + plugins_with_commits_in_window_query, + plugins_by_earliest_ts, + ): + expected_cursor_result = [ + MockSnowflakeCursor( + [["foo", to_datetime(1615680000)], ["bar", to_datetime(1656979200)]], + 2, + ), + MockSnowflakeCursor([["baz", to_datetime(1687737600)]], 2), + ] + connection_mock = setup_connection( + github_connection_params, expected_cursor_result + ) + + actual = get_plugins_with_commits_in_window(START_TIME, END_TIMESTAMP) + + assert plugins_by_earliest_ts == actual + connection_mock.execute_string.assert_called_once_with( + plugins_with_commits_in_window_query + ) + + @pytest.mark.parametrize( + "expected_cursor_result, expected", + [ + ([MockSnowflakeCursor([], 2)], {}), + ( + [ + MockSnowflakeCursor( + [ + ["foo", to_datetime(1629072000)], + ["bar", to_datetime(1666656000)], + ], + 2, + ), + MockSnowflakeCursor([["baz", to_datetime(1662940800)]], 2), + ], + { + "foo": [{"timestamp": to_datetime(1629072000)}], + "bar": [{"timestamp": to_datetime(1666656000)}], + "baz": [{"timestamp": to_datetime(1662940800)}], + }, + ), + ], + ) + def test_get_plugins_commit_count_since_timestamp_for_latest_commit( + self, + github_connection_params, + setup_connection, + plugins_by_earliest_ts, + get_plugins_commit_count_since_timestamp_query, + expected_cursor_result, + expected, + ): + connection_mock = setup_connection( + github_connection_params, expected_cursor_result + ) + + actual = get_plugins_commit_count_since_timestamp( + plugins_by_earliest_ts, GitHubActivityType.LATEST + ) + + assert expected == actual + query = get_plugins_commit_count_since_timestamp_query( + "TO_TIMESTAMP(MAX(commit_author_date)) AS latest_commit", + "(repo IN ('foo','bar','baz'))", + "name", + ) + connection_mock.execute_string.assert_called_once_with(query) + + @pytest.mark.parametrize( + "expected_cursor_result, expected", + [ + ([MockSnowflakeCursor([], 3)], {}), + ( + [ + MockSnowflakeCursor( + [ + ["foo", relative_datetime(months=1).date(), 2], + ["bar", relative_datetime(months=5).date(), 8], + ], + 3, + ), + MockSnowflakeCursor( + [["foo", relative_datetime(months=12).date(), 3]], 3 + ), + ], + { + "foo": generate_expected( + { + relative_datetime(months=1): 2, + relative_datetime(months=12): 3, + } + ), + "bar": generate_expected({relative_datetime(months=5): 8}), + }, + ), + ], + ) + def test_get_plugins_commit_count_since_timestamp_for_month( + self, + github_connection_params, + setup_connection, + plugins_by_earliest_ts, + get_plugins_commit_count_since_timestamp_query, + expected_cursor_result, + expected, + ): + connection_mock = setup_connection( + github_connection_params, expected_cursor_result + ) + + actual = get_plugins_commit_count_since_timestamp( + plugins_by_earliest_ts, GitHubActivityType.MONTH + ) + + assert expected == actual + + timestamp = datetime.combine( + datetime.now() - relativedelta(months=14), time.min + ).replace(day=1) + subquery = ( + "(repo IN ('foo','bar','baz') AND TO_TIMESTAMP(commit_author_date)" + f" >= TO_TIMESTAMP('{timestamp}'))" + ) + query = get_plugins_commit_count_since_timestamp_query( + "DATE_TRUNC('month', TO_DATE(commit_author_date)) AS month, COUNT(*) AS commit_count", + subquery, + "name, month", + ) + connection_mock.execute_string.assert_called_once_with(query) + + @pytest.mark.parametrize( + "expected_cursor_result, expected", + [ + ([MockSnowflakeCursor([], 2)], {}), + ( + [ + MockSnowflakeCursor([["foo", 2], ["bar", 8]], 2), + MockSnowflakeCursor([["baz", 10]], 2), + ], + { + "foo": [{"count": 2}], + "bar": [{"count": 8}], + "baz": [{"count": 10}], + }, + ), + ], + ) + def test_get_plugins_commit_count_since_timestamp_for_total( + self, + github_connection_params, + setup_connection, + plugins_by_earliest_ts, + get_plugins_commit_count_since_timestamp_query, + expected_cursor_result, + expected, + ): + connection_mock = setup_connection( + github_connection_params, expected_cursor_result + ) + actual = get_plugins_commit_count_since_timestamp( + plugins_by_earliest_ts, GitHubActivityType.TOTAL + ) + + assert expected == actual + subquery = "(repo IN ('foo','bar','baz'))" + query = get_plugins_commit_count_since_timestamp_query( + "COUNT(*) AS commit_count", subquery, "name" + ) + connection_mock.execute_string.assert_called_once_with(query) diff --git a/data-workflows/activity/tests/test_snowflake_adapter.py b/data-workflows/activity/tests/test_snowflake_adapter.py index 9bfcd9a88..6f0ed2188 100644 --- a/data-workflows/activity/tests/test_snowflake_adapter.py +++ b/data-workflows/activity/tests/test_snowflake_adapter.py @@ -3,36 +3,46 @@ import pytest import snowflake.connector +from dateutil.relativedelta import relativedelta from activity.install_activity_model import InstallActivityType - -SNOWFLAKE_USER = 'super-secret-username' -SNOWFLAKE_PASSWORD = 'a-password-that-cant-be-shared' +from activity.snowflake_adapter import ( + get_plugins_with_installs_in_window, + get_plugins_install_count_since_timestamp, +) +from activity.tests.test_fixture import MockSnowflakeCursor + +SNOWFLAKE_USER = "super-secret-username" +SNOWFLAKE_PASSWORD = "a-password-that-cant-be-shared" START_TIME = 1615705553000 -END_TIME = 1647241553000 - -CONNECTION_PARAMS = {'user': SNOWFLAKE_USER, 'password': SNOWFLAKE_PASSWORD, 'account': "CZI-IMAGING", - 'warehouse': "IMAGING", 'database': "IMAGING", 'schema': "PYPI", } +END_TIME = datetime.now().replace(tzinfo=timezone.utc) +END_TIMESTAMP = int(END_TIME.timestamp() * 1000) -def to_ts(epoch): +def to_ts(epoch: int) -> datetime: return datetime.fromtimestamp(epoch, tz=timezone.utc) -PLUGINS_BY_EARLIEST_TS = {'foo': to_ts(1615680000), 'bar': to_ts(1656979200), 'baz': to_ts(1687737600)} +def to_datetime(*args, **kwargs) -> datetime: + first_day_of_month = datetime.now().replace( + day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc + ) + return first_day_of_month - relativedelta(**kwargs) def get_plugins_with_installs_in_window_query(): - return """ + end_time = END_TIME.strftime("%Y-%m-%d %H:%M:%S") + return f""" SELECT - LOWER(file_project) AS name, DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp + LOWER(file_project) AS name, + DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp FROM imaging.pypi.labeled_downloads WHERE download_type = 'pip' AND project_type = 'plugin' AND TO_TIMESTAMP(ingestion_timestamp) > TO_TIMESTAMP('2021-03-14 07:05:53') - AND TO_TIMESTAMP(ingestion_timestamp) <= TO_TIMESTAMP('2022-03-14 07:05:53') + AND TO_TIMESTAMP(ingestion_timestamp) <= TO_TIMESTAMP('{end_time}') GROUP BY name ORDER BY name """ @@ -55,126 +65,209 @@ def get_plugins_install_count_since_timestamp_query(projection, subquery): """ -class MockSnowflakeCursor: - - def __init__(self, data, row_field_count): - self._data = data - self._size = len(data) - self._index = -1 - self._row_field_count = row_field_count - - def __iter__(self): - return self - - def __next__(self): - if self._index < self._size - 1: - self._index += 1 - if self._row_field_count == 3: - return self._data[self._index][0], self._data[self._index][1], self._data[self._index][2] - else: - return self._data[self._index][0], self._data[self._index][1] - raise StopIteration +def get_plugins_commit_count_since_timestamp_query(projection, subquery, grouping): + return f""" + SELECT + repo AS name, + {projection} + FROM + imaging.github.commits + WHERE + repo_type = 'plugin' + AND {subquery} + GROUP BY {grouping} + ORDER BY {grouping} + """ + + +def _generate_expected(matching_values): + expected = [] + for i in range(13, -1, -1): + ts = to_datetime(months=i) + expected.append({"timestamp": ts, "count": matching_values.get(ts, 0)}) + return expected class TestSnowflakeAdapter: def _get_mock_snowflake_connect(self, *_, **kwargs): - if CONNECTION_PARAMS == kwargs: + if self._connection_params == kwargs: self._connection_mock = Mock() - self._connection_mock.execute_string.return_value = self._expected_cursor_result + self._connection_mock.execute_string.return_value = ( + self._expected_cursor_result + ) return self._connection_mock return None @pytest.fixture(autouse=True) def _setup_method(self, monkeypatch): - monkeypatch.setenv('SNOWFLAKE_USER', SNOWFLAKE_USER) - monkeypatch.setenv('SNOWFLAKE_PASSWORD', SNOWFLAKE_PASSWORD) - monkeypatch.setattr(snowflake.connector, 'connect', self._get_mock_snowflake_connect) + monkeypatch.setenv("SNOWFLAKE_USER", SNOWFLAKE_USER) + monkeypatch.setenv("SNOWFLAKE_PASSWORD", SNOWFLAKE_PASSWORD) + monkeypatch.setattr( + snowflake.connector, "connect", self._get_mock_snowflake_connect + ) + + @pytest.fixture(autouse=True) + def connection_params(self) -> None: + self._connection_params = { + "user": SNOWFLAKE_USER, + "password": SNOWFLAKE_PASSWORD, + "account": "CZI-IMAGING", + "warehouse": "IMAGING", + "database": "IMAGING", + } + + @pytest.fixture + def plugins_by_earliest_ts(self) -> dict[str, datetime]: + return { + "foo": to_ts(1615680000), + "bar": to_ts(1656979200), + "baz": to_ts(1687737600), + } def test_get_plugins_with_installs_in_window_no_result(self): + self._connection_params["schema"] = "PYPI" self._expected_cursor_result = [MockSnowflakeCursor([], 2)] - from activity.snowflake_adapter import get_plugins_with_installs_in_window - actual = get_plugins_with_installs_in_window(START_TIME, END_TIME) - assert {} == actual - self._connection_mock.execute_string.assert_called_once_with(get_plugins_with_installs_in_window_query()) + actual = get_plugins_with_installs_in_window(START_TIME, END_TIMESTAMP) - def test_get_plugins_with_installs_in_window_with_result(self): + assert {} == actual + self._connection_mock.execute_string.assert_called_once_with( + get_plugins_with_installs_in_window_query() + ) + + def test_get_plugins_with_installs_in_window_with_result( + self, plugins_by_earliest_ts + ): + self._connection_params["schema"] = "PYPI" self._expected_cursor_result = [ - MockSnowflakeCursor([['foo', to_ts(1615680000)], ['bar', to_ts(1656979200)]], 2), - MockSnowflakeCursor([['baz', to_ts(1687737600)]], 2) + MockSnowflakeCursor( + [["foo", to_ts(1615680000)], ["bar", to_ts(1656979200)]], 2 + ), + MockSnowflakeCursor([["baz", to_ts(1687737600)]], 2), ] - from activity.snowflake_adapter import get_plugins_with_installs_in_window - actual = get_plugins_with_installs_in_window(START_TIME, END_TIME) - - assert PLUGINS_BY_EARLIEST_TS == actual - self._connection_mock.execute_string.assert_called_once_with(get_plugins_with_installs_in_window_query()) - - @pytest.mark.parametrize('expected_cursor_result,expected', [ - ([MockSnowflakeCursor([], 3)], {}), - ([ - MockSnowflakeCursor([['foo', to_ts(1629072000), 2], ['bar', to_ts(1666656000), 8]], 3), - MockSnowflakeCursor([['foo', to_ts(1662940800), 3]], 3) - ], - { - 'foo': [{'timestamp': to_ts(1629072000), 'count': 2}, {'timestamp': to_ts(1662940800), 'count': 3}], - 'bar': [{'timestamp': to_ts(1666656000), 'count': 8}], - }) - ]) - def test_get_plugins_install_count_since_timestamp_for_day(self, expected_cursor_result, expected): + actual = get_plugins_with_installs_in_window(START_TIME, END_TIMESTAMP) + + assert plugins_by_earliest_ts == actual + self._connection_mock.execute_string.assert_called_once_with( + get_plugins_with_installs_in_window_query() + ) + + @pytest.mark.parametrize( + "expected_cursor_result,expected", + [ + ([MockSnowflakeCursor([], 3)], {}), + ( + [ + MockSnowflakeCursor( + [["foo", to_ts(1629072000), 2], ["bar", to_ts(1666656000), 8]], + 3, + ), + MockSnowflakeCursor([["foo", to_ts(1662940800), 3]], 3), + ], + { + "foo": [ + {"timestamp": to_ts(1629072000), "count": 2}, + {"timestamp": to_ts(1662940800), "count": 3}, + ], + "bar": [{"timestamp": to_ts(1666656000), "count": 8}], + }, + ), + ], + ) + def test_get_plugins_install_count_since_timestamp_for_day( + self, expected_cursor_result, expected, plugins_by_earliest_ts + ): + self._connection_params["schema"] = "PYPI" self._expected_cursor_result = expected_cursor_result - from activity.snowflake_adapter import get_plugins_install_count_since_timestamp - actual = get_plugins_install_count_since_timestamp(PLUGINS_BY_EARLIEST_TS, InstallActivityType.DAY) + actual = get_plugins_install_count_since_timestamp( + plugins_by_earliest_ts, InstallActivityType.DAY + ) assert expected == actual - subquery = "LOWER(file_project) = 'foo' AND timestamp >= TO_TIMESTAMP('2021-03-14 00:00:00') OR " \ - "LOWER(file_project) = 'bar' AND timestamp >= TO_TIMESTAMP('2022-07-05 00:00:00') OR " \ - "LOWER(file_project) = 'baz' AND timestamp >= TO_TIMESTAMP('2023-06-26 00:00:00')" - query = get_plugins_install_count_since_timestamp_query("DATE_TRUNC('DAY', timestamp)", subquery) + subquery = ( + "LOWER(file_project) = 'foo' AND timestamp >= TO_TIMESTAMP('2021-03-14 00:00:00') OR " + "LOWER(file_project) = 'bar' AND timestamp >= TO_TIMESTAMP('2022-07-05 00:00:00') OR " + "LOWER(file_project) = 'baz' AND timestamp >= TO_TIMESTAMP('2023-06-26 00:00:00')" + ) + query = get_plugins_install_count_since_timestamp_query( + "DATE_TRUNC('DAY', timestamp)", subquery + ) self._connection_mock.execute_string.assert_called_once_with(query) - @pytest.mark.parametrize('expected_cursor_result,expected', [ - ([MockSnowflakeCursor([], 3)], {}), - ([ - MockSnowflakeCursor([['foo', to_ts(1629072000), 2], ['bar', to_ts(1666656000), 8]], 3), - MockSnowflakeCursor([['foo', to_ts(1662940800), 3]], 3) - ], - { - 'foo': [{'timestamp': to_ts(1629072000), 'count': 2}, {'timestamp': to_ts(1662940800), 'count': 3}], - 'bar': [{'timestamp': to_ts(1666656000), 'count': 8}], - }) - ]) - def test_get_plugins_install_count_since_timestamp_for_month(self, expected_cursor_result, expected): + @pytest.mark.parametrize( + "expected_cursor_result,expected", + [ + ([MockSnowflakeCursor([], 3)], {}), + ( + [ + MockSnowflakeCursor( + [["foo", to_ts(1629072000), 2], ["bar", to_ts(1666656000), 8]], + 3, + ), + MockSnowflakeCursor([["foo", to_ts(1662940800), 3]], 3), + ], + { + "foo": [ + {"timestamp": to_ts(1629072000), "count": 2}, + {"timestamp": to_ts(1662940800), "count": 3}, + ], + "bar": [{"timestamp": to_ts(1666656000), "count": 8}], + }, + ), + ], + ) + def test_get_plugins_install_count_since_timestamp_for_month( + self, expected_cursor_result, expected, plugins_by_earliest_ts + ): + self._connection_params["schema"] = "PYPI" self._expected_cursor_result = expected_cursor_result - from activity.snowflake_adapter import get_plugins_install_count_since_timestamp - actual = get_plugins_install_count_since_timestamp(PLUGINS_BY_EARLIEST_TS, InstallActivityType.MONTH) + actual = get_plugins_install_count_since_timestamp( + plugins_by_earliest_ts, InstallActivityType.MONTH + ) assert expected == actual - subquery = "LOWER(file_project) = 'foo' AND timestamp >= TO_TIMESTAMP('2021-03-01 00:00:00') OR " \ - "LOWER(file_project) = 'bar' AND timestamp >= TO_TIMESTAMP('2022-07-01 00:00:00') OR " \ - "LOWER(file_project) = 'baz' AND timestamp >= TO_TIMESTAMP('2023-06-01 00:00:00')" - query = get_plugins_install_count_since_timestamp_query("DATE_TRUNC('MONTH', timestamp)", subquery) + subquery = ( + "LOWER(file_project) = 'foo' AND timestamp >= TO_TIMESTAMP('2021-03-01 00:00:00') OR " + "LOWER(file_project) = 'bar' AND timestamp >= TO_TIMESTAMP('2022-07-01 00:00:00') OR " + "LOWER(file_project) = 'baz' AND timestamp >= TO_TIMESTAMP('2023-06-01 00:00:00')" + ) + query = get_plugins_install_count_since_timestamp_query( + "DATE_TRUNC('MONTH', timestamp)", subquery + ) self._connection_mock.execute_string.assert_called_once_with(query) - @pytest.mark.parametrize('expected_cursor_result,expected', [ - ([MockSnowflakeCursor([], 3)], {}), - ([ - MockSnowflakeCursor([['foo', to_ts(1629072000), 2], ['bar', to_ts(1666656000), 8]], 3), - MockSnowflakeCursor([['baz', to_ts(1662940800), 10]], 3) - ], - { - 'foo': [{'timestamp': to_ts(1629072000), 'count': 2}], - 'bar': [{'timestamp': to_ts(1666656000), 'count': 8}], - 'baz': [{'timestamp': to_ts(1662940800), 'count': 10}] - }) - ]) - def test_get_plugins_install_count_since_timestamp_for_total(self, expected_cursor_result, expected): + @pytest.mark.parametrize( + "expected_cursor_result,expected", + [ + ([MockSnowflakeCursor([], 3)], {}), + ( + [ + MockSnowflakeCursor( + [["foo", to_ts(1629072000), 2], ["bar", to_ts(1666656000), 8]], + 3, + ), + MockSnowflakeCursor([["baz", to_ts(1662940800), 10]], 3), + ], + { + "foo": [{"timestamp": to_ts(1629072000), "count": 2}], + "bar": [{"timestamp": to_ts(1666656000), "count": 8}], + "baz": [{"timestamp": to_ts(1662940800), "count": 10}], + }, + ), + ], + ) + def test_get_plugins_install_count_since_timestamp_for_total( + self, expected_cursor_result, expected, plugins_by_earliest_ts + ): + self._connection_params["schema"] = "PYPI" self._expected_cursor_result = expected_cursor_result - from activity.snowflake_adapter import get_plugins_install_count_since_timestamp - actual = get_plugins_install_count_since_timestamp(PLUGINS_BY_EARLIEST_TS, InstallActivityType.TOTAL) + actual = get_plugins_install_count_since_timestamp( + plugins_by_earliest_ts, InstallActivityType.TOTAL + ) assert expected == actual subquery = "LOWER(file_project) IN ('foo','bar','baz')" diff --git a/data-workflows/activity/utils.py b/data-workflows/activity/utils.py new file mode 100644 index 000000000..cd4d35af2 --- /dev/null +++ b/data-workflows/activity/utils.py @@ -0,0 +1,21 @@ +from datetime import datetime, date, timezone +from typing import Union + +from dateutil.relativedelta import relativedelta + + +def _to_default_entry(timestamp: date) -> dict[str, Union[date, int]]: + return { + "timestamp": timestamp, + "count": 0, + } + + +def generate_months_default_value(limit: int): + upper = datetime.now().replace( + day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc + ) + return [ + _to_default_entry((upper - relativedelta(months=i)).date()) + for i in range(limit - 1, -1, -1) + ] diff --git a/data-workflows/utils/utils.py b/data-workflows/utils/utils.py index 0c007ea6a..f93fbae35 100644 --- a/data-workflows/utils/utils.py +++ b/data-workflows/utils/utils.py @@ -1,6 +1,6 @@ import boto3 import json -from datetime import date, datetime, timezone +from datetime import date, datetime, timezone, time from .env import get_required_env @@ -8,9 +8,8 @@ LAST_UPDATED_TIMESTAMP_KEY = "last_activity_fetched_timestamp" -def date_to_utc_timestamp_in_millis(timestamp: date) -> int: - timestamp_datetime = datetime(timestamp.year, timestamp.month, timestamp.day) - return datetime_to_utc_timestamp_in_millis(timestamp_datetime) +def to_datetime(timestamp: date) -> datetime: + return datetime.combine(timestamp, time.min, timezone.utc) def datetime_to_utc_timestamp_in_millis(timestamp: datetime) -> int: diff --git a/napari-hub-commons/src/nhcommons/models/github_activity.py b/napari-hub-commons/src/nhcommons/models/github_activity.py index 207b66a04..b9b08972b 100644 --- a/napari-hub-commons/src/nhcommons/models/github_activity.py +++ b/napari-hub-commons/src/nhcommons/models/github_activity.py @@ -1,9 +1,9 @@ import logging import time -from typing import (Dict, Any, List) +from typing import Dict, Any, List -from pynamodb.attributes import (UnicodeAttribute, NumberAttribute) -from nhcommons.models.helper import (set_ddb_metadata, PynamoWrapper) +from pynamodb.attributes import UnicodeAttribute, NumberAttribute +from nhcommons.models.helper import set_ddb_metadata, PynamoWrapper logger = logging.getLogger(__name__) @@ -19,6 +19,7 @@ class Meta: granularity = UnicodeAttribute(attr_name="type") repo = UnicodeAttribute() timestamp = NumberAttribute(null=True) + expiry = NumberAttribute(null=True) @staticmethod def from_dict(data: Dict[str, Any]): @@ -29,6 +30,7 @@ def from_dict(data: Dict[str, Any]): granularity=data["granularity"], repo=data["repo"], timestamp=data.get("timestamp"), + expiry=data.get("expiry"), ) diff --git a/napari-hub-commons/src/nhcommons/tests/models/test_github_activity.py b/napari-hub-commons/src/nhcommons/tests/models/test_github_activity.py index df8ddcbe8..3f3f39c76 100644 --- a/napari-hub-commons/src/nhcommons/tests/models/test_github_activity.py +++ b/napari-hub-commons/src/nhcommons/tests/models/test_github_activity.py @@ -7,9 +7,7 @@ from nhcommons.models import github_activity -def get_type_identifier(granularity: str, - timestamp: datetime, - repo: str) -> str: +def get_type_identifier(granularity: str, timestamp: datetime, repo: str) -> str: if granularity == "TOTAL": return f"TOTAL:{repo}" elif granularity == "MONTH": @@ -18,29 +16,35 @@ def get_type_identifier(granularity: str, return f"LATEST:{repo}" -def generate_github_activity(name: str, - granularity: str, - repo: str, - is_input: bool, - timestamp: datetime = None, - commit_count: int = None) -> dict: +def generate_github_activity( + name: str, + granularity: str, + repo: str, + is_input: bool, + timestamp: datetime = None, + commit_count: int = None, + expiry: int = None, +) -> dict: type_key = "granularity" if is_input else "type" github_activity_item = { "plugin_name": name if is_input else name.lower(), "type_identifier": get_type_identifier(granularity, timestamp, repo), type_key: granularity, - "repo": repo + "repo": repo, } if timestamp: github_activity_item["timestamp"] = int(timestamp.timestamp() * 1000) if commit_count: github_activity_item["commit_count"] = commit_count + if expiry: + github_activity_item["expiry"] = expiry return github_activity_item def get_relative_timestamp(**args) -> datetime: - return (datetime.now() - relativedelta(**args))\ - .replace(hour=0, minute=0, second=0, microsecond=0) + return (datetime.now() - relativedelta(**args)).replace( + hour=0, minute=0, second=0, microsecond=0 + ) def generate_github_activity_list(is_input: bool) -> List[Dict[str, Any]]: @@ -58,6 +62,7 @@ def generate_github_activity_list(is_input: bool) -> List[Dict[str, Any]]: commit_count=10, timestamp=get_relative_timestamp(months=4), repo="foo/bar", + expiry=int(get_relative_timestamp(months=-14).timestamp()), is_input=is_input, ), generate_github_activity( @@ -71,7 +76,6 @@ def generate_github_activity_list(is_input: bool) -> List[Dict[str, Any]]: class TestGithubActivity: - @pytest.fixture() def table(self, create_dynamo_table): with mock_dynamodb(): @@ -82,12 +86,11 @@ def table(self, create_dynamo_table): def test_batch_write(self, table, verify_table_data): github_activity.batch_write(generate_github_activity_list(True)) - verify_table_data(generate_github_activity_list(False), - table) + verify_table_data(generate_github_activity_list(False), table) - @pytest.mark.parametrize("excluded_field", [ - "plugin_name", "type_identifier", "granularity", "repo" - ]) + @pytest.mark.parametrize( + "excluded_field", ["plugin_name", "type_identifier", "granularity", "repo"] + ) def test_batch_write_for_invalid_data(self, excluded_field, table): input_data = { "plugin_name": "Foo",