diff --git a/.happy/terraform/modules/ecs-stack/main.tf b/.happy/terraform/modules/ecs-stack/main.tf index efc6c27e1..348d9e454 100644 --- a/.happy/terraform/modules/ecs-stack/main.tf +++ b/.happy/terraform/modules/ecs-stack/main.tf @@ -243,6 +243,8 @@ module data_workflows_lambda { "SNOWFLAKE_PASSWORD" = local.snowflake_password "SNOWFLAKE_USER" = local.snowflake_user "STACK_NAME" = local.custom_stack_name + "BUCKET" = local.data_bucket_name + "BUCKET_PATH" = var.env == "dev" ? local.custom_stack_name : "" } log_retention_in_days = 14 @@ -410,6 +412,13 @@ data aws_iam_policy_document backend_policy { } data aws_iam_policy_document data_workflows_policy { + statement { + actions = [ + "s3:GetObject", + ] + + resources = ["${local.data_bucket_arn}/*"] + } statement { actions = [ "dynamodb:Query", diff --git a/data-workflows/activity/github_activity_model.py b/data-workflows/activity/github_activity_model.py new file mode 100644 index 000000000..4661070d1 --- /dev/null +++ b/data-workflows/activity/github_activity_model.py @@ -0,0 +1,130 @@ +import logging +import time +from datetime import datetime +from enum import Enum, auto +from typing import List, Union +import os + +from pynamodb.models import Model +from pynamodb.attributes import UnicodeAttribute, NumberAttribute + +from utils.utils import get_current_timestamp, date_to_utc_timestamp_in_millis, datetime_to_utc_timestamp_in_millis +from plugin.helpers import _get_cache, _get_repo_to_plugin_dict + + +LOGGER = logging.getLogger() +TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')" + + +class GitHubActivityType(Enum): + def __new__(cls, timestamp_formatter, type_identifier_formatter, query_projection, query_sorting): + github_activity_type = object.__new__(cls) + github_activity_type._value = auto() + github_activity_type.timestamp_formatter = timestamp_formatter + github_activity_type.type_identifier_formatter = type_identifier_formatter + github_activity_type.query_projection = query_projection + 
github_activity_type.query_sorting = query_sorting + return github_activity_type + + LATEST = (datetime_to_utc_timestamp_in_millis, 'LATEST:{0}', + 'repo AS name, to_timestamp(max(commit_author_date)) as latest_commit', 'name') + MONTH = (date_to_utc_timestamp_in_millis, 'MONTH:{1:%Y%m}:{0}', + "repo AS name, date_trunc('month', to_date(commit_author_date)) as month, count(*) as commit_count", + 'name, month') + TOTAL = (lambda timestamp: None, 'TOTAL:{0}', 'repo AS name, count(*) as commit_count', 'name') + + def format_to_timestamp(self, timestamp: datetime) -> Union[int, None]: + return self.timestamp_formatter(timestamp) + + def format_to_type_identifier(self, repo_name: str, identifier_timestamp: str) -> str: + return self.type_identifier_formatter.format(repo_name, identifier_timestamp) + + def _create_subquery(self, plugins_by_earliest_ts: dict[str, datetime]) -> str: + if self is GitHubActivityType.MONTH: + return " OR ".join( + [ + f"repo = '{name}' AND to_timestamp(commit_author_date) >= " + f"{TIMESTAMP_FORMAT.format(ts.replace(day=1))}" + for name, ts in plugins_by_earliest_ts.items() + ] + ) + return f"""repo IN ({','.join([f"'{plugin}'" for plugin in plugins_by_earliest_ts.keys()])})""" + + def get_query(self, plugins_by_earliest_ts: dict[str, datetime]) -> str: + return f""" + SELECT + {self.query_projection} + FROM + imaging.github.commits + WHERE + repo_type = 'plugin' + AND ({self._create_subquery(plugins_by_earliest_ts)}) + GROUP BY {self.query_sorting} + ORDER BY {self.query_sorting} + """ + + +class GitHubActivity(Model): + class Meta: + host = os.getenv('LOCAL_DYNAMO_HOST') + region = os.getenv('AWS_REGION', 'us-west-2') + table_name = f"{os.getenv('STACK_NAME', 'local')}-github-activity" + + plugin_name = UnicodeAttribute(hash_key=True) + type_identifier = UnicodeAttribute(range_key=True) + granularity = UnicodeAttribute(attr_name='type') + timestamp = NumberAttribute(null=True) + commit_count = NumberAttribute(null=True) + repo = 
UnicodeAttribute() + last_updated_timestamp = NumberAttribute(default_for_new=get_current_timestamp) + + def __eq__(self, other): + if isinstance(other, GitHubActivity): + return ( + self.plugin_name == other.plugin_name and + self.type_identifier == other.type_identifier and + self.granularity == other.granularity and + self.timestamp == other.timestamp and + self.commit_count == other.commit_count and + self.repo == other.repo + ) + return False + + +def transform_and_write_to_dynamo(data: dict[str, List], activity_type: GitHubActivityType) -> None: + """Transforms plugin commit data generated by get_plugins_commit_count_since_timestamp to the expected format + and then writes the formatted data to the corresponding github-activity dynamo table in each environment + :param dict[str, list] data: plugin commit data of type dictionary in which the key is plugin name + of type str and the value is Github activities of type list + :param GitHubActivityType activity_type: + """ + LOGGER.info(f'Starting item creation for github-activity type={activity_type.name}') + + batch = GitHubActivity.batch_write() + + start = time.perf_counter() + count = 0 + repo_to_plugin_dict = _get_repo_to_plugin_dict() + for repo, github_activities in data.items(): + plugin_name = repo_to_plugin_dict.get(repo) + if plugin_name is None: + continue + for activity in github_activities: + identifier_timestamp = activity.get('timestamp', '') + timestamp = activity.get('timestamp') + commit_count = activity.get('count') + item = GitHubActivity( + plugin_name, + activity_type.format_to_type_identifier(repo, identifier_timestamp), + granularity=activity_type.name, + timestamp=activity_type.format_to_timestamp(timestamp), + commit_count=commit_count, + repo=repo) + batch.save(item) + count += 1 + + batch.commit() + duration = (time.perf_counter() - start) * 1000 + + LOGGER.info(f'Items github-activity type={activity_type.name} count={count}') + LOGGER.info(f'Transform and write to github-activity 
type={activity_type.name} timeTaken={duration}ms') diff --git a/data-workflows/activity/install_activity_model.py b/data-workflows/activity/install_activity_model.py index 3c9275dee..f396fcfa5 100644 --- a/data-workflows/activity/install_activity_model.py +++ b/data-workflows/activity/install_activity_model.py @@ -1,6 +1,6 @@ import logging import time -from datetime import datetime, timezone +from datetime import datetime from enum import Enum, auto from typing import List, Union import os @@ -8,15 +8,11 @@ from pynamodb.models import Model from pynamodb.attributes import UnicodeAttribute, NumberAttribute -from utils.utils import get_current_timestamp +from utils.utils import get_current_timestamp, datetime_to_utc_timestamp_in_millis LOGGER = logging.getLogger() -def to_utc_timestamp_in_millis(timestamp: datetime) -> int: - return int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 1000) - - class InstallActivityType(Enum): def __new__(cls, timestamp_formatter, type_timestamp_formatter): @@ -26,8 +22,8 @@ def __new__(cls, timestamp_formatter, type_timestamp_formatter): install_activity_type.type_timestamp_formatter = type_timestamp_formatter return install_activity_type - DAY = (to_utc_timestamp_in_millis, 'DAY:{0:%Y%m%d}') - MONTH = (to_utc_timestamp_in_millis, 'MONTH:{0:%Y%m}') + DAY = (datetime_to_utc_timestamp_in_millis, 'DAY:{0:%Y%m%d}') + MONTH = (datetime_to_utc_timestamp_in_millis, 'MONTH:{0:%Y%m}') TOTAL = (lambda timestamp: None, 'TOTAL:') def format_to_timestamp(self, timestamp: datetime) -> Union[int, None]: diff --git a/data-workflows/activity/processor.py b/data-workflows/activity/processor.py index c185d8064..eca7f1589 100644 --- a/data-workflows/activity/processor.py +++ b/data-workflows/activity/processor.py @@ -3,21 +3,38 @@ from activity.install_activity_model import InstallActivityType import activity.install_activity_model as install_model +from activity.github_activity_model import GitHubActivityType +import 
activity.github_activity_model as github_model import activity.snowflake_adapter as snowflake_adapter LOGGER = logging.getLogger() -def _fetch_data_and_write_to_dynamo(data: dict[str, datetime], install_activity_type: InstallActivityType): +def _fetch_install_data_and_write_to_dynamo(data: dict[str, datetime], install_activity_type: InstallActivityType): plugin_install_data = snowflake_adapter.get_plugins_install_count_since_timestamp(data, install_activity_type) install_model.transform_and_write_to_dynamo(plugin_install_data, install_activity_type) +def _fetch_github_data_and_write_to_dynamo(data: dict[str, datetime], github_activity_type: GitHubActivityType): + plugin_commit_data = snowflake_adapter.get_plugins_commit_count_since_timestamp(data, github_activity_type) + github_model.transform_and_write_to_dynamo(plugin_commit_data, github_activity_type) + + def update_install_activity(start_time: int, end_time: int): updated_plugins = snowflake_adapter.get_plugins_with_installs_in_window(start_time, end_time) LOGGER.info(f'Plugins with new install activity count={len(updated_plugins)}') if len(updated_plugins) == 0: return - _fetch_data_and_write_to_dynamo(updated_plugins, InstallActivityType.DAY) - _fetch_data_and_write_to_dynamo(updated_plugins, InstallActivityType.MONTH) - _fetch_data_and_write_to_dynamo(updated_plugins, InstallActivityType.TOTAL) + _fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.DAY) + _fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.MONTH) + _fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.TOTAL) + + +def update_github_activity(start_time: int, end_time: int): + updated_plugins = snowflake_adapter.get_plugins_with_commits_in_window(start_time, end_time) + LOGGER.info(f'Plugins with new github activity count={len(updated_plugins)}') + if len(updated_plugins) == 0: + return + _fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.LATEST) + 
_fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.MONTH) + _fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.TOTAL) diff --git a/data-workflows/activity/snowflake_adapter.py b/data-workflows/activity/snowflake_adapter.py index b7b1f0b01..c6bf7293d 100644 --- a/data-workflows/activity/snowflake_adapter.py +++ b/data-workflows/activity/snowflake_adapter.py @@ -9,6 +9,7 @@ from snowflake.connector.cursor import SnowflakeCursor from activity.install_activity_model import InstallActivityType +from activity.github_activity_model import GitHubActivityType LOGGER = logging.getLogger() TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')" @@ -17,7 +18,7 @@ def get_plugins_with_installs_in_window(start_millis: int, end_millis: int) -> dict[str, datetime]: query = f""" SELECT - LOWER(file_project) AS plugin, DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp + LOWER(file_project) AS name, DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp FROM imaging.pypi.labeled_downloads WHERE @@ -25,19 +26,18 @@ def get_plugins_with_installs_in_window(start_millis: int, end_millis: int) -> d AND project_type = 'plugin' AND TO_TIMESTAMP(ingestion_timestamp) > {_format_timestamp(timestamp_millis=start_millis)} AND TO_TIMESTAMP(ingestion_timestamp) <= {_format_timestamp(timestamp_millis=end_millis)} - GROUP BY file_project - ORDER BY file_project + GROUP BY name + ORDER BY name """ - LOGGER.info(f'Querying for plugins added between start_timestamp={start_millis} end_timestamp={end_millis}') - return _mapped_query_results(query, "PYPI", {}, _cursor_to_timestamp_by_plugin_mapper) + return _mapped_query_results(query, "PYPI", {}, _cursor_to_timestamp_by_name_mapper) def get_plugins_install_count_since_timestamp(plugins_by_earliest_ts: dict[str, datetime], install_activity_type: InstallActivityType) -> dict[str, List]: query = f""" SELECT - LOWER(file_project) AS plugin, + LOWER(file_project) AS name, 
{install_activity_type.get_query_timestamp_projection()} AS timestamp, COUNT(*) AS count FROM @@ -46,13 +46,55 @@ def get_plugins_install_count_since_timestamp(plugins_by_earliest_ts: dict[str, download_type = 'pip' AND project_type = 'plugin' AND ({_generate_subquery_by_type(plugins_by_earliest_ts, install_activity_type)}) - GROUP BY 1, 2 - ORDER BY 1, 2 + GROUP BY name, timestamp + ORDER BY name, timestamp """ LOGGER.info(f'Fetching data for granularity={install_activity_type.name}') return _mapped_query_results(query, 'PYPI', {}, _cursor_to_plugin_activity_mapper) +def get_plugins_with_commits_in_window(start_millis: int, end_millis: int) -> dict[str, datetime]: + query = f""" + SELECT + repo AS name, DATE_TRUNC('DAY', TO_DATE(min(commit_author_date))) AS earliest_timestamp + FROM + imaging.github.commits + WHERE + repo_type = 'plugin' + AND TO_TIMESTAMP(ingestion_time) > {_format_timestamp(timestamp_millis=start_millis)} + AND TO_TIMESTAMP(ingestion_time) <= {_format_timestamp(timestamp_millis=end_millis)} + GROUP BY name + ORDER BY name + """ + LOGGER.info(f'Querying for plugins added between start_timestamp={start_millis} end_timestamp={end_millis}') + return _mapped_query_results(query, 'GITHUB', {}, _cursor_to_timestamp_by_name_mapper) + + +def get_plugins_commit_count_since_timestamp(plugins_by_earliest_ts: dict[str, datetime], + github_activity_type: GitHubActivityType) -> dict[str, List]: + """This method gets the commit data since a specific starting point for each plugin. + If GitHubActivityType.LATEST, fetch the latest commit timestamp, so construct query without commit count constraint + If GitHubActivityType.MONTH, fetch the sum of commits from the beginning of the month of the timestamp specified + for each of the plugin. 
+ If GitHubActivityType.TOTAL, fetch the sum of commits over all time, so construct query without timestamp constraint + :param dict[str, datetime] plugins_by_earliest_ts: plugin name by earliest timestamp of commit record added + :param GitHubActivityType github_activity_type: + """ + if github_activity_type is GitHubActivityType.LATEST: + accumulator_updater = _cursor_to_plugin_github_activity_latest_mapper + elif github_activity_type is GitHubActivityType.MONTH: + accumulator_updater = _cursor_to_plugin_activity_mapper + else: + accumulator_updater = _cursor_to_plugin_github_activity_total_mapper + LOGGER.info(f'Fetching data for granularity={github_activity_type.name}') + return _mapped_query_results( + query=github_activity_type.get_query(plugins_by_earliest_ts), + schema="GITHUB", + accumulator={}, + accumulator_updater=accumulator_updater, + ) + + def _generate_subquery_by_type(plugins_by_timestamp: dict[str, datetime], install_activity_type: InstallActivityType): """ Returns subquery clause generated from the plugins_by_timestamp data based on the InstallActivityType. It is used to @@ -82,30 +124,58 @@ def _format_timestamp(timestamp_millis): return TIMESTAMP_FORMAT.format(datetime.utcfromtimestamp(timestamp_millis / 1000.0)) -def _cursor_to_timestamp_by_plugin_mapper(accumulator: dict[str, datetime], cursor) -> dict[str, datetime]: +def _cursor_to_timestamp_by_name_mapper(accumulator: dict[str, datetime], cursor) -> dict[str, datetime]: """ - Updates the accumulator with data from the cursor. Timestamp is added to the accumulator keyed on plugin name. - The cursor contains the fields plugin_name and earliest_timestamp + Updates the accumulator with data from the cursor. Timestamp is added to the accumulator keyed on name. 
+ The cursor contains the fields name and earliest_timestamp :param dict[str, datetime] accumulator: Accumulator that will be updated with new data :param SnowflakeCursor cursor: :returns: Accumulator after data from cursor has been added """ - for plugin, earliest_timestamp in cursor: - accumulator[plugin] = earliest_timestamp + for name, earliest_timestamp in cursor: + accumulator[name] = earliest_timestamp return accumulator def _cursor_to_plugin_activity_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]: """ Updates the accumulator with data from the cursor. Object with timestamp and count attributes are created from the - cursor record and added to the accumulator keyed on plugin name. - The cursor contains the fields plugin, timestamp, and count + cursor record and added to the accumulator keyed on name. + The cursor contains the fields name, timestamp, and count + :param dict[str, List] accumulator: Accumulator that will be updated with new data + :param SnowflakeCursor cursor: + :returns: Accumulator after data from cursor has been added + """ + for name, timestamp, count in cursor: + accumulator.setdefault(name, []).append({'timestamp': timestamp, 'count': count}) + return accumulator + + +def _cursor_to_plugin_github_activity_latest_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]: + """ + Updates the accumulator with data from the cursor for GitHubActivityType.LATEST. + Object with timestamp is created from the cursor record and added to the accumulator keyed on repo name. 
+ The cursor contains the fields repo name and timestamp + :param dict[str, List] accumulator: Accumulator that will be updated with new data + :param SnowflakeCursor cursor: + :returns: Accumulator after data from cursor has been added + """ + for name, timestamp in cursor: + accumulator.setdefault(name, []).append({'timestamp': timestamp}) + return accumulator + + +def _cursor_to_plugin_github_activity_total_mapper(accumulator: dict[str, List], cursor) -> dict[str, List]: + """ + Updates the accumulator with data from the cursor for GitHubActivityType.TOTAL. + Object with count are created from the cursor record and added to the accumulator keyed on repo name. + The cursor contains the fields repo name and count :param dict[str, List] accumulator: Accumulator that will be updated with new data :param SnowflakeCursor cursor: :returns: Accumulator after data from cursor has been added """ - for plugin, timestamp, count in cursor: - accumulator.setdefault(plugin, []).append({'timestamp': timestamp, 'count': count}) + for name, count in cursor: + accumulator.setdefault(name, []).append({'count': count}) return accumulator diff --git a/data-workflows/activity/tests/test_snowflake_adapter.py b/data-workflows/activity/tests/test_snowflake_adapter.py index 5aa2abde2..0e77ba906 100644 --- a/data-workflows/activity/tests/test_snowflake_adapter.py +++ b/data-workflows/activity/tests/test_snowflake_adapter.py @@ -25,7 +25,7 @@ def to_ts(epoch): def get_plugins_with_installs_in_window_query(): return """ SELECT - LOWER(file_project) AS plugin, DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp + LOWER(file_project) AS name, DATE_TRUNC('DAY', MIN(timestamp)) AS earliest_timestamp FROM imaging.pypi.labeled_downloads WHERE @@ -33,15 +33,15 @@ def get_plugins_with_installs_in_window_query(): AND project_type = 'plugin' AND TO_TIMESTAMP(ingestion_timestamp) > TO_TIMESTAMP('2021-03-14 07:05:53') AND TO_TIMESTAMP(ingestion_timestamp) <= TO_TIMESTAMP('2022-03-14 07:05:53') - 
GROUP BY file_project - ORDER BY file_project + GROUP BY name + ORDER BY name """ def get_plugins_install_count_since_timestamp_query(projection, subquery): return f""" SELECT - LOWER(file_project) AS plugin, + LOWER(file_project) AS name, {projection} AS timestamp, COUNT(*) AS count FROM @@ -50,8 +50,8 @@ def get_plugins_install_count_since_timestamp_query(projection, subquery): download_type = 'pip' AND project_type = 'plugin' AND ({subquery}) - GROUP BY 1, 2 - ORDER BY 1, 2 + GROUP BY name, timestamp + ORDER BY name, timestamp """ diff --git a/data-workflows/handler.py b/data-workflows/handler.py index b05349936..606e18cb9 100644 --- a/data-workflows/handler.py +++ b/data-workflows/handler.py @@ -16,6 +16,7 @@ def _update_activity() -> None: last_updated_timestamp = parameter_store_adapter.get_last_updated_timestamp() current_timestamp = utils.utils.get_current_timestamp() activity.processor.update_install_activity(last_updated_timestamp, current_timestamp) + activity.processor.update_github_activity(last_updated_timestamp, current_timestamp) parameter_store_adapter.set_last_updated_timestamp(current_timestamp) diff --git a/data-workflows/plugin/__init__.py b/data-workflows/plugin/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/data-workflows/plugin/helpers.py b/data-workflows/plugin/helpers.py new file mode 100644 index 000000000..225f1e0ec --- /dev/null +++ b/data-workflows/plugin/helpers.py @@ -0,0 +1,31 @@ +import logging +import json +import os +import os.path +from typing import Dict + +import boto3 +from botocore.exceptions import ClientError + +LOGGER = logging.getLogger() + + +def _get_cache(key: str) -> Dict: + try: + bucket = os.getenv('BUCKET') + bucket_path = os.getenv('BUCKET_PATH', '') + s3_client = boto3.client("s3") + return json.loads(s3_client.get_object(Bucket=bucket, Key=os.path.join(bucket_path, key))['Body'].read()) + except ClientError: + logging.info(f"Not cached: {key}") + return None + + +def 
_get_repo_to_plugin_dict(): + index_json = _get_cache('cache/index.json') + repo_to_plugin_dict = {} + for public_plugin_obj in (index_json or []): + code_repository = public_plugin_obj.get('code_repository') + if code_repository: + repo_to_plugin_dict[code_repository.replace('https://github.com/', '')] = public_plugin_obj['name'].lower() + return repo_to_plugin_dict diff --git a/data-workflows/tests/test_handler.py b/data-workflows/tests/test_handler.py index 34b8dadf8..f4bc83419 100644 --- a/data-workflows/tests/test_handler.py +++ b/data-workflows/tests/test_handler.py @@ -19,10 +19,13 @@ def _setup(self, monkeypatch): monkeypatch.setattr(handler, 'ParameterStoreAdapter', self._parameter_store_adapter_call) self._update_install_activity_call = Mock() monkeypatch.setattr(activity.processor, 'update_install_activity', self._update_install_activity_call) + self._update_github_activity_call = Mock() + monkeypatch.setattr(activity.processor, 'update_github_activity', self._update_github_activity_call) def _verify(self, call_count): assert self._parameter_store_adapter_call.call_count == call_count assert self._update_install_activity_call.call_count == call_count + assert self._update_github_activity_call.call_count == call_count def test_handle_valid_activity_event(self, monkeypatch): parameter_store_adapter = Mock(get_last_updated_timestamp=lambda: LAST_UPDATED_TIMESTAMP) @@ -34,6 +37,7 @@ def test_handle_valid_activity_event(self, monkeypatch): self._verify(1) self._update_install_activity_call.assert_called_once_with(LAST_UPDATED_TIMESTAMP, CURRENT_TIMESTAMP) + self._update_github_activity_call.assert_called_once_with(LAST_UPDATED_TIMESTAMP, CURRENT_TIMESTAMP) parameter_store_adapter.set_last_updated_timestamp.assert_called_once_with(CURRENT_TIMESTAMP) def test_handle_invalid_json(self): diff --git a/data-workflows/utils/utils.py b/data-workflows/utils/utils.py index ea228e210..ca94c6d95 100644 --- a/data-workflows/utils/utils.py +++ b/data-workflows/utils/utils.py @@ 
-1,6 +1,7 @@ import json import os import time +from datetime import date, datetime, timezone import boto3 @@ -9,6 +10,15 @@ def get_current_timestamp() -> int: return round(time.time() * 1000) +def date_to_utc_timestamp_in_millis(timestamp: date) -> int: + timestamp_datetime = datetime(timestamp.year, timestamp.month, timestamp.day) + return datetime_to_utc_timestamp_in_millis(timestamp_datetime) + + +def datetime_to_utc_timestamp_in_millis(timestamp: datetime) -> int: + return int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 1000) + + LAST_UPDATED_TIMESTAMP_KEY = 'last_activity_fetched_timestamp'