Create maintenance activity ETL workflow (#939)
* added the maintenance activity etl model in model.py

* added the MaintenanceActivity class in model.py

* refactored naming of classes to GitHub in model.py

* updated table name in model.py

* added logic to update github activity to handler.py

* added the update_github_activity method to handle updates of github data with respect to DynamoDB

* added _write_activity_to_dynamo method for github etl

* refactored comment in update_github_activity

* added a new method in the GitHubActivity class

* deleted handler.py since it will exist after Manasa's PR is merged in

* added more details to class and method

* update class GitHubActivityType values for variables: LATEST, MONTH, TOTAL

* added methods to github_activity_model.py

* added update_github_activity method to processor.py

* added code related to github data on handler.py and processor.py

* modified format for LATEST data store

* import github_activity_model to be used in snowflake_adapter.py

* added get_plugins_with_commits_in_window method and called the method in update_github_activity method

* modified class GitHubActivityType to reflect the schema of the maintenance activity ETL workflow

* update the get_query_timestamp_projection method

* pushed code for debugging

* added more sql code for fetching github commits and updated the model

* added comment to address testing code changes

* Testing etl workflow

* Tested the GitHubActivityType.TOTAL workflow and verified that all attributes of all items in github_activity_model.py have the correct data prior to writing data to table

* Verified that the attributes saved in each item for LATEST and TOTAL are correct prior to saving data to DynamoDB

* Verified data stored in each item for all github_activity_model types are correct prior to storing in DynamoDB

* refactored table_name and column name commit_count

* moved shared methods between install_activity and github_activity to utils.py and modified install_activity_model.py and github_activity_model.py to reflect the changes

* removed testing changes

* code cleanup

* added comment to clarify the need to convert plugin name to case sensitive form

* refactored import statements and variables

* addressed feedback with respect to refactoring variables

* addressed code review feedback

* fixed test errors

* fixed test errors and refactored code for readability

* fixed test errors

* added more details to comments in accumulator methods in snowflake_adapter.py

* testing if changing timestamp to utc fixes the errors

* revert the previous commit since the errors persist

* modified _format_timestamp to return timestamp in utc

* testing changes to see if errors are fixed

* testing changes to see if errors are fixed

* reverted changes since tests are still failing due to time difference in tests

* reverted changes due to test errors persisting

* removed unused import statements

* addressed all code review feedback

* refactored get_query to improve code readability

* updated install activity tests to reflect changes in variable renaming

* refactored handler.py to separate updates for install and github activities

* added changes such that commit data for hidden plugins is included in DynamoDB

* added changes to include commit data to excluded plugins

* added import statement

* refactored code to capture hidden plugins since there is no data for non-hidden plugins in the exclusion list

* revert changes adding hidden plugins

* adding terraform changes for accessing s3 in the data-workflows lambda

* refactored terraform code

* change if statement to elif statement since only one would execute

* addressed code review feedback and a few more optimizations

* refactored test_snowflake_adapter.py

* more code cleanup

* converted plugin_name to lower case to be stored in dynamoDB to maintain parity with the implementation of install activity

* refactored helpers.py and github_activity_model.py such that the dictionary contains lower case value of plugin name, and therefore there is no need for plugin_name.lower()

* addressed feedback and refactored github_activity_model.py even further

* added docstring to transform_and_write_to_dynamo in github_activity_model.py

* added docstring to snowflake_adapter method to get plugins commit count

* addressed docstring nits
klai95 committed Apr 26, 2023
1 parent 84bfc32 commit e036ab2
Showing 11 changed files with 303 additions and 35 deletions.
9 changes: 9 additions & 0 deletions .happy/terraform/modules/ecs-stack/main.tf
@@ -243,6 +243,8 @@ module data_workflows_lambda {
     "SNOWFLAKE_PASSWORD" = local.snowflake_password
     "SNOWFLAKE_USER" = local.snowflake_user
     "STACK_NAME" = local.custom_stack_name
+    "BUCKET" = local.data_bucket_name
+    "BUCKET_PATH" = var.env == "dev" ? local.custom_stack_name : ""
   }

   log_retention_in_days = 14
@@ -410,6 +412,13 @@ data aws_iam_policy_document backend_policy {
 }

 data aws_iam_policy_document data_workflows_policy {
+  statement {
+    actions = [
+      "s3:GetObject",
+    ]
+
+    resources = ["${local.data_bucket_arn}/*"]
+  }
   statement {
     actions = [
       "dynamodb:Query",
130 changes: 130 additions & 0 deletions data-workflows/activity/github_activity_model.py
@@ -0,0 +1,130 @@
+import logging
+import time
+from datetime import datetime
+from enum import Enum, auto
+from typing import List, Union
+import os
+
+from pynamodb.models import Model
+from pynamodb.attributes import UnicodeAttribute, NumberAttribute
+
+from utils.utils import get_current_timestamp, date_to_utc_timestamp_in_millis, datetime_to_utc_timestamp_in_millis
+from plugin.helpers import _get_cache, _get_repo_to_plugin_dict
+
+
+LOGGER = logging.getLogger()
+TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')"
+
+
+class GitHubActivityType(Enum):
+    def __new__(cls, timestamp_formatter, type_identifier_formatter, query_projection, query_sorting):
+        github_activity_type = object.__new__(cls)
+        github_activity_type._value = auto()
+        github_activity_type.timestamp_formatter = timestamp_formatter
+        github_activity_type.type_identifier_formatter = type_identifier_formatter
+        github_activity_type.query_projection = query_projection
+        github_activity_type.query_sorting = query_sorting
+        return github_activity_type
+
+    LATEST = (datetime_to_utc_timestamp_in_millis, 'LATEST:{0}',
+              'repo AS name, to_timestamp(max(commit_author_date)) as latest_commit', 'name')
+    MONTH = (date_to_utc_timestamp_in_millis, 'MONTH:{1:%Y%m}:{0}',
+             'repo AS name, date_trunc("month", to_date(commit_author_date)) as month, count(*) as commit_count',
+             'name, month')
+    TOTAL = (lambda timestamp: None, 'TOTAL:{0}', 'repo AS name, count(*) as commit_count', 'name')
+
+    def format_to_timestamp(self, timestamp: datetime) -> Union[int, None]:
+        return self.timestamp_formatter(timestamp)
+
+    def format_to_type_identifier(self, repo_name: str, identifier_timestamp: str) -> str:
+        return self.type_identifier_formatter.format(repo_name, identifier_timestamp)
+
+    def _create_subquery(self, plugins_by_earliest_ts: dict[str, datetime]) -> str:
+        if self is GitHubActivityType.MONTH:
+            return " OR ".join(
+                [
+                    f"repo = '{name}' AND to_timestamp(commit_author_date) >= "
+                    f"{TIMESTAMP_FORMAT.format(ts.replace(day=1))}"
+                    for name, ts in plugins_by_earliest_ts.items()
+                ]
+            )
+        return f"""repo IN ({','.join([f"'{plugin}'" for plugin in plugins_by_earliest_ts.keys()])})"""
+
+    def get_query(self, plugins_by_earliest_ts: dict[str, datetime]) -> str:
+        return f"""
+                SELECT
+                    {self.query_projection}
+                FROM
+                    imaging.github.commits
+                WHERE
+                    repo_type = 'plugin'
+                    AND ({self._create_subquery(plugins_by_earliest_ts)})
+                GROUP BY {self.query_sorting}
+                ORDER BY {self.query_sorting}
+                """
+
+
+class GitHubActivity(Model):
+    class Meta:
+        host = os.getenv('LOCAL_DYNAMO_HOST')
+        region = os.getenv('AWS_REGION', 'us-west-2')
+        table_name = f"{os.getenv('STACK_NAME', 'local')}-github-activity"
+
+    plugin_name = UnicodeAttribute(hash_key=True)
+    type_identifier = UnicodeAttribute(range_key=True)
+    granularity = UnicodeAttribute(attr_name='type')
+    timestamp = NumberAttribute(null=True)
+    commit_count = NumberAttribute(null=True)
+    repo = UnicodeAttribute()
+    last_updated_timestamp = NumberAttribute(default_for_new=get_current_timestamp)
+
+    def __eq__(self, other):
+        if isinstance(other, GitHubActivity):
+            return (
+                self.plugin_name == other.plugin_name and
+                self.type_identifier == other.type_identifier and
+                self.granularity == other.granularity and
+                self.timestamp == other.timestamp and
+                self.commit_count == other.commit_count and
+                self.repo == other.repo
+            )
+        return False
+
+
+def transform_and_write_to_dynamo(data: dict[str, List], activity_type: GitHubActivityType) -> None:
+    """Transforms plugin commit data generated by get_plugins_commit_count_since_timestamp to the expected format
+    and then writes the formatted data to the corresponding github-activity dynamo table in each environment
+    :param dict[str, list] data: plugin commit data of type dictionary in which the key is the repo name
+    of type str and the value is GitHub activities of type list
+    :param GitHubActivityType activity_type:
+    """
+    LOGGER.info(f'Starting item creation for github-activity type={activity_type.name}')
+
+    batch = GitHubActivity.batch_write()
+
+    start = time.perf_counter()
+    count = 0
+    repo_to_plugin_dict = _get_repo_to_plugin_dict()
+    for repo, github_activities in data.items():
+        plugin_name = repo_to_plugin_dict.get(repo)
+        if plugin_name is None:
+            continue
+        for activity in github_activities:
+            identifier_timestamp = activity.get('timestamp', '')
+            timestamp = activity.get('timestamp')
+            commit_count = activity.get('count')
+            item = GitHubActivity(
+                plugin_name,
+                activity_type.format_to_type_identifier(repo, identifier_timestamp),
+                granularity=activity_type.name,
+                timestamp=activity_type.format_to_timestamp(timestamp),
+                commit_count=commit_count,
+                repo=repo)
+            batch.save(item)
+            count += 1
+
+    batch.commit()
+    duration = (time.perf_counter() - start) * 1000
+
+    LOGGER.info(f'Items github-activity type={activity_type.name} count={count}')
+    LOGGER.info(f'Transform and write to github-activity type={activity_type.name} timeTaken={duration}ms')
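The `type_identifier` range key is easiest to follow by running the three format strings from `GitHubActivityType` on their own. The sketch below copies those strings verbatim from the diff; the dictionary wrapper, the standalone helper, and the repo value are illustrative assumptions and not part of the commit.

```python
from datetime import datetime

# Format strings copied from GitHubActivityType in the diff:
# {0} is the repo name, {1} is the timestamp of the record.
TYPE_IDENTIFIER_FORMATS = {
    "LATEST": "LATEST:{0}",
    "MONTH": "MONTH:{1:%Y%m}:{0}",
    "TOTAL": "TOTAL:{0}",
}


def format_to_type_identifier(activity_type: str, repo_name: str, identifier_timestamp) -> str:
    """Standalone stand-in for GitHubActivityType.format_to_type_identifier."""
    return TYPE_IDENTIFIER_FORMATS[activity_type].format(repo_name, identifier_timestamp)


repo = "example-org/example-plugin"  # hypothetical repo name
print(format_to_type_identifier("LATEST", repo, ""))
print(format_to_type_identifier("MONTH", repo, datetime(2023, 4, 1)))
print(format_to_type_identifier("TOTAL", repo, ""))
```

Only the MONTH format consumes the timestamp argument, which is why `transform_and_write_to_dynamo` can pass an empty string for records that carry no timestamp.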
12 changes: 4 additions & 8 deletions data-workflows/activity/install_activity_model.py
@@ -1,22 +1,18 @@
 import logging
 import time
-from datetime import datetime, timezone
+from datetime import datetime
 from enum import Enum, auto
 from typing import List, Union
 import os

 from pynamodb.models import Model
 from pynamodb.attributes import UnicodeAttribute, NumberAttribute

-from utils.utils import get_current_timestamp
+from utils.utils import get_current_timestamp, datetime_to_utc_timestamp_in_millis

 LOGGER = logging.getLogger()


-def to_utc_timestamp_in_millis(timestamp: datetime) -> int:
-    return int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 1000)
-
-
 class InstallActivityType(Enum):

     def __new__(cls, timestamp_formatter, type_timestamp_formatter):
@@ -26,8 +22,8 @@ def __new__(cls, timestamp_formatter, type_timestamp_formatter):
         install_activity_type.type_timestamp_formatter = type_timestamp_formatter
         return install_activity_type

-    DAY = (to_utc_timestamp_in_millis, 'DAY:{0:%Y%m%d}')
-    MONTH = (to_utc_timestamp_in_millis, 'MONTH:{0:%Y%m}')
+    DAY = (datetime_to_utc_timestamp_in_millis, 'DAY:{0:%Y%m%d}')
+    MONTH = (datetime_to_utc_timestamp_in_millis, 'MONTH:{0:%Y%m}')
     TOTAL = (lambda timestamp: None, 'TOTAL:')

     def format_to_timestamp(self, timestamp: datetime) -> Union[int, None]:
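The helper removed here moves to `utils/utils.py`, which this view does not show. As a rough sketch of what the shared helpers might look like: `datetime_to_utc_timestamp_in_millis` below reuses the body of the removed `to_utc_timestamp_in_millis`, while the body of `date_to_utc_timestamp_in_millis` is purely an assumption made for illustration.

```python
from datetime import date, datetime, timezone


def datetime_to_utc_timestamp_in_millis(timestamp: datetime) -> int:
    # Same body as the removed to_utc_timestamp_in_millis: the naive datetime
    # is interpreted as UTC and converted to epoch milliseconds.
    return int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 1000)


def date_to_utc_timestamp_in_millis(timestamp: date) -> int:
    # Assumption: the real helper is not visible in this commit view.
    # Here a date is promoted to midnight of that day before conversion.
    as_datetime = datetime(timestamp.year, timestamp.month, timestamp.day)
    return datetime_to_utc_timestamp_in_millis(as_datetime)


print(datetime_to_utc_timestamp_in_millis(datetime(2023, 4, 26)))
print(date_to_utc_timestamp_in_millis(date(2023, 4, 1)))
```

Treating the naive value as UTC keeps the result independent of the machine's local timezone, which matters when the same code runs locally and in a Lambda.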
25 changes: 21 additions & 4 deletions data-workflows/activity/processor.py
@@ -3,21 +3,38 @@

 from activity.install_activity_model import InstallActivityType
 import activity.install_activity_model as install_model
+from activity.github_activity_model import GitHubActivityType
+import activity.github_activity_model as github_model
 import activity.snowflake_adapter as snowflake_adapter

 LOGGER = logging.getLogger()


-def _fetch_data_and_write_to_dynamo(data: dict[str, datetime], install_activity_type: InstallActivityType):
+def _fetch_install_data_and_write_to_dynamo(data: dict[str, datetime], install_activity_type: InstallActivityType):
     plugin_install_data = snowflake_adapter.get_plugins_install_count_since_timestamp(data, install_activity_type)
     install_model.transform_and_write_to_dynamo(plugin_install_data, install_activity_type)


+def _fetch_github_data_and_write_to_dynamo(data: dict[str, datetime], github_activity_type: GitHubActivityType):
+    plugin_commit_data = snowflake_adapter.get_plugins_commit_count_since_timestamp(data, github_activity_type)
+    github_model.transform_and_write_to_dynamo(plugin_commit_data, github_activity_type)
+
+
 def update_install_activity(start_time: int, end_time: int):
     updated_plugins = snowflake_adapter.get_plugins_with_installs_in_window(start_time, end_time)
     LOGGER.info(f'Plugins with new install activity count={len(updated_plugins)}')
     if len(updated_plugins) == 0:
         return
-    _fetch_data_and_write_to_dynamo(updated_plugins, InstallActivityType.DAY)
-    _fetch_data_and_write_to_dynamo(updated_plugins, InstallActivityType.MONTH)
-    _fetch_data_and_write_to_dynamo(updated_plugins, InstallActivityType.TOTAL)
+    _fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.DAY)
+    _fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.MONTH)
+    _fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.TOTAL)
+
+
+def update_github_activity(start_time: int, end_time: int):
+    updated_plugins = snowflake_adapter.get_plugins_with_commits_in_window(start_time, end_time)
+    LOGGER.info(f'Plugins with new github activity count={len(updated_plugins)}')
+    if len(updated_plugins) == 0:
+        return
+    _fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.LATEST)
+    _fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.MONTH)
+    _fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.TOTAL)
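The `updated_plugins` mapping handed to each `_fetch_github_data_and_write_to_dynamo` call ends up as the repo filter in `GitHubActivityType.get_query`. The sketch below shows how that filter could be derived from the mapping; the function names and repo values are hypothetical, and the extra parentheses are an addition of this sketch so that the OR-joined MONTH clauses stay grouped when the filter is combined with other `AND` conditions.

```python
from datetime import datetime

# Same Snowflake timestamp literal format as TIMESTAMP_FORMAT in the diff.
TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')"


def month_filter(plugins_by_earliest_ts: dict) -> str:
    """MONTH: each repo is filtered from the first day of its earliest month."""
    clauses = [
        f"(repo = '{name}' AND to_timestamp(commit_author_date) >= "
        f"{TIMESTAMP_FORMAT.format(ts.replace(day=1))})"
        for name, ts in plugins_by_earliest_ts.items()
    ]
    # Outer parentheses keep the OR group intact inside a larger WHERE clause.
    return "(" + " OR ".join(clauses) + ")"


def repo_in_filter(plugins_by_earliest_ts: dict) -> str:
    """LATEST and TOTAL: only the repo names matter, timestamps are ignored."""
    names = ",".join(f"'{name}'" for name in plugins_by_earliest_ts)
    return f"repo IN ({names})"


# Hypothetical result of get_plugins_with_commits_in_window.
updated_plugins = {
    "org/a": datetime(2023, 4, 17),
    "org/b": datetime(2023, 3, 2),
}
print(month_filter(updated_plugins))
print(repo_in_filter(updated_plugins))
```

Like the code in the diff, this interpolates values straight into the SQL text; that is only reasonable while the repo names come from a trusted table rather than user input.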
104 changes: 87 additions & 17 deletions data-workflows/activity/snowflake_adapter.py
@@ -9,6 +9,7 @@
 from snowflake.connector.cursor import SnowflakeCursor

 from activity.install_activity_model import InstallActivityType
+from activity.github_activity_model import GitHubActivityType

 LOGGER = logging.getLogger()
 TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')"
@@ -17,27 +18,26 @@
 def get_plugins_with_installs_in_window(start_millis: int, end_millis: int) -> dict[str, datetime]:
     query = f"""
             SELECT
-                LOWER(file_project) AS plugin, DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp
+                LOWER(file_project) AS name, DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp
             FROM
                 imaging.pypi.labeled_downloads
             WHERE
                 download_type = 'pip'
                 AND project_type = 'plugin'
                 AND TO_TIMESTAMP(ingestion_timestamp) > {_format_timestamp(timestamp_millis=start_millis)}
                 AND TO_TIMESTAMP(ingestion_timestamp) <= {_format_timestamp(timestamp_millis=end_millis)}
-            GROUP BY file_project
-            ORDER BY file_project
+            GROUP BY name
+            ORDER BY name
             """
-
     LOGGER.info(f'Querying for plugins added between start_timestamp={start_millis} end_timestamp={end_millis}')
-    return _mapped_query_results(query, "PYPI", {}, _cursor_to_timestamp_by_plugin_mapper)
+    return _mapped_query_results(query, "PYPI", {}, _cursor_to_timestamp_by_name_mapper)


 def get_plugins_install_count_since_timestamp(plugins_by_earliest_ts: dict[str, datetime],
                                               install_activity_type: InstallActivityType) -> dict[str, List]:
     query = f"""
             SELECT
-                LOWER(file_project) AS plugin,
+                LOWER(file_project) AS name,
                 {install_activity_type.get_query_timestamp_projection()} AS timestamp,
                 COUNT(*) AS count
             FROM
@@ -46,13 +46,55 @@ def get_plugins_install_count_since_timestamp(plugins_by_earliest_ts: dict[str,
                 download_type = 'pip'
                 AND project_type = 'plugin'
                 AND ({_generate_subquery_by_type(plugins_by_earliest_ts, install_activity_type)})
-            GROUP BY 1, 2
-            ORDER BY 1, 2
+            GROUP BY name, timestamp
+            ORDER BY name, timestamp
             """
     LOGGER.info(f'Fetching data for granularity={install_activity_type.name}')
     return _mapped_query_results(query, 'PYPI', {}, _cursor_to_plugin_activity_mapper)


+def get_plugins_with_commits_in_window(start_millis: int, end_millis: int) -> dict[str, datetime]:
+    query = f"""
+            SELECT
+                repo AS name, DATE_TRUNC('DAY', TO_DATE(min(commit_author_date))) AS earliest_timestamp
+            FROM
+                imaging.github.commits
+            WHERE
+                repo_type = 'plugin'
+                AND TO_TIMESTAMP(ingestion_time) > {_format_timestamp(timestamp_millis=start_millis)}
+                AND TO_TIMESTAMP(ingestion_time) <= {_format_timestamp(timestamp_millis=end_millis)}
+            GROUP BY name
+            ORDER BY name
+            """
+    LOGGER.info(f'Querying for plugins added between start_timestamp={start_millis} end_timestamp={end_millis}')
+    return _mapped_query_results(query, 'GITHUB', {}, _cursor_to_timestamp_by_name_mapper)
+
+
+def get_plugins_commit_count_since_timestamp(plugins_by_earliest_ts: dict[str, datetime],
+                                             github_activity_type: GitHubActivityType) -> dict[str, List]:
+    """This method gets the commit data since a specific starting point for each plugin.
+    If GitHubActivityType.LATEST, fetch the latest commit timestamp, so construct query without commit count constraint
+    If GitHubActivityType.MONTH, fetch the sum of commits from the beginning of the month of the timestamp specified
+    for each plugin.
+    If GitHubActivityType.TOTAL, fetch the sum of commits over all time, so construct query without timestamp constraint
+    :param dict[str, datetime] plugins_by_earliest_ts: plugin name by earliest timestamp of commit record added
+    :param GitHubActivityType github_activity_type:
+    """
+    if github_activity_type is GitHubActivityType.LATEST:
+        accumulator_updater = _cursor_to_plugin_github_activity_latest_mapper
+    elif github_activity_type is GitHubActivityType.MONTH:
+        accumulator_updater = _cursor_to_plugin_activity_mapper
+    else:
+        accumulator_updater = _cursor_to_plugin_github_activity_total_mapper
+    LOGGER.info(f'Fetching data for granularity={github_activity_type.name}')
+    return _mapped_query_results(
+        query=github_activity_type.get_query(plugins_by_earliest_ts),
+        schema="GITHUB",
+        accumulator={},
+        accumulator_updater=accumulator_updater,
+    )
+
+
 def _generate_subquery_by_type(plugins_by_timestamp: dict[str, datetime], install_activity_type: InstallActivityType):
     """
     Returns subquery clause generated from the plugins_by_timestamp data based on the InstallActivityType. It is used to
@@ -82,30 +124,58 @@ def _format_timestamp(timestamp_millis):
     return TIMESTAMP_FORMAT.format(datetime.utcfromtimestamp(timestamp_millis / 1000.0))


-def _cursor_to_timestamp_by_plugin_mapper(accumulator: dict[str, datetime], cursor) -> dict[str, datetime]:
+def _cursor_to_timestamp_by_name_mapper(accumulator: dict[str, datetime], cursor) -> dict[str, datetime]:
     """
-    Updates the accumulator with data from the cursor. Timestamp is added to the accumulator keyed on plugin name.
-    The cursor contains the fields plugin_name and earliest_timestamp
+    Updates the accumulator with data from the cursor. Timestamp is added to the accumulator keyed on name.
+    The cursor contains the fields name and earliest_timestamp
     :param dict[str, datetime] accumulator: Accumulator that will be updated with new data
     :param SnowflakeCursor cursor:
     :returns: Accumulator after data from cursor has been added
     """
-    for plugin, earliest_timestamp in cursor:
-        accumulator[plugin] = earliest_timestamp
+    for name, earliest_timestamp in cursor:
+        accumulator[name] = earliest_timestamp
     return accumulator


 def _cursor_to_plugin_activity_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]:
     """
     Updates the accumulator with data from the cursor. Object with timestamp and count attributes are created from the
-    cursor record and added to the accumulator keyed on plugin name.
-    The cursor contains the fields plugin, timestamp, and count
+    cursor record and added to the accumulator keyed on name.
+    The cursor contains the fields name, timestamp, and count
     :param dict[str, List] accumulator: Accumulator that will be updated with new data
     :param SnowflakeCursor cursor:
     :returns: Accumulator after data from cursor has been added
     """
+    for name, timestamp, count in cursor:
+        accumulator.setdefault(name, []).append({'timestamp': timestamp, 'count': count})
+    return accumulator
+
+
+def _cursor_to_plugin_github_activity_latest_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]:
+    """
+    Updates the accumulator with data from the cursor for GitHubActivityType.LATEST.
+    An object with timestamp is created from the cursor record and added to the accumulator keyed on repo name.
+    The cursor contains the fields repo name and timestamp
+    :param dict[str, List] accumulator: Accumulator that will be updated with new data
+    :param SnowflakeCursor cursor:
+    :returns: Accumulator after data from cursor has been added
+    """
+    for name, timestamp in cursor:
+        accumulator.setdefault(name, []).append({'timestamp': timestamp})
+    return accumulator
+
+
+def _cursor_to_plugin_github_activity_total_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]:
+    """
+    Updates the accumulator with data from the cursor for GitHubActivityType.TOTAL.
+    An object with count is created from the cursor record and added to the accumulator keyed on repo name.
+    The cursor contains the fields repo name and count
+    :param dict[str, List] accumulator: Accumulator that will be updated with new data
+    :param SnowflakeCursor cursor:
+    :returns: Accumulator after data from cursor has been added
+    """
-    for plugin, timestamp, count in cursor:
-        accumulator.setdefault(plugin, []).append({'timestamp': timestamp, 'count': count})
+    for name, count in cursor:
+        accumulator.setdefault(name, []).append({'count': count})
     return accumulator

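The three accumulator mappers differ only in the shape of the rows they unpack. A minimal sketch of that behaviour, with a plain list of tuples standing in for the `SnowflakeCursor` (an assumption made so the example runs without a database) and with shortened, hypothetical function names and sample rows:

```python
from typing import Dict, Iterable, List, Tuple


def month_mapper(accumulator: Dict[str, List], cursor: Iterable[Tuple]) -> Dict[str, List]:
    # Rows of (name, timestamp, count), as produced by the MONTH query.
    for name, timestamp, count in cursor:
        accumulator.setdefault(name, []).append({'timestamp': timestamp, 'count': count})
    return accumulator


def latest_mapper(accumulator: Dict[str, List], cursor: Iterable[Tuple]) -> Dict[str, List]:
    # Rows of (name, timestamp), as produced by the LATEST query.
    for name, timestamp in cursor:
        accumulator.setdefault(name, []).append({'timestamp': timestamp})
    return accumulator


def total_mapper(accumulator: Dict[str, List], cursor: Iterable[Tuple]) -> Dict[str, List]:
    # Rows of (name, count), as produced by the TOTAL query.
    for name, count in cursor:
        accumulator.setdefault(name, []).append({'count': count})
    return accumulator


# Hypothetical rows; a real cursor yields tuples in the same column order.
monthly = month_mapper({}, [("org/a", "2023-03", 4), ("org/a", "2023-04", 7), ("org/b", "2023-04", 1)])
latest = latest_mapper({}, [("org/a", "2023-04-20")])
total = total_mapper({}, [("org/a", 11), ("org/b", 1)])
print(monthly)
print(latest)
print(total)
```

Because every mapper returns a dict of lists keyed on repo name, `transform_and_write_to_dynamo` can read `timestamp` and `count` with `dict.get` and tolerate whichever key is missing for a given activity type.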