Create maintenance activity ETL workflow #939

Merged on Apr 26, 2023
69 commits
686b478
added the maintenance activity etl model in model.py
klai95 Mar 10, 2023
3a713e8
added the MaintenanceActivity class in model.py
klai95 Mar 10, 2023
12c2537
refactored naming of classes to GitHub in model.py
klai95 Mar 10, 2023
4a9fba6
updated table name in model.py
klai95 Mar 10, 2023
c57c671
added logic to update github activity to handler.py
klai95 Mar 10, 2023
5ace12f
added the update_github_activity method to handle updates of github d…
klai95 Mar 10, 2023
4aa3480
added _write_activity_to_dynamo method for github etl
klai95 Mar 10, 2023
f6740ef
refactored comment in update_github_activity
klai95 Mar 10, 2023
121fe70
added a new method in the GitHubActivity class
klai95 Mar 20, 2023
f19001b
deleted handler.py since it will exist after Manasa's PR is merged in
klai95 Mar 20, 2023
22af835
added more details to class and method
klai95 Mar 20, 2023
685b5fd
update class GitHubActivityType values for variables: LATEST, MONTH, …
klai95 Mar 24, 2023
129c0af
added methods to github_activity_model.py
klai95 Mar 24, 2023
45429e8
added update_github_activity method to processor.py
klai95 Mar 24, 2023
b5a4973
added code related to github data on handler.py and processor.py
klai95 Mar 27, 2023
d7a50fd
modified format for LATEST data store
klai95 Mar 28, 2023
76311f7
import github_activity_model to be used in snowflake_adapter.py
klai95 Mar 28, 2023
6d18de5
added get_plugins_with_commits_in_window method and called the method…
klai95 Mar 29, 2023
8bcfb60
modified class GitHubActivityType to reflect the schema of the mainte…
klai95 Mar 30, 2023
b6e90f0
update the get_query_timestamp_projection method
klai95 Mar 31, 2023
4f2fdc7
pushed code for debugging
klai95 Mar 31, 2023
0bd02ef
added mode sql code for fetching github commits and updated the model
klai95 Mar 31, 2023
dd5a10f
added comment to address testing code changes
klai95 Mar 31, 2023
0327d38
Testing etl workflow
klai95 Apr 1, 2023
dc67ddc
Tested the GitHubActivityType.TOTAL workflow and verified that all at…
klai95 Apr 1, 2023
0180122
Verified that the attributes saved in each item for LATEST and TOTAL …
klai95 Apr 2, 2023
30b93a6
Verified data stored in each item for all github_activity_model types…
klai95 Apr 4, 2023
b4aa881
refactored table_name and column name commit_count
klai95 Apr 4, 2023
d0caedd
moved shared methods between install_activity and github_activity to …
klai95 Apr 4, 2023
82e5a96
removed testing changes
klai95 Apr 4, 2023
7c78f73
code cleanup
klai95 Apr 4, 2023
893226d
added comment to clarify the need to convert plugin name to case sens…
klai95 Apr 4, 2023
5447019
refactored import statements and variables
klai95 Apr 4, 2023
b125fcf
addressed feedback with respect to refactoring variables
klai95 Apr 6, 2023
b757bae
addressed code review feedback
klai95 Apr 7, 2023
22b307e
fixed test errors
klai95 Apr 7, 2023
9b32c37
fixed test errors and refactored code for readability
klai95 Apr 7, 2023
b3d6e22
fixed test errors
klai95 Apr 7, 2023
d184808
added more details to comments in accumulator methods in snowflake_ad…
klai95 Apr 7, 2023
edb9c79
testing if changing timestamp to utc fixes the errors
klai95 Apr 7, 2023
98659c6
revert the previous commit since the errors persist
klai95 Apr 7, 2023
6652f4c
modified _format_timestamp to return timestamp in utc
klai95 Apr 7, 2023
6bf1d58
testing changes to see if errors are fixed
klai95 Apr 7, 2023
d9710fc
testing changes to see if errors are fixed
klai95 Apr 7, 2023
abcd710
reverted changes since tests are still failing due to time difference…
klai95 Apr 7, 2023
0fab8d0
reverted changes due to test errors persisting
klai95 Apr 7, 2023
117aa87
removed unused import statements
klai95 Apr 7, 2023
1cf72e7
addressed all code review feedback
klai95 Apr 7, 2023
011e255
refactored get_query to improve code readability
klai95 Apr 7, 2023
15c777d
updated install activity tests to reflect changes in variable renaming
klai95 Apr 7, 2023
e682c39
refactored handler.py to separate updates for install and github acti…
klai95 Apr 7, 2023
6501770
added changes such that commit data for hidden plugins is included in…
klai95 Apr 10, 2023
ef4555b
Merge branch 'main' into maintenance-etl-workflow
klai95 Apr 10, 2023
9918c80
added changes to include commit data to excluded plugins
klai95 Apr 10, 2023
5b34897
added import statement
klai95 Apr 10, 2023
1e39083
refactored code to capture hidden plugins since there is no data for …
klai95 Apr 11, 2023
5030424
revert changes adding hidden plugins
klai95 Apr 11, 2023
607ccd8
adding terraform changes for accessing s3 in the data-workflows lambda
klai95 Apr 11, 2023
b68e480
refactored terraform code
klai95 Apr 11, 2023
b52e098
change if statement to elif statement since only one would execute
klai95 Apr 11, 2023
328b3c7
addressed code review feedback and a few more optimizations
klai95 Apr 12, 2023
805e063
refactored test_snowflake_adapter.py
klai95 Apr 12, 2023
42d32f3
more code cleanup
klai95 Apr 12, 2023
6ddf071
converted plugin_name to lower case to be stored in dynamoDB to maint…
klai95 Apr 13, 2023
e7cdafe
refactored helpers.py and github_activity_model.py such that the dict…
klai95 Apr 13, 2023
d917516
addressed feedback and refactored github_activity_model.py even further
klai95 Apr 14, 2023
dd71781
added docstring to transform_and_write_to_dynamo in github_activity_m…
klai95 Apr 24, 2023
fdd6496
added docstring to snowflake_adapter method to get plugins commit count
klai95 Apr 24, 2023
209bee2
addressed docstring nits
klai95 Apr 24, 2023
9 changes: 9 additions & 0 deletions .happy/terraform/modules/ecs-stack/main.tf
@@ -243,6 +243,8 @@ module data_workflows_lambda {
"SNOWFLAKE_PASSWORD" = local.snowflake_password
"SNOWFLAKE_USER" = local.snowflake_user
"STACK_NAME" = local.custom_stack_name
"BUCKET" = local.data_bucket_name
"BUCKET_PATH" = var.env == "dev" ? local.custom_stack_name : ""
}

log_retention_in_days = 14
@@ -410,6 +412,13 @@ data aws_iam_policy_document backend_policy {
}

data aws_iam_policy_document data_workflows_policy {
statement {
actions = [
"s3:GetObject",
]

resources = ["${local.data_bucket_arn}/*"]
}
statement {
actions = [
"dynamodb:Query",
Expand Down
130 changes: 130 additions & 0 deletions data-workflows/activity/github_activity_model.py
@@ -0,0 +1,130 @@
import logging
import time
from datetime import datetime
from enum import Enum, auto
from typing import List, Union
import os

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute

from utils.utils import get_current_timestamp, date_to_utc_timestamp_in_millis, datetime_to_utc_timestamp_in_millis
from plugin.helpers import _get_cache, _get_repo_to_plugin_dict


LOGGER = logging.getLogger()
TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')"


class GitHubActivityType(Enum):
def __new__(cls, timestamp_formatter, type_identifier_formatter, query_projection, query_sorting):
github_activity_type = object.__new__(cls)
github_activity_type._value = auto()
github_activity_type.timestamp_formatter = timestamp_formatter
github_activity_type.type_identifier_formatter = type_identifier_formatter
github_activity_type.query_projection = query_projection
github_activity_type.query_sorting = query_sorting
return github_activity_type

LATEST = (datetime_to_utc_timestamp_in_millis, 'LATEST:{0}',
'repo AS name, to_timestamp(max(commit_author_date)) as latest_commit', 'name')
MONTH = (date_to_utc_timestamp_in_millis, 'MONTH:{1:%Y%m}:{0}',
'repo AS name, date_trunc("month", to_date(commit_author_date)) as month, count(*) as commit_count',
'name, month')
TOTAL = (lambda timestamp: None, 'TOTAL:{0}', 'repo AS name, count(*) as commit_count', 'name')

def format_to_timestamp(self, timestamp: datetime) -> Union[int, None]:
return self.timestamp_formatter(timestamp)

def format_to_type_identifier(self, repo_name: str, identifier_timestamp: Union[datetime, str]) -> str:
return self.type_identifier_formatter.format(repo_name, identifier_timestamp)

def _create_subquery(self, plugins_by_earliest_ts: dict[str, datetime]) -> str:
if self is GitHubActivityType.MONTH:
return " OR ".join(
[
f"repo = '{name}' AND to_timestamp(commit_author_date) >= "
f"{TIMESTAMP_FORMAT.format(ts.replace(day=1))}"
for name, ts in plugins_by_earliest_ts.items()
]
)
return f"""repo IN ({','.join([f"'{plugin}'" for plugin in plugins_by_earliest_ts.keys()])})"""

def get_query(self, plugins_by_earliest_ts: dict[str, datetime]) -> str:
return f"""
SELECT
{self.query_projection}
FROM
imaging.github.commits
WHERE
repo_type = 'plugin'
AND ({self._create_subquery(plugins_by_earliest_ts)})
GROUP BY {self.query_sorting}
ORDER BY {self.query_sorting}
"""


class GitHubActivity(Model):
class Meta:
host = os.getenv('LOCAL_DYNAMO_HOST')
region = os.getenv('AWS_REGION', 'us-west-2')
table_name = f"{os.getenv('STACK_NAME', 'local')}-github-activity"

plugin_name = UnicodeAttribute(hash_key=True)
type_identifier = UnicodeAttribute(range_key=True)
granularity = UnicodeAttribute(attr_name='type')
timestamp = NumberAttribute(null=True)
commit_count = NumberAttribute(null=True)
repo = UnicodeAttribute()
last_updated_timestamp = NumberAttribute(default_for_new=get_current_timestamp)

def __eq__(self, other):
if isinstance(other, GitHubActivity):
return (
self.plugin_name == other.plugin_name and
self.type_identifier == other.type_identifier and
self.granularity == other.granularity and
self.timestamp == other.timestamp and
self.commit_count == other.commit_count and
self.repo == other.repo
)
return False


def transform_and_write_to_dynamo(data: dict[str, List], activity_type: GitHubActivityType) -> None:
"""Transforms the plugin commit data returned by get_plugins_commit_count_since_timestamp into GitHubActivity items
and writes them to the github-activity dynamo table of the current environment. Repos without a matching plugin
are skipped.
:param dict[str, list] data: plugin commit data keyed by repo name (str); each value is a list of
GitHub activity records
:param GitHubActivityType activity_type: granularity of the records in data
"""
LOGGER.info(f'Starting item creation for github-activity type={activity_type.name}')

batch = GitHubActivity.batch_write()

start = time.perf_counter()
count = 0
repo_to_plugin_dict = _get_repo_to_plugin_dict()
for repo, github_activities in data.items():
plugin_name = repo_to_plugin_dict.get(repo)
if plugin_name is None:
continue
for activity in github_activities:
identifier_timestamp = activity.get('timestamp', '')
timestamp = activity.get('timestamp')
commit_count = activity.get('count')
item = GitHubActivity(
plugin_name,
activity_type.format_to_type_identifier(repo, identifier_timestamp),
granularity=activity_type.name,
timestamp=activity_type.format_to_timestamp(timestamp),
commit_count=commit_count,
repo=repo)
batch.save(item)
count += 1

batch.commit()
duration = (time.perf_counter() - start) * 1000

LOGGER.info(f'Items github-activity type={activity_type.name} count={count}')
LOGGER.info(f'Transform and write to github-activity type={activity_type.name} timeTaken={duration}ms')
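
The type identifier formats and the MONTH filter in this file can be tried in isolation. The sketch below copies the format strings from `GitHubActivityType`; the helper names `type_identifier` and `month_filter` and the repo `org/repo` are hypothetical and used only for illustration. It wraps each per-repo clause in parentheses so that the `OR` cannot interfere with a surrounding `AND` filter.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')"

# Format strings copied from GitHubActivityType: {0} is the repo, {1} the timestamp.
TYPE_IDENTIFIER_FORMATS = {
    "LATEST": "LATEST:{0}",
    "MONTH": "MONTH:{1:%Y%m}:{0}",
    "TOTAL": "TOTAL:{0}",
}


def type_identifier(activity_type: str, repo: str, timestamp="") -> str:
    # str.format ignores unused positional arguments, so LATEST and TOTAL
    # can be given a placeholder timestamp.
    return TYPE_IDENTIFIER_FORMATS[activity_type].format(repo, timestamp)


def month_filter(plugins_by_earliest_ts: dict) -> str:
    # One parenthesized clause per repo, each starting at the first day
    # of the month of that repo's earliest timestamp.
    clauses = [
        f"(repo = '{name}' AND to_timestamp(commit_author_date) >= "
        f"{TIMESTAMP_FORMAT.format(ts.replace(day=1))})"
        for name, ts in plugins_by_earliest_ts.items()
    ]
    return " OR ".join(clauses)


print(type_identifier("MONTH", "org/repo", datetime(2023, 3, 1)))
print(month_filter({"org/repo": datetime(2023, 3, 17)}))
```

Note that `ts.replace(day=1)` keeps the time of day, so the filter only starts at midnight when the input timestamp has already been truncated to the day.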
12 changes: 4 additions & 8 deletions data-workflows/activity/install_activity_model.py
@@ -1,22 +1,18 @@
import logging
import time
- from datetime import datetime, timezone
+ from datetime import datetime
from enum import Enum, auto
from typing import List, Union
import os

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute

- from utils.utils import get_current_timestamp
+ from utils.utils import get_current_timestamp, datetime_to_utc_timestamp_in_millis

LOGGER = logging.getLogger()


- def to_utc_timestamp_in_millis(timestamp: datetime) -> int:
- return int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 1000)


class InstallActivityType(Enum):

def __new__(cls, timestamp_formatter, type_timestamp_formatter):
@@ -26,8 +22,8 @@ def __new__(cls, timestamp_formatter, type_timestamp_formatter):
install_activity_type.type_timestamp_formatter = type_timestamp_formatter
return install_activity_type

- DAY = (to_utc_timestamp_in_millis, 'DAY:{0:%Y%m%d}')
- MONTH = (to_utc_timestamp_in_millis, 'MONTH:{0:%Y%m}')
+ DAY = (datetime_to_utc_timestamp_in_millis, 'DAY:{0:%Y%m%d}')
+ MONTH = (datetime_to_utc_timestamp_in_millis, 'MONTH:{0:%Y%m}')
TOTAL = (lambda timestamp: None, 'TOTAL:')

def format_to_timestamp(self, timestamp: datetime) -> Union[int, None]:
Expand Down
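
The body of `datetime_to_utc_timestamp_in_millis` in `utils.utils` is not part of this diff. Assuming it matches the removed `to_utc_timestamp_in_millis`, the conversion can be sketched as follows; the name `to_utc_millis` is hypothetical.

```python
from datetime import datetime, timezone


def to_utc_millis(timestamp: datetime) -> int:
    # Assumption: mirrors the helper removed in this diff.
    # The datetime is labeled as UTC (not converted), then the epoch
    # seconds are scaled to milliseconds.
    return int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 1000)


print(to_utc_millis(datetime(2023, 4, 1)))
```

Because `replace(tzinfo=...)` overwrites the timezone without shifting the clock time, this is only correct for naive datetimes that already represent UTC.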
25 changes: 21 additions & 4 deletions data-workflows/activity/processor.py
@@ -3,21 +3,38 @@

from activity.install_activity_model import InstallActivityType
import activity.install_activity_model as install_model
from activity.github_activity_model import GitHubActivityType
import activity.github_activity_model as github_model
import activity.snowflake_adapter as snowflake_adapter

LOGGER = logging.getLogger()


- def _fetch_data_and_write_to_dynamo(data: dict[str, datetime], install_activity_type: InstallActivityType):
+ def _fetch_install_data_and_write_to_dynamo(data: dict[str, datetime], install_activity_type: InstallActivityType):
plugin_install_data = snowflake_adapter.get_plugins_install_count_since_timestamp(data, install_activity_type)
install_model.transform_and_write_to_dynamo(plugin_install_data, install_activity_type)


def _fetch_github_data_and_write_to_dynamo(data: dict[str, datetime], github_activity_type: GitHubActivityType):
plugin_commit_data = snowflake_adapter.get_plugins_commit_count_since_timestamp(data, github_activity_type)
github_model.transform_and_write_to_dynamo(plugin_commit_data, github_activity_type)


def update_install_activity(start_time: int, end_time: int):
updated_plugins = snowflake_adapter.get_plugins_with_installs_in_window(start_time, end_time)
LOGGER.info(f'Plugins with new install activity count={len(updated_plugins)}')
if len(updated_plugins) == 0:
return
- _fetch_data_and_write_to_dynamo(updated_plugins, InstallActivityType.DAY)
- _fetch_data_and_write_to_dynamo(updated_plugins, InstallActivityType.MONTH)
- _fetch_data_and_write_to_dynamo(updated_plugins, InstallActivityType.TOTAL)
+ _fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.DAY)
+ _fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.MONTH)
+ _fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.TOTAL)


def update_github_activity(start_time: int, end_time: int):
updated_plugins = snowflake_adapter.get_plugins_with_commits_in_window(start_time, end_time)
LOGGER.info(f'Plugins with new github activity count={len(updated_plugins)}')
if len(updated_plugins) == 0:
return
_fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.LATEST)
_fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.MONTH)
_fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.TOTAL)
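
Both `update_install_activity` and `update_github_activity` follow the same shape: find what changed in the window, return early when nothing did, then refresh every granularity. A minimal sketch of that flow with the Snowflake and Dynamo steps injected as callables might look like this; `ActivityType` and `update_activity` are hypothetical stand-ins, not names from this PR.

```python
from enum import Enum, auto
from typing import Callable


class ActivityType(Enum):  # hypothetical stand-in for GitHubActivityType
    LATEST = auto()
    MONTH = auto()
    TOTAL = auto()


def update_activity(
    fetch_updated: Callable[[int, int], dict],
    fetch_and_write: Callable[[dict, ActivityType], None],
    start_time: int,
    end_time: int,
) -> int:
    """Returns the number of activity types that were refreshed."""
    updated = fetch_updated(start_time, end_time)
    if not updated:
        # Nothing was ingested in the window, so skip the per-type queries.
        return 0
    # Enum members iterate in definition order: LATEST, MONTH, TOTAL.
    for activity_type in ActivityType:
        fetch_and_write(updated, activity_type)
    return len(ActivityType)
```

Injecting the two callables keeps the control flow testable without a Snowflake connection or a Dynamo table.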
104 changes: 87 additions & 17 deletions data-workflows/activity/snowflake_adapter.py
@@ -9,6 +9,7 @@
from snowflake.connector.cursor import SnowflakeCursor

from activity.install_activity_model import InstallActivityType
from activity.github_activity_model import GitHubActivityType

LOGGER = logging.getLogger()
TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')"
@@ -17,27 +18,26 @@
def get_plugins_with_installs_in_window(start_millis: int, end_millis: int) -> dict[str, datetime]:
query = f"""
SELECT
- LOWER(file_project) AS plugin, DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp
+ LOWER(file_project) AS name, DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp
FROM
imaging.pypi.labeled_downloads
WHERE
download_type = 'pip'
AND project_type = 'plugin'
AND TO_TIMESTAMP(ingestion_timestamp) > {_format_timestamp(timestamp_millis=start_millis)}
AND TO_TIMESTAMP(ingestion_timestamp) <= {_format_timestamp(timestamp_millis=end_millis)}
- GROUP BY file_project
- ORDER BY file_project
+ GROUP BY name
+ ORDER BY name
"""

LOGGER.info(f'Querying for plugins added between start_timestamp={start_millis} end_timestamp={end_millis}')
- return _mapped_query_results(query, "PYPI", {}, _cursor_to_timestamp_by_plugin_mapper)
+ return _mapped_query_results(query, "PYPI", {}, _cursor_to_timestamp_by_name_mapper)


def get_plugins_install_count_since_timestamp(plugins_by_earliest_ts: dict[str, datetime],
install_activity_type: InstallActivityType) -> dict[str, List]:
query = f"""
SELECT
- LOWER(file_project) AS plugin,
+ LOWER(file_project) AS name,
{install_activity_type.get_query_timestamp_projection()} AS timestamp,
COUNT(*) AS count
FROM
@@ -46,13 +46,55 @@ def get_plugins_install_count_since_timestamp(plugins_by_earliest_ts: dict[str,
download_type = 'pip'
AND project_type = 'plugin'
AND ({_generate_subquery_by_type(plugins_by_earliest_ts, install_activity_type)})
- GROUP BY 1, 2
- ORDER BY 1, 2
+ GROUP BY name, timestamp
+ ORDER BY name, timestamp
"""
LOGGER.info(f'Fetching data for granularity={install_activity_type.name}')
return _mapped_query_results(query, 'PYPI', {}, _cursor_to_plugin_activity_mapper)


def get_plugins_with_commits_in_window(start_millis: int, end_millis: int) -> dict[str, datetime]:
query = f"""
SELECT
repo AS name, DATE_TRUNC('DAY', TO_DATE(min(commit_author_date))) AS earliest_timestamp
FROM
imaging.github.commits
WHERE
repo_type = 'plugin'
AND TO_TIMESTAMP(ingestion_time) > {_format_timestamp(timestamp_millis=start_millis)}
AND TO_TIMESTAMP(ingestion_time) <= {_format_timestamp(timestamp_millis=end_millis)}
GROUP BY name
ORDER BY name
"""
LOGGER.info(f'Querying for plugins added between start_timestamp={start_millis} end_timestamp={end_millis}')
return _mapped_query_results(query, 'GITHUB', {}, _cursor_to_timestamp_by_name_mapper)


def get_plugins_commit_count_since_timestamp(plugins_by_earliest_ts: dict[str, datetime],
github_activity_type: GitHubActivityType) -> dict[str, List]:
"""Gets the commit data for each plugin from a type-specific starting point.
If GitHubActivityType.LATEST, fetches the latest commit timestamp, so the query is built without a commit count.
If GitHubActivityType.MONTH, fetches the commit count per month, starting from the first day of the month of
the timestamp specified for each plugin.
If GitHubActivityType.TOTAL, fetches the commit count over all time, so the query is built without a
timestamp constraint.
:param dict[str, datetime] plugins_by_earliest_ts: earliest timestamp of the commit records added, keyed by repo name
:param GitHubActivityType github_activity_type: granularity of the data to fetch
"""
if github_activity_type is GitHubActivityType.LATEST:
accumulator_updater = _cursor_to_plugin_github_activity_latest_mapper
elif github_activity_type is GitHubActivityType.MONTH:
accumulator_updater = _cursor_to_plugin_activity_mapper
else:
accumulator_updater = _cursor_to_plugin_github_activity_total_mapper
LOGGER.info(f'Fetching data for granularity={github_activity_type.name}')
return _mapped_query_results(
query=github_activity_type.get_query(plugins_by_earliest_ts),
schema="GITHUB",
accumulator={},
accumulator_updater=accumulator_updater,
)


def _generate_subquery_by_type(plugins_by_timestamp: dict[str, datetime], install_activity_type: InstallActivityType):
"""
Returns subquery clause generated from the plugins_by_timestamp data based on the InstallActivityType. It is used to
@@ -82,30 +124,58 @@ def _format_timestamp(timestamp_millis):
return TIMESTAMP_FORMAT.format(datetime.utcfromtimestamp(timestamp_millis / 1000.0))


- def _cursor_to_timestamp_by_plugin_mapper(accumulator: dict[str, datetime], cursor) -> dict[str, datetime]:
+ def _cursor_to_timestamp_by_name_mapper(accumulator: dict[str, datetime], cursor) -> dict[str, datetime]:
"""
- Updates the accumulator with data from the cursor. Timestamp is added to the accumulator keyed on plugin name.
- The cursor contains the fields plugin_name and earliest_timestamp
+ Updates the accumulator with data from the cursor. Timestamp is added to the accumulator keyed on name.
+ The cursor contains the fields name and earliest_timestamp
:param dict[str, datetime] accumulator: Accumulator that will be updated with new data
:param SnowflakeCursor cursor:
:returns: Accumulator after data from cursor has been added
"""
- for plugin, earliest_timestamp in cursor:
- accumulator[plugin] = earliest_timestamp
+ for name, earliest_timestamp in cursor:
+ accumulator[name] = earliest_timestamp
return accumulator


def _cursor_to_plugin_activity_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]:
"""
Updates the accumulator with data from the cursor. Object with timestamp and count attributes are created from the
- cursor record and added to the accumulator keyed on plugin name.
- The cursor contains the fields plugin, timestamp, and count
+ cursor record and added to the accumulator keyed on name.
+ The cursor contains the fields name, timestamp, and count
:param dict[str, List] accumulator: Accumulator that will be updated with new data
:param SnowflakeCursor cursor:
:returns: Accumulator after data from cursor has been added
"""
for name, timestamp, count in cursor:
accumulator.setdefault(name, []).append({'timestamp': timestamp, 'count': count})
return accumulator


def _cursor_to_plugin_github_activity_latest_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]:
"""
Updates the accumulator with data from the cursor for GitHubActivityType.LATEST.
An object with a timestamp attribute is created from each cursor record and added to the accumulator, keyed on repo name.
The cursor contains the fields repo name and timestamp.
:param dict[str, List] accumulator: Accumulator that will be updated with new data
:param SnowflakeCursor cursor:
:returns: Accumulator after data from cursor has been added
"""
for name, timestamp in cursor:
accumulator.setdefault(name, []).append({'timestamp': timestamp})
return accumulator


def _cursor_to_plugin_github_activity_total_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]:
"""
Updates the accumulator with data from the cursor for GitHubActivityType.TOTAL.
An object with a count attribute is created from each cursor record and added to the accumulator, keyed on repo name.
The cursor contains the fields repo name and count.
:param dict[str, List] accumulator: Accumulator that will be updated with new data
:param SnowflakeCursor cursor:
:returns: Accumulator after data from cursor has been added
"""
- for plugin, timestamp, count in cursor:
- accumulator.setdefault(plugin, []).append({'timestamp': timestamp, 'count': count})
+ for name, count in cursor:
+ accumulator.setdefault(name, []).append({'count': count})
return accumulator


Expand Down
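
The three accumulator mappers used for GitHub activity differ only in the row shape they expect. The sketch below reproduces their loops with plain lists of tuples standing in for a `SnowflakeCursor`; the function names are shortened and the rows and repo names are made up for illustration.

```python
from datetime import datetime


def month_mapper(accumulator: dict, cursor) -> dict:
    # MONTH rows carry (name, timestamp, count).
    for name, timestamp, count in cursor:
        accumulator.setdefault(name, []).append({'timestamp': timestamp, 'count': count})
    return accumulator


def latest_mapper(accumulator: dict, cursor) -> dict:
    # LATEST rows carry (name, timestamp) only.
    for name, timestamp in cursor:
        accumulator.setdefault(name, []).append({'timestamp': timestamp})
    return accumulator


def total_mapper(accumulator: dict, cursor) -> dict:
    # TOTAL rows carry (name, count) only.
    for name, count in cursor:
        accumulator.setdefault(name, []).append({'count': count})
    return accumulator


# Hypothetical rows; any iterable of tuples behaves like the cursor here.
month_rows = [
    ("org/repo-a", datetime(2023, 2, 1), 4),
    ("org/repo-a", datetime(2023, 3, 1), 7),
    ("org/repo-b", datetime(2023, 3, 1), 1),
]
monthly = month_mapper({}, month_rows)
latest = latest_mapper({}, [("org/repo-a", datetime(2023, 3, 30, 12, 0))])
total = total_mapper({}, [("org/repo-a", 11)])
```

Since `transform_and_write_to_dynamo` reads each record with `activity.get('timestamp')` and `activity.get('count')`, the missing key in the LATEST and TOTAL records simply yields `None`.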