diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py
index 27ffb2e8f6..ae94b1f800 100644
--- a/augur/application/cli/backend.py
+++ b/augur/application/cli/backend.py
@@ -16,8 +16,7 @@
 import uuid
 import traceback
 from urllib.parse import urlparse

-from sqlalchemy import update
-
+from datetime import datetime
 from augur import instance_id
 from augur.tasks.start_tasks import augur_collection_monitor, CollectionState
@@ -187,13 +186,15 @@ def augur_stop(signal, logger):
     """
     augur_processes = get_augur_processes()

-    _broadcast_signal_to_processes(augur_processes, broadcast_signal=signal, given_logger=logger)
-
     # if celery is running, run the cleanup function
     process_names = [process.name() for process in augur_processes]
+
+    _broadcast_signal_to_processes(augur_processes, broadcast_signal=signal, given_logger=logger)
+
     if "celery" in process_names:
         cleanup_after_collection_halt(logger)

+
 def cleanup_after_collection_halt(logger):
     clear_redis_caches()
     connection_string = ""
@@ -441,6 +442,8 @@ def order_repos(repos):

     return repo_git_urls

+
+
 # def initialize_components(augur_app, disable_housekeeper):
 #     master = None
 #     manager = None
diff --git a/augur/application/cli/collection.py b/augur/application/cli/collection.py
index ad7533eebc..e29a04fe58 100644
--- a/augur/application/cli/collection.py
+++ b/augur/application/cli/collection.py
@@ -36,8 +36,6 @@

 logger = AugurLogger("augur", reset_logfiles=True).get_logger()

-def get_page_count()
-
 def check_collection(owner, repo, key_manager, session):
diff --git a/augur/application/cli/db.py b/augur/application/cli/db.py
index 382d5042a6..f09aaabbd2 100644
--- a/augur/application/cli/db.py
+++ b/augur/application/cli/db.py
@@ -22,6 +22,9 @@
 from augur.application.db.session import DatabaseSession
 from augur.application.logs import AugurLogger
 from augur.application.db.engine import DatabaseEngine
+from sqlalchemy import update
+from datetime import datetime
+from augur.application.db.models import Repo

 logger = logging.getLogger(__name__)

@@ -373,7 +376,6 @@ def init_database(
 def test_db_connection():
     pass

-
 # TODO: Fix this function
 def run_psql_command_in_database(target_type, target):
     if target_type not in ["-f", "-c"]:
@@ -456,3 +458,19 @@ def check_pgpass_credentials(config):
             pgpass_file.write(credentials_string + "\n")
     else:
         print("Credentials found in $HOME/.pgpass")
+
+
+#NOTE: For some reason, when I try to add function decorators to this function,
+#click thinks the decorator is an argument and tries to parse it, but it errors
+#since a function isn't an iterable.
+@cli.command("reset-repo-age")
+def reset_repo_age():
+
+    with DatabaseSession(logger) as session:
+        update_query = (
+            update(Repo)
+            .values(repo_added=datetime.now())
+        )
+
+        session.execute(update_query)
+        session.commit()
diff --git a/augur/application/db/models/augur_operations.py b/augur/application/db/models/augur_operations.py
index ca5845e1f6..17035c856a 100644
--- a/augur/application/db/models/augur_operations.py
+++ b/augur/application/db/models/augur_operations.py
@@ -10,6 +10,7 @@

 import logging
 import secrets
+import traceback
 import importlib

 from augur.application.db.models import Repo, RepoGroup
@@ -17,6 +18,7 @@
 from augur.application.db.models.base import Base

+
 FRONTEND_REPO_GROUP_NAME = "Frontend Repos"

 logger = logging.getLogger(__name__)
@@ -1009,14 +1011,48 @@ class CollectionStatus(Base):
     facade_status = Column(String,nullable=False, server_default=text("'Pending'"))
     facade_data_last_collected = Column(TIMESTAMP)
     facade_task_id = Column(String)
+
+    core_weight = Column(BigInteger)
+    facade_weight = Column(BigInteger)
+    secondary_weight = Column(BigInteger)
+
+    issue_pr_sum = Column(BigInteger)
+    commit_sum = Column(BigInteger)

     repo = relationship("Repo", back_populates="collection_status")

     @staticmethod
     def insert(session, repo_id):
+        from augur.tasks.github.util.util import get_repo_weight_by_issue
+        from augur.tasks.util.worker_util import calculate_date_weight_from_timestamps
+
+        repo = Repo.get_by_id(session, repo_id)
+        repo_git = repo.repo_git

         collection_status_unique = ["repo_id"]
-        result = session.insert_data({"repo_id": repo_id}, CollectionStatus, collection_status_unique, on_conflict_update=False)
+
+        try:
+            pr_issue_count = get_repo_weight_by_issue(session.logger, repo_git)
+            #session.logger.info(f"date weight: {calculate_date_weight_from_timestamps(repo.repo_added, None)}")
+            github_weight = pr_issue_count - calculate_date_weight_from_timestamps(repo.repo_added, None)
+        except Exception as e:
+            pr_issue_count = None
+            github_weight = None
+            session.logger.error(
+                ''.join(traceback.format_exception(None, e, e.__traceback__)))
+
+
+        record = {
+            "repo_id": repo_id,
+            "issue_pr_sum": pr_issue_count,
+            "core_weight": github_weight,
+            "secondary_weight": github_weight
+        }
+
+        result = session.insert_data(record, CollectionStatus, collection_status_unique, on_conflict_update=False)
+
+        session.logger.info(f"Trying to insert repo \n issue and pr sum: {record['issue_pr_sum']}")
+

         if not result:
             return False
diff --git a/augur/application/schema/alembic/versions/16_add_weight_data_to_collection_status_to_.py b/augur/application/schema/alembic/versions/16_add_weight_data_to_collection_status_to_.py
new file mode 100644
index 0000000000..9126606cb2
--- /dev/null
+++ b/augur/application/schema/alembic/versions/16_add_weight_data_to_collection_status_to_.py
@@ -0,0 +1,38 @@
+"""Add weight data to collection status to determine collection order of repos
+
+Revision ID: 16
+Revises: 15
+Create Date: 2023-04-10 18:28:12.460522
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '16'
+down_revision = '15'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('collection_status', sa.Column('core_weight', sa.BigInteger()), schema='augur_operations')
+    op.add_column('collection_status', sa.Column('facade_weight', sa.BigInteger()), schema='augur_operations')
+    op.add_column('collection_status', sa.Column('secondary_weight', sa.BigInteger()), schema='augur_operations')
+
+    op.add_column('collection_status', sa.Column('issue_pr_sum', sa.BigInteger()), schema='augur_operations')
+    op.add_column('collection_status', sa.Column('commit_sum', sa.BigInteger()), schema='augur_operations')
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column('collection_status', 'facade_weight', schema='augur_operations')
+    op.drop_column('collection_status', 'core_weight', schema='augur_operations')
+    op.drop_column('collection_status', 'secondary_weight', schema='augur_operations')
+
+    op.drop_column('collection_status', 'issue_pr_sum', schema='augur_operations')
+    op.drop_column('collection_status', 'commit_sum', schema='augur_operations')
+    # ### end Alembic commands ###
diff --git a/augur/application/schema/alembic/versions/5_add_collection_status_table.py b/augur/application/schema/alembic/versions/5_add_collection_status_table.py
index b233049368..bc97278a19 100644
--- a/augur/application/schema/alembic/versions/5_add_collection_status_table.py
+++ b/augur/application/schema/alembic/versions/5_add_collection_status_table.py
@@ -36,12 +36,7 @@ def upgrade():

     # add collection status for any existing repos
     conn = op.get_bind()
-    repos = conn.execute(text("""SELECT repo_id from repo""")).fetchall()
-
-    for repo in repos:
-        repo_id = repo[0]
-        conn.execute(text(f"""INSERT INTO "augur_operations"."collection_status" ("repo_id") VALUES ({repo_id});"""))
-
+
     conn.execute(text("""
         UPDATE augur_operations.config
         SET value = '600'
diff --git a/augur/application/schema/alembic/versions/legacy/80.3-sample-data.sql b/augur/application/schema/alembic/versions/legacy/80.3-sample-data.sql
index a8a60c8cdc..26a6451ba2 100644
--- a/augur/application/schema/alembic/versions/legacy/80.3-sample-data.sql
+++ b/augur/application/schema/alembic/versions/legacy/80.3-sample-data.sql
@@ -1072,13 +1072,13 @@ INSERT INTO "augur_data"."contributor_affiliations"("ca_id", "ca_domain", "tool_
 INSERT INTO "augur_data"."repo_groups"("repo_group_id", "rg_name", "rg_description", "rg_website", "rg_recache", "rg_last_modified", "rg_type", "tool_source", "tool_version", "data_source", "data_collection_date") VALUES (10, 'Default Repo Group', 'The default repo group created by the schema generation script', '', 0, '2021-06-03 15:55:20', 'GitHub Organization', 'load', 'one', 'git', '2019-06-05 13:36:25');

-INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_added", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (25452, 10, 'https://github.com/chaoss/whitepaper', 'github.com/chaoss/', 'whitepaper', '2021-04-17 21:40:42', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'CLI', '1.0', 'Git', '2021-04-17 21:40:42', 0, NULL);
-INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_added", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (24441, 10, 'https://github.com/operate-first/operate-first-twitter', 'github.com/operate-first/', 'operate-first-twitter', '2021-08-25 16:47:47', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'CLI', '1.0', 'Git', '2021-08-25 16:47:47', 0, NULL);
-INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_added", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (24442, 10, 'https://github.com/operate-first/blueprint', 'github.com/operate-first/', 'blueprint', '2021-08-25 16:47:47', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'CLI', '1.0', 'Git', '2021-08-25 16:47:47', 0, NULL);
-INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_added", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (25445, 10, 'https://github.com/chaoss/grimoirelab-perceval-opnfv', 'github.com/chaoss/', 'grimoirelab-perceval-opnfv', '2020-04-17 21:40:39', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'CLI', '1.0', 'Git', '2021-04-17 21:40:39', 0, NULL);
-INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_added", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (1, 1, 'https://github.com/chaoss/augur', 'github.com/chaoss/', 'augur', '2021-08-10 14:28:44', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'data load', 'one', 'git', '2021-06-05 18:41:14', 0, NULL);
-INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_added", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (25430, 10, 'https://github.com/SociallyCompute/update-test', 'github.com/SociallyCompute/', 'update-test', '2021-10-07 08:50:13', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, NULL, NULL, NULL, NULL, 0, NULL);
-INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_added", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (25450, 10, 'https://github.com/chaoss/grimoirelab-hatstall', 'github.com/chaoss/', 'grimoirelab-hatstall', '2021-04-17 21:40:42', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'CLI', '1.0', 'Git', '2021-04-17 21:40:42', 0, NULL);
+INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (25452, 10, 'https://github.com/chaoss/whitepaper', 'github.com/chaoss/', 'whitepaper', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'CLI', '1.0', 'Git', '2021-04-17 21:40:42', 0, NULL);
+INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (24441, 10, 'https://github.com/operate-first/operate-first-twitter', 'github.com/operate-first/', 'operate-first-twitter', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'CLI', '1.0', 'Git', '2021-08-25 16:47:47', 0, NULL);
+INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (24442, 10, 'https://github.com/operate-first/blueprint', 'github.com/operate-first/', 'blueprint', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'CLI', '1.0', 'Git', '2021-08-25 16:47:47', 0, NULL);
+INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (25445, 10, 'https://github.com/chaoss/grimoirelab-perceval-opnfv', 'github.com/chaoss/', 'grimoirelab-perceval-opnfv', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'CLI', '1.0', 'Git', '2021-04-17 21:40:39', 0, NULL);
+INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (1, 1, 'https://github.com/chaoss/augur', 'github.com/chaoss/', 'augur', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'data load', 'one', 'git', '2021-06-05 18:41:14', 0, NULL);
+INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (25430, 10, 'https://github.com/SociallyCompute/update-test', 'github.com/SociallyCompute/', 'update-test', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, NULL, NULL, NULL, NULL, 0, NULL);
+INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "tool_source", "tool_version", "data_source", "data_collection_date", "repo_archived", "repo_archived_date_collected") VALUES (25450, 10, 'https://github.com/chaoss/grimoirelab-hatstall', 'github.com/chaoss/', 'grimoirelab-hatstall', 'Complete', '', NULL, NULL, NULL, NULL, NULL, 'Parent not available', NULL, 'CLI', '1.0', 'Git', '2021-04-17 21:40:42', 0, NULL);

 UPDATE "augur_data"."repo" set repo_name = NULL, repo_path = NULL, repo_status = 'New';
diff --git a/augur/tasks/git/facade_tasks.py b/augur/tasks/git/facade_tasks.py
index a412554963..d9a556e9c2 100644
--- a/augur/tasks/git/facade_tasks.py
+++ b/augur/tasks/git/facade_tasks.py
@@ -24,10 +24,13 @@
 from datetime import timedelta

 import sqlalchemy as s
+from sqlalchemy import or_, and_, update

 from augur.tasks.git.util.facade_worker.facade_worker.facade02utilitymethods import update_repo_log, trim_commit, store_working_author, trim_author
 from augur.tasks.git.util.facade_worker.facade_worker.facade02utilitymethods import get_absolute_repo_path, get_parent_commits_set, get_existing_commits_set
 from augur.tasks.git.util.facade_worker.facade_worker.facade03analyzecommit import analyze_commit
+from augur.tasks.git.util.facade_worker.facade_worker.facade02utilitymethods import get_facade_weight_time_factor, get_repo_commit_count
+
 from augur.tasks.github.facade_github.tasks import *

 from augur.tasks.util.worker_util import create_grouped_task_load
@@ -347,6 +350,31 @@ def git_repo_initialize_facade_task(repo_git):

     # with FacadeSession(logger) as session:
     #     check_for_repo_updates(session, repo_git)

+@celery.task
+def git_update_commit_count_weight(repo_git):
+
+    from augur.tasks.init.celery_app import engine
+    logger = logging.getLogger(git_update_commit_count_weight.__name__)
+
+    with FacadeSession(logger) as session:
+        commit_count = get_repo_commit_count(session, repo_git)
+        date_factor = get_facade_weight_time_factor(session, repo_git)
+
+        weight = commit_count - date_factor
+        logger.info(f"Repo {repo_git} has a weight of {weight} and a commit count of {commit_count}")
+
+    with DatabaseSession(logger,engine=engine) as session:
+        repo = Repo.get_by_repo_git(session, repo_git)
+
+        update_query = (
+            update(CollectionStatus)
+            .where(CollectionStatus.repo_id == repo.repo_id)
+            .values(facade_weight=weight,commit_sum=commit_count)
+        )
+
+        session.execute(update_query)
+        session.commit()
+
 @celery.task
 def git_repo_updates_facade_task(repo_git):
@@ -460,7 +488,9 @@ def facade_clone_update_phase(repo_git):

     if not limited_run or (limited_run and pull_repos):
         facade_sequence.append(git_repo_updates_facade_task.si(repo_git))
-
+
+    facade_sequence.append(git_update_commit_count_weight.si(repo_git))
+
     return chain(*facade_sequence)

@@ -507,7 +537,8 @@ def facade_phase(repo_git):
         group(
             chain(*facade_core_collection),
             process_dependency_metrics.si(repo_git),
-            process_libyear_dependency_metrics.si(repo_git)
+            process_libyear_dependency_metrics.si(repo_git),
+            git_update_commit_count_weight.si(repo_git)
         )
     )
diff --git a/augur/tasks/git/util/facade_worker/facade_worker/facade02utilitymethods.py b/augur/tasks/git/util/facade_worker/facade_worker/facade02utilitymethods.py
index 9d35faeaed..a9c48f42ab 100644
--- a/augur/tasks/git/util/facade_worker/facade_worker/facade02utilitymethods.py
+++ b/augur/tasks/git/util/facade_worker/facade_worker/facade02utilitymethods.py
@@ -41,6 +41,7 @@
 from .facade01config import get_database_args_from_env
 from augur.application.db.models.augur_data import *
 from .facade01config import FacadeSession as FacadeSession
+from augur.tasks.util.worker_util import calculate_date_weight_from_timestamps
 #from augur.tasks.git.util.facade_worker.facade

 def update_repo_log(session, repos_id,status):
@@ -142,18 +143,37 @@ def get_existing_commits_set(session, repo_id):

     return set(existing_commits)

-def date_weight_factor(days_since_last_collection):
-    return (days_since_last_collection ** 3) / 25

-def get_repo_weight_by_commit(logger,repo_git,days_since_last_collection):
-    with FacadeSession(logger) as session:
-        repo = Repo.get_by_repo_git(session, repo_git)
-        absolute_path = get_absolute_repo_path(session.repo_base_directory, repo.repo_group_id, repo.repo_path, repo.repo_name)
-        repo_loc = (f"{absolute_path}/.git")
+def get_repo_commit_count(session,repo_git):
+
+    repo = Repo.get_by_repo_git(session, repo_git)
+
+
+    absolute_path = get_absolute_repo_path(session.repo_base_directory, repo.repo_group_id, repo.repo_path, repo.repo_name)
+    repo_loc = (f"{absolute_path}/.git")
+
+    #git --git-dir <.git directory> rev-list --count HEAD
+    check_commit_count_cmd = check_output(["git","--git-dir",repo_loc, "rev-list", "--count", "HEAD"])
+
+    commit_count = int(check_commit_count_cmd)

-    #git --git-dir <.git directory> rev-list --count HEAD
-    check_commit_count_cmd = check_output(["git","--git-dir",repo_loc, "rev-list", "--count", "HEAD"])
+    return commit_count

+def get_facade_weight_time_factor(session,repo_git):
+    repo = Repo.get_by_repo_git(session, repo_git)

-    commit_count = int(check_commit_count_cmd)
+    try:
+        status = repo.collection_status[0]
+        time_factor = calculate_date_weight_from_timestamps(repo.repo_added, status.facade_data_last_collected)
+    except IndexError:
+        time_factor = calculate_date_weight_from_timestamps(repo.repo_added, None)

-    return commit_count - date_weight_factor(days_since_last_collection)
\ No newline at end of file
+    #Adjust for commits.
+    time_factor *= 1.2
+
+    return time_factor
+
+
+def get_repo_weight_by_commit(logger,repo_git):
+    with FacadeSession(logger) as session:
+        return get_repo_commit_count(session, repo_git) - get_facade_weight_time_factor(session, repo_git)
\ No newline at end of file
diff --git a/augur/tasks/github/issues/tasks.py b/augur/tasks/github/issues/tasks.py
index d5ce19f68d..81fa3a341a 100644
--- a/augur/tasks/github/issues/tasks.py
+++ b/augur/tasks/github/issues/tasks.py
@@ -21,7 +21,7 @@
 development = get_development_flag()

 @celery.task(base=AugurCoreRepoCollectionTask)
-def collect_issues(repo_git : str) -> None:
+def collect_issues(repo_git : str) -> int:

     logger = logging.getLogger(collect_issues.__name__)

@@ -39,14 +39,18 @@ def collect_issues(repo_git : str) -> None:

         issue_data = retrieve_all_issue_data(repo_git, logger, manifest.key_auth)

+
         if issue_data:
-
+            total_issues = len(issue_data)
             process_issues(issue_data, f"{owner}/{repo}: Issue task", repo_id, logger, augur_db)

+            return total_issues
         else:
             logger.info(f"{owner}/{repo} has no issues")
+            return 0

     except Exception as e:
         logger.error(f"Could not collect issues for repo {repo_git}\n Reason: {e} \n Traceback: {''.join(traceback.format_exception(None, e, e.__traceback__))}")
+        return -1
diff --git a/augur/tasks/github/pull_requests/tasks.py b/augur/tasks/github/pull_requests/tasks.py
index 0fef125408..a5ba6db7c4 100644
--- a/augur/tasks/github/pull_requests/tasks.py
+++ b/augur/tasks/github/pull_requests/tasks.py
@@ -20,7 +20,7 @@

 @celery.task(base=AugurCoreRepoCollectionTask)
-def collect_pull_requests(repo_git: str) -> None:
+def collect_pull_requests(repo_git: str) -> int:

     logger = logging.getLogger(collect_pull_requests.__name__)

@@ -36,8 +36,11 @@ def collect_pull_requests(repo_git: str) -> None:

         if pr_data:
             process_pull_requests(pr_data, f"{owner}/{repo}: Pr task", repo_id, logger, augur_db)
+
+            return len(pr_data)
         else:
             logger.info(f"{owner}/{repo} has no pull requests")
+            return 0


 # TODO: Rename pull_request_reviewers table to pull_request_requested_reviewers
diff --git a/augur/tasks/github/util/gh_graphql_entities.py b/augur/tasks/github/util/gh_graphql_entities.py
index 626c8ac51e..e4f718af68 100644
--- a/augur/tasks/github/util/gh_graphql_entities.py
+++ b/augur/tasks/github/util/gh_graphql_entities.py
@@ -200,10 +200,14 @@ def request_graphql_dict(self,variables={},timeout_wait=10):

             if type(response_data) == dict:
                 err = process_dict_response(self.logger, result, response_data)
-
+
+                if err == GithubApiResult.REPO_NOT_FOUND:
+                    self.logger.error(f"Repo not found! \n response_data: {response_data}")
+                    return None
+
                 if err and err != GithubApiResult.SUCCESS:
                     attempts += 1
-                    self.logger.info(f"err: {err}")
+                    self.logger.info(f"err: {err} \n response_data: {response_data}")
                     continue

                 success = True
@@ -227,6 +231,7 @@ def request_graphql_dict(self,variables={},timeout_wait=10):

                 #If we get an error message that's not None
                 if err and err != GithubApiResult.SUCCESS:
+                    attempts += 1
                     continue

                 success = True
diff --git a/augur/tasks/github/util/github_paginator.py b/augur/tasks/github/util/github_paginator.py
index 26b3c1c795..1c252d8ce6 100644
--- a/augur/tasks/github/util/github_paginator.py
+++ b/augur/tasks/github/util/github_paginator.py
@@ -134,6 +134,12 @@ def process_dict_response(logger: logging.Logger, response: httpx.Response, page
         logger.info(f"\n\n\nAPI rate limit exceeded. Sleeping until the key resets ({key_reset_time} seconds)")
         time.sleep(key_reset_time)
         return GithubApiResult.RATE_LIMIT_EXCEEDED
+
+    err_type = error.get('type')
+
+    if err_type and 'NOT_FOUND' in err_type:
+        return GithubApiResult.REPO_NOT_FOUND
+
     return GithubApiResult.NEW_RESULT
diff --git a/augur/tasks/github/util/util.py b/augur/tasks/github/util/util.py
index 37ee12dd5e..fbb23dd6e8 100644
--- a/augur/tasks/github/util/util.py
+++ b/augur/tasks/github/util/util.py
@@ -4,6 +4,10 @@
 import logging
 import json
 import httpx
+from augur.tasks.github.util.github_task_session import GithubTaskManifest
+from augur.application.db.session import DatabaseSession
+from augur.application.db.models import Repo
+from augur.tasks.util.worker_util import calculate_date_weight_from_timestamps

 # This function adds a key value pair to a list of dicts and returns the modified list of dicts back
@@ -53,3 +57,34 @@ def parse_json_response(logger: logging.Logger, response: httpx.Response) -> dic
         logger.warning(f"invalid return from GitHub. Response was: {response.text}. Exception: {e}")
         return json.loads(json.dumps(response.text))

+def get_repo_weight_by_issue(logger,repo_git):
+    from augur.tasks.github.util.gh_graphql_entities import GitHubRepo as GitHubRepoGraphql
+
+    owner,name = get_owner_repo(repo_git)
+
+    with GithubTaskManifest(logger) as manifest:
+        repo_graphql = GitHubRepoGraphql(logger, manifest.key_auth, owner, name)
+        number_of_issues_and_prs = len(repo_graphql.get_issues_collection()) + len(repo_graphql.get_pull_requests_collection())
+
+    return number_of_issues_and_prs
+
+#Get the weight for each repo for the core collection hook
+def get_repo_weight_core(logger,repo_git):
+    from augur.tasks.init.celery_app import engine
+
+    with DatabaseSession(logger,engine) as session:
+        repo = Repo.get_by_repo_git(session, repo_git)
+        if not repo:
+            raise Exception(f"Repo with repo_git of {repo_git} could not be found in Repo table")
+
+        #try to get the collection status if it exists at this point
+        try:
+            status = repo.collection_status[0]
+            time_factor = calculate_date_weight_from_timestamps(repo.repo_added,status.core_data_last_collected)
+        except IndexError:
+            time_factor = calculate_date_weight_from_timestamps(repo.repo_added,None)
+
+
+        #Don't go below zero.
+        return max(0,get_repo_weight_by_issue(logger, repo_git) - time_factor)
+
diff --git a/augur/tasks/init/celery_app.py b/augur/tasks/init/celery_app.py
index 59f6f30084..9cea8460c1 100644
--- a/augur/tasks/init/celery_app.py
+++ b/augur/tasks/init/celery_app.py
@@ -187,7 +187,7 @@ def setup_periodic_tasks(sender, **kwargs):
         The tasks so that they are grouped by the module they are defined in
     """
     from celery.schedules import crontab
-    from augur.tasks.start_tasks import augur_collection_monitor
+    from augur.tasks.start_tasks import augur_collection_monitor, augur_collection_update_weights
     from augur.tasks.start_tasks import non_repo_domain_tasks
     from augur.tasks.db.refresh_materialized_views import refresh_materialized_views

@@ -208,7 +208,9 @@ def setup_periodic_tasks(sender, **kwargs):

     logger.info(f"Scheduling refresh materialized view every night at 1am CDT")
     sender.add_periodic_task(crontab(hour=1, minute=0), refresh_materialized_views.s())
-
+
+    logger.info(f"Scheduling update of collection weights at midnight on even-numbered days.")
+    sender.add_periodic_task(crontab(0, 0,day_of_month='2-30/2'),augur_collection_update_weights.s())

 @after_setup_logger.connect
diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py
index 76f2be61a1..3e999c5c47 100644
--- a/augur/tasks/start_tasks.py
+++ b/augur/tasks/start_tasks.py
@@ -10,7 +10,7 @@
 #from celery.result import AsyncResult
 from celery import signature
 from celery import group, chain, chord, signature
-from sqlalchemy import or_, and_
+from sqlalchemy import or_, and_, update

 from augur.tasks.github import *

@@ -33,6 +33,7 @@
 from augur.tasks.util.redis_list import RedisList
 from augur.application.db.models import CollectionStatus, Repo
 from augur.tasks.util.collection_util import *
+from augur.tasks.git.util.facade_worker.facade_worker.facade02utilitymethods import get_facade_weight_time_factor

 CELERY_GROUP_TYPE = type(group())
 CELERY_CHAIN_TYPE = type(chain())
@@ -87,7 +88,7 @@ def primary_repo_collect_phase(repo_git):

     repo_task_group = group(
         repo_info_task,
-        chain(primary_repo_jobs,secondary_repo_jobs,process_contributors.si()),
+        chain(primary_repo_jobs | core_task_update_weight_util.s(repo_git=repo_git),secondary_repo_jobs,process_contributors.si()),
         #facade_phase(logger,repo_git),

         collect_releases.si(repo_git),
@@ -158,9 +159,8 @@ def non_repo_domain_tasks():

 """
     The below functions define augur's collection hooks.
     Each collection hook schedules tasks for a number of repos
-    that are either new or older than a set amount of days.
""" -def start_primary_collection(session,max_repo,days): +def start_primary_collection(session,max_repo): #Get list of enabled phases enabled_phase_names = get_enabled_phase_names_from_config(session.logger, session) @@ -183,30 +183,36 @@ def core_task_success_util_gen(repo_git): active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.core_status == CollectionState.COLLECTING.value).all()) - cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) not_erroed = CollectionStatus.core_status != str(CollectionState.ERROR.value) not_collecting = CollectionStatus.core_status != str(CollectionState.COLLECTING.value) never_collected = CollectionStatus.core_data_last_collected == None - old_collection = CollectionStatus.core_data_last_collected <= cutoff_date limit = max_repo-active_repo_count + core_order = CollectionStatus.core_weight + #Get repos for primary collection hook - repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(not_erroed, not_collecting, or_(never_collected, old_collection)),limit) + collection_size = start_block_of_repos( + session.logger, session, + and_(not_erroed, not_collecting,never_collected), + limit, primary_enabled_phases,sort=core_order + ) - session.logger.info(f"Starting primary collection on {len(repo_git_identifiers)} repos") - if len(repo_git_identifiers) == 0: - return - session.logger.info(f"Primary collection starting for: {tuple(repo_git_identifiers)}") + #Now start old repos if there is space to do so. + limit -= collection_size - primary_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=primary_enabled_phases) + collected_before = CollectionStatus.core_data_last_collected != None - #Start data collection and update the collectionStatus with the task_ids - primary_augur_collection.start_data_collection() + if limit > 0: + start_block_of_repos( + session.logger, session, + and_(not_erroed, not_collecting,collected_before), + limit, primary_enabled_phases,sort=core_order + ) -def start_secondary_collection(session,max_repo,days): +def start_secondary_collection(session,max_repo): #Get list of enabled phases enabled_phase_names = get_enabled_phase_names_from_config(session.logger, session) @@ -227,26 +233,35 @@ def secondary_task_success_util_gen(repo_git): active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.secondary_status == CollectionState.COLLECTING.value).all()) - cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) not_erroed = CollectionStatus.secondary_status != str(CollectionState.ERROR.value) not_collecting = CollectionStatus.secondary_status != str(CollectionState.COLLECTING.value) - never_collected = CollectionStatus.secondary_data_last_collected == None - old_collection = CollectionStatus.secondary_data_last_collected <= cutoff_date primary_collected = CollectionStatus.core_status == str(CollectionState.SUCCESS.value) + never_collected = CollectionStatus.secondary_data_last_collected == None limit = max_repo-active_repo_count - repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(primary_collected,not_erroed, not_collecting, or_(never_collected, old_collection)),limit) + secondary_order = CollectionStatus.secondary_weight - session.logger.info(f"Starting secondary collection on {len(repo_git_identifiers)} repos") - if len(repo_git_identifiers) == 0: - return + collection_size = start_block_of_repos( + session.logger, session, + and_(primary_collected,not_erroed, 
not_collecting,never_collected), + limit, secondary_enabled_phases, + hook="secondary", + sort=secondary_order + ) - session.logger.info(f"Secondary collection starting for: {tuple(repo_git_identifiers)}") + limit -= collection_size + collected_before = CollectionStatus.secondary_data_last_collected != None - secondary_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=secondary_enabled_phases,collection_hook="secondary") + if limit > 0: + start_block_of_repos( + session.logger, session, + and_(primary_collected,not_erroed, not_collecting,collected_before), + limit, secondary_enabled_phases, + hook="secondary", + sort=secondary_order + ) - secondary_augur_collection.start_data_collection() def start_facade_clone_update(session,max_repo,days): facade_enabled_phases = [] @@ -285,7 +300,7 @@ def facade_clone_update_success_util_gen(repo_git): facade_augur_collection.start_data_collection() -def start_facade_collection(session,max_repo,days): +def start_facade_collection(session,max_repo): #Deal with secondary collection facade_enabled_phases = [] @@ -299,28 +314,38 @@ def facade_task_success_util_gen(repo_git): active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.facade_status == CollectionState.COLLECTING.value).all()) - cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) + #cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) not_erroed = CollectionStatus.facade_status != str(CollectionState.ERROR.value) not_pending = CollectionStatus.facade_status != str(CollectionState.PENDING.value) not_failed_clone = CollectionStatus.facade_status != str(CollectionState.FAILED_CLONE.value) not_collecting = CollectionStatus.facade_status != str(CollectionState.COLLECTING.value) not_initializing = CollectionStatus.facade_status != str(CollectionState.INITIALIZING.value) never_collected = CollectionStatus.facade_data_last_collected == None - old_collection = CollectionStatus.facade_data_last_collected <= cutoff_date limit = max_repo-active_repo_count - repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(not_pending,not_failed_clone,not_erroed, not_collecting, not_initializing, or_(never_collected, old_collection)),limit) + facade_order = CollectionStatus.facade_weight - session.logger.info(f"Starting facade collection on {len(repo_git_identifiers)} repos") - if len(repo_git_identifiers) == 0: - return + collection_size = start_block_of_repos( + session.logger, session, + and_(not_pending,not_failed_clone,not_erroed, not_collecting, not_initializing,never_collected), + limit, facade_enabled_phases, + hook="facade", + sort=facade_order + ) - session.logger.info(f"Facade collection starting for: {tuple(repo_git_identifiers)}") + limit -= collection_size + collected_before = CollectionStatus.facade_data_last_collected != None - facade_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=facade_enabled_phases,collection_hook="facade") + if limit > 0: + start_block_of_repos( + session.logger, session, + and_(not_pending,not_failed_clone,not_erroed, not_collecting, not_initializing,collected_before), + limit, facade_enabled_phases, + hook="facade", + sort=facade_order + ) - facade_augur_collection.start_data_collection() @celery.task def augur_collection_monitor(): @@ -336,15 +361,59 @@ def augur_collection_monitor(): enabled_phase_names = get_enabled_phase_names_from_config(session.logger, session) if primary_repo_collect_phase.__name__ in 
enabled_phase_names: - start_primary_collection(session, max_repo=40, days=30) + start_primary_collection(session, max_repo=40) if secondary_repo_collect_phase.__name__ in enabled_phase_names: - start_secondary_collection(session, max_repo=10, days=30) + start_secondary_collection(session, max_repo=5) if facade_phase.__name__ in enabled_phase_names: #Schedule facade collection before clone/updates as that is a higher priority - start_facade_collection(session, max_repo=15, days=30) + start_facade_collection(session, max_repo=15) start_facade_clone_update(session,max_repo=5,days=30) +@celery.task +def augur_collection_update_weights(): + + from augur.tasks.init.celery_app import engine + + logger = logging.getLogger(augur_collection_update_weights.__name__) + + logger.info("Updating stale collection weights") + + with DatabaseSession(logger,engine) as session: + + core_weight_update_repos = session.query(CollectionStatus).filter(CollectionStatus.core_weight != None).all() + + for status in core_weight_update_repos: + repo = Repo.get_by_id(session, status.repo_id) + + repo_git = repo.repo_git + status = repo.collection_status[0] + raw_count = status.issue_pr_sum + + core_task_update_weight_util([int(raw_count)],repo_git=repo_git,session=session) + + facade_not_pending = CollectionStatus.facade_status != CollectionState.PENDING.value + facade_not_failed = CollectionStatus.facade_status != CollectionState.FAILED_CLONE.value + facade_weight_not_null = CollectionStatus.facade_weight != None + + facade_weight_update_repos = session.query(CollectionStatus).filter(and_(facade_not_pending,facade_not_failed,facade_weight_not_null)).all() + + for status in facade_weight_update_repos: + repo = Repo.get_by_id(session, status.repo_id) + + commit_count = status.commit_sum + date_factor = get_facade_weight_time_factor(session, repo.repo_git) + weight = commit_count - date_factor + + update_query = ( + update(CollectionStatus) + .where(CollectionStatus.repo_id == status.repo_id) + .values(facade_weight=weight) + ) + + session.execute(update_query) + session.commit() + #git_update_commit_count_weight(repo_git) diff --git a/augur/tasks/util/collection_util.py b/augur/tasks/util/collection_util.py index 53824a9ab7..3b0e679dc7 100644 --- a/augur/tasks/util/collection_util.py +++ b/augur/tasks/util/collection_util.py @@ -11,18 +11,19 @@ #from celery.result import AsyncResult from celery import signature from celery import group, chain, chord, signature -from sqlalchemy import or_, and_ +from sqlalchemy import or_, and_, update from augur.application.logs import AugurLogger from augur.tasks.init.celery_app import celery_app as celery from augur.application.db.models import CollectionStatus, Repo from augur.application.db.util import execute_session_query from augur.application.config import AugurConfig -from augur.tasks.github.util.util import get_owner_repo +from augur.tasks.github.util.util import get_owner_repo, get_repo_weight_core, get_repo_weight_by_issue from augur.tasks.github.util.gh_graphql_entities import GitHubRepo as GitHubRepoGraphql from augur.tasks.github.util.gh_graphql_entities import GraphQlPageCollection from augur.tasks.github.util.github_task_session import GithubTaskManifest from augur.application.db.session import DatabaseSession -from augur.tasks.git.util.facade_worker.facade_worker.facade02utilitymethods import get_repo_weight_by_commit +from augur.tasks.util.worker_util import calculate_date_weight_from_timestamps + # class syntax class CollectionState(Enum): @@ -46,8 +47,12 @@ def 
get_enabled_phase_names_from_config(logger, session): #Query db for CollectionStatus records that fit the desired condition. #Used to get CollectionStatus for differant collection hooks -def get_collection_status_repo_git_from_filter(session,filter_condition,limit): - repo_status_list = session.query(CollectionStatus).filter(filter_condition).limit(limit).all() +def get_collection_status_repo_git_from_filter(session,filter_condition,limit,order=None): + + if order is not None: + repo_status_list = session.query(CollectionStatus).order_by(order).filter(filter_condition).limit(limit).all() + else: + repo_status_list = session.query(CollectionStatus).filter(filter_condition).limit(limit).all() return [status.repo.repo_git for status in repo_status_list] @@ -131,42 +136,55 @@ def core_task_success_util(repo_git): session.commit() -def date_weight_factor(days_since_last_collection): - return (days_since_last_collection ** 3) / 25 +def update_repo_core_weight(logger,session,repo_git,raw_sum): + repo = Repo.get_by_repo_git(session, repo_git) + status = repo.collection_status[0] + try: + weight = raw_sum#get_repo_weight_core(logger,repo_git) -def get_repo_weight_by_issue(logger,repo_git,days_since_last_collection): + weight -= calculate_date_weight_from_timestamps(repo.repo_added, status.core_data_last_collected) + secondary_tasks_weight = raw_sum - calculate_date_weight_from_timestamps(repo.repo_added, status.secondary_data_last_collected) + except Exception as e: + logger.error(f"{e}") + weight = None + secondary_tasks_weight = None - owner,name = get_owner_repo(repo_git) + logger.info(f"Repo {repo_git} has a weight of {weight}") - with GithubTaskManifest(logger) as manifest: - repo_graphql = GitHubRepoGraphql(logger, manifest.key_auth, owner, name) - number_of_issues_and_prs = len(repo_graphql.get_issues_collection()) + len(repo_graphql.get_pull_requests_collection()) - - return number_of_issues_and_prs - date_weight_factor(days_since_last_collection) + logger.info(f"Args: {raw_sum} , {repo_git}") + if weight is None: + return -#Get the weight for each repo for the core collection hook -def get_repo_weight_core(logger,repo_git): - from augur.tasks.init.celery_app import engine - with DatabaseSession(logger,engine) as session: - repo = Repo.get_by_repo_git(session, repo_git) - if not repo: - raise Exception(f"Task with repo_git of {repo_git} but could not be found in Repo table") + update_query = ( + update(CollectionStatus) + .where(CollectionStatus.repo_id == repo.repo_id) + .values(core_weight=weight,issue_pr_sum=raw_sum,secondary_weight=secondary_tasks_weight) + ) - status = repo.collection_status[0] + session.execute(update_query) + session.commit() - last_collected = status.core_data_last_collected - if last_collected: - time_delta = datetime.datetime.now() - last_collected - days = time_delta.days - else: - days = 0 - return get_repo_weight_by_issue(logger, repo_git, days) +#This task updates the weight with the issues and prs already passed in +@celery.task +def core_task_update_weight_util(issue_and_pr_nums,repo_git=None,session=None): + from augur.tasks.init.celery_app import engine + logger = logging.getLogger(core_task_update_weight_util.__name__) + + if repo_git is None: + return + + if session is not None: + update_repo_core_weight(logger, session, repo_git, sum(issue_and_pr_nums)) + else: + with DatabaseSession(logger,engine=engine) as session: + update_repo_core_weight(logger,session,repo_git,sum(issue_and_pr_nums)) + @celery.task @@ -237,24 +255,7 @@ def 

         session.commit()

-def get_repo_weight_facade(logger,repo_git):
-    from augur.tasks.init.celery_app import engine
-    with DatabaseSession(logger,engine) as session:
-        repo = Repo.get_by_repo_git(session, repo_git)
-        if not repo:
-            raise Exception(f"Task with repo_git of {repo_git} but could not be found in Repo table")
-
-        status = repo.collection_status[0]
-        last_collected = status.facade_data_last_collected
-
-        if last_collected:
-            time_delta = datetime.datetime.now() - last_collected
-            days = time_delta.days
-        else:
-            days = 0
-
-        return get_repo_weight_by_commit(logger, repo_git, days)


 @celery.task
@@ -337,7 +338,7 @@ class to keep track of various groups of collection tasks for a group of repos.
         session: Database session to use
     """
     def __init__(self,session,repos: List[str]=[],collection_phases: List=[],collection_hook: str="core"):
-        self.logger = AugurLogger("data_collection_jobs").get_logger()
+        self.logger = session.logger
         #self.session = TaskSession(self.logger)
         self.collection_phases = collection_phases
         #self.disabled_collection_tasks = disabled_collection_tasks
@@ -377,8 +378,8 @@ def send_messages(self):

         for repo_git in self.repos:

-            repo = self.session.query(Repo).filter(Repo.repo_git == repo_git).one()
-            repo_id = repo.repo_id
+            #repo = self.session.query(Repo).filter(Repo.repo_git == repo_git).one()
+            #repo_id = repo.repo_id

             augur_collection_sequence = []
             for job in self.collection_phases:
@@ -391,98 +392,22 @@ def send_messages(self):
             augur_collection_chain = chain(*augur_collection_sequence)
             task_id = augur_collection_chain.apply_async(link_error=task_failed_util.s()).task_id

-            self.logger.info(f"Setting repo_id {repo_id} to collecting for repo: {repo_git}")
+            self.logger.info(f"Setting repo {self.collection_hook} status to collecting for repo: {repo_git}")

             #yield the value of the task_id to the calling method so that the proper collectionStatus field can be updated
             yield repo_git, task_id

+def start_block_of_repos(logger,session,condition,limit,phases,hook="core",sort=None):

+    repo_git_identifiers = get_collection_status_repo_git_from_filter(session,condition,limit,order=sort)

-class AugurWeightedTaskRoutine(AugurTaskRoutine):
-    """
-        class to keep track of various groups of collection tasks for a group of repos.
-        Intermediate class that takes into account relative weights of repos and stops after
-        a set limit of repos limited by their size.
-
-
-    Attributes:
-        logger (Logger): Get logger from AugurLogger
-        repos (List[str]): List of repo_ids to run collection on.
-        collection_phases (List[str]): List of phases to run in augur collection.
-        collection_hook (str): String determining the attributes to update when collection for a repo starts. e.g. core
-        session: Database session to use
-        total_repo_weight (AugurCollectionTotalRepoWeight): object that allows repo objects and repo_git strings to be subtracted from it
-    """
-    def __init__(self,session,repos: List[str]=[],collection_phases: List[str]=[],collection_hook: str="core",total_repo_weight=10000):
-
-        #Define superclass vars
-        super().__init__(session,repos=repos,collection_phases=collection_phases,collection_hook=collection_hook)
-
-        #Define Total repo weight
-        if collection_hook == "core":
-            #Core collection hook has a repo weight of
-            self.total_repo_weight = AugurCollectionTotalRepoWeight(total_repo_weight)
-        elif collection_hook == "secondary":
-            self.total_repo_weight = AugurCollectionTotalRepoWeight(total_repo_weight,weight_calculation=get_repo_weight_secondary)
-        elif collection_hook == "facade":
-            self.total_repo_weight = AugurCollectionTotalRepoWeight(total_repo_weight,weight_calculation=get_repo_weight_facade)
+    logger.info(f"Starting new collection on {hook}: {len(repo_git_identifiers)} repos")
+    if len(repo_git_identifiers) == 0:
+        return 0

+    logger.info(f"Collection starting for {hook}: {tuple(repo_git_identifiers)}")

-    #Overwrite super method
-    #now returns resulting weight after either reaching zero or
-    #scheduling all repos assigned to the object.
-    def start_data_collection(self):
-        super().start_data_collection()
-
-        return self.total_repo_weight.value
+    routine = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=phases,collection_hook=hook)

-    def send_messages(self):
-        augur_collection_list = []
-
-        for repo_git in self.repos:
-            #Check total repo weight
-            if self.total_repo_weight.value == 0:
-                break
-
-            #Subtract repo's weight
-            self.total_repo_weight = self.total_repo_weight - repo_git
-
-            repo = self.session.query(Repo).filter(Repo.repo_git == repo_git).one()
-            repo_id = repo.repo_id
-
-            augur_collection_sequence = []
-            for job in self.collection_phases:
-                #Add the phase to the sequence in order as a celery task.
-                #The preliminary task creates the larger task chain
-                augur_collection_sequence.append(job(repo_git))
-
-            #augur_collection_sequence.append(core_task_success_util.si(repo_git))
-            #Link all phases in a chain and send to celery
-            augur_collection_chain = chain(*augur_collection_sequence)
-            task_id = augur_collection_chain.apply_async(link_error=task_failed_util.s()).task_id
-
-            self.logger.info(f"Setting repo_id {repo_id} to collecting for repo: {repo_git}")
-
-            #yield the value of the task_id to the calling method so that the proper collectionStatus field can be updated
-            yield repo_git, task_id
-
-if __name__ == "__main__":
-    #Examples of using AugurCollectionTotalRepoWeight
-    weight = AugurCollectionTotalRepoWeight(10000)
-    print(f"Weight value: {weight.value}")
-
-    #Apply subtraction operation with string
-    weight = weight - "https://github.com/chaoss/augur"
-    print(f"Weight value: {weight.value}")
-
-    #Apply subtraction operation with orm object
-    with DatabaseSession(logging.getLogger()) as session:
-        repo = Repo.get_by_repo_git(session, 'https://github.com/operate-first/blueprint')
-        weight = weight - repo
-
-    print(f"Weight value: {weight.value}")
+    routine.start_data_collection()

-    #Use commit count instead of issues and pr count
-    commitWeight = AugurCollectionTotalRepoWeight(100000,weight_calculation=get_repo_weight_facade)
-    print(f"commit weight value: {commitWeight.value}")
-    #commitWeight = commitWeight - "https://github.com/eclipse/che-theia-activity-tracker"
-    #print(f"commit weight value: {commitWeight.value}")
\ No newline at end of file
+    return len(repo_git_identifiers)
\ No newline at end of file
diff --git a/augur/tasks/util/redis_scalar.py b/augur/tasks/util/redis_scalar.py
new file mode 100644
index 0000000000..88ebfb3d43
--- /dev/null
+++ b/augur/tasks/util/redis_scalar.py
@@ -0,0 +1,37 @@
+"""This module defines the RedisScalar class.
+It imports the redis_connection as redis which is a connection to the redis cache
+"""
+from typing import Iterable, Any, Union
+
+from collections.abc import MutableSequence
+from augur.tasks.init.redis_connection import redis_connection as redis
+from augur import instance_id
+from redis import exceptions
+import numbers
+
+class RedisScalar:
+
+    def __init__(self, scalar_name: str, default_value: int = 0, override_existing: bool = False):
+
+        self.redis_scalar_key = f"{instance_id}_{scalar_name}"
+        self._scalar_name = scalar_name
+
+        self.__value = default_value
+
+        #Check redis to see if key exists in cache
+        if 1 != redis.exists(self.redis_scalar_key) or override_existing:
+            #Set value
+            redis.set(self.redis_scalar_key,self.__value)
+        else:
+            #else get the value
+            self.__value = int(float(redis.get(self.redis_scalar_key)))
+
+    @property
+    def value(self):
+        return self.__value
+
+    @value.setter
+    def value(self, otherVal):
+        if isinstance(otherVal, numbers.Number):
+            self.__value = otherVal
+            redis.set(self.redis_scalar_key,self.__value)
diff --git a/augur/tasks/util/worker_util.py b/augur/tasks/util/worker_util.py
index b0cb335d94..cbfeb0e904 100644
--- a/augur/tasks/util/worker_util.py
+++ b/augur/tasks/util/worker_util.py
@@ -8,7 +8,7 @@
 from celery.result import allow_join_result

 from typing import Optional, List, Any, Tuple
-
+from datetime import datetime, timedelta

 def create_grouped_task_load(*args,processes=8,dataList=[],task=None):

@@ -100,6 +100,28 @@ def remove_duplicate_naturals(data, natural_keys):

     #print(new_data)
     return new_data

+#The 4th root of 10,000 is 10, so it takes ten days
+#for a 10,000 weight repo's weight to reach zero.
+def date_weight_factor(days_since_last_collection,domain_shift=0):
+    return (days_since_last_collection - domain_shift) ** 4
+
+def calculate_date_weight_from_timestamps(added,last_collection,domain_start_days=30):
+    #Get the time since last collection as well as when the repo was added.
+    if last_collection is None:
+        delta = datetime.now() - added
+        return date_weight_factor(delta.days)
+    else:
+        delta = datetime.now() - last_collection
+
+        factor = date_weight_factor(delta.days,domain_shift=domain_start_days)
+
+        #If the last collection was thirty or more days ago, start to decrease the repo's weight.
+        if delta.days >= domain_start_days:
+            return factor
+        else:
+            #Else increase its weight
+            return -1 * factor
+

 # def create_server(app, worker=None):
diff --git a/scripts/install/install.sh b/scripts/install/install.sh
index 479640f8cb..f5dbe89902 100755
--- a/scripts/install/install.sh
+++ b/scripts/install/install.sh
@@ -89,7 +89,6 @@
 scripts/install/config.sh $target

 augur db check-pgpass

-
 echo "**********************************"
 echo "*****  INSTALLATION COMPLETE  *****"
 echo "**********************************"
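
Note on the weighting scheme (reviewer sketch, not part of the patch): across the three collection hooks, the ordering added here reduces to weight = size_metric - date_factor, where the size metric is the issue/PR sum (core and secondary) or the commit count (facade), and repos are started in ascending weight order, so a lower weight means higher priority. The snippet below is a minimal, self-contained restatement of the math in date_weight_factor and calculate_date_weight_from_timestamps; the helper name compute_weight is illustrative and does not appear in this diff.

from datetime import datetime

def date_weight_factor(days_since_last_collection, domain_shift=0):
    return (days_since_last_collection - domain_shift) ** 4

def compute_weight(size_metric, added, last_collection, domain_start_days=30):
    # Never collected: the factor grows with repo age, so older repos sort first.
    if last_collection is None:
        return size_metric - date_weight_factor((datetime.now() - added).days)

    days = (datetime.now() - last_collection).days
    factor = date_weight_factor(days, domain_shift=domain_start_days)

    # Past the 30-day grace window the factor is subtracted, lowering the weight
    # and raising priority; inside the window it is added instead, pushing
    # recently collected repos to the back of the ascending sort.
    if days >= domain_start_days:
        return size_metric - factor
    return size_metric + factor

# Example: a repo with 10,000 issues+PRs last collected 40 days ago gives
# (40 - 30) ** 4 = 10,000, so its weight drops to 0 and it sorts to the front,
# matching the "ten days for a 10,000 weight repo to reach zero" comment above.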