From d38e3c706cd9be9eb5d0fb8b9291fb3c06c533e6 Mon Sep 17 00:00:00 2001 From: Isaac Milarsky Date: Tue, 14 Feb 2023 12:53:27 -0600 Subject: [PATCH 1/6] fix for ml phase running without checking if it had already run Signed-off-by: Isaac Milarsky --- augur/tasks/start_tasks.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 5f31c753a6..3404bc679b 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -287,6 +287,16 @@ def non_repo_domain_tasks(): #Get list of enabled phases enabled_phase_names = [name for name, phase in phase_options.items() if phase == 1] + #Disable augur from running these tasks more than once unless requested + query = s.sql.text(""" + UPDATE augur_operations.config + SET value=0 + WHERE section_name='Task_Routine' + AND setting_name='machine_learning_phase' + """) + + session.execute_sql(query) + enabled_tasks = [] enabled_tasks.extend(generate_non_repo_domain_facade_tasks(logger)) From 82031ee0594082d7443e6d3047e4f59a4a756837 Mon Sep 17 00:00:00 2001 From: Isaac Milarsky Date: Tue, 14 Feb 2023 13:14:28 -0600 Subject: [PATCH 2/6] fix problem query Signed-off-by: Isaac Milarsky --- augur/application/cli/backend.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index be2582320e..61597844a7 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -23,10 +23,12 @@ from augur.tasks.init.redis_connection import redis_connection from augur.application.db.models import Repo, CollectionStatus from augur.application.db.session import DatabaseSession +from augur.application.db.util import execute_session_query from augur.application.logs import AugurLogger from augur.application.config import AugurConfig from augur.application.cli import test_connection, test_db_connection import sqlalchemy as s +from sqlalchemy import or_, and_ logger 
= AugurLogger("augur", reset_logfiles=True).get_logger() @@ -116,8 +118,12 @@ def start(disable_collection, development, port): create_collection_status(logger) with DatabaseSession(logger) as session: - collection_status_list = session.query(CollectionStatus).filter(CollectionStatus.core_status == CollectionState.COLLECTING.value - or CollectionStatus.secondary_status == CollectionState.COLLECTING.value) + primaryCollecting = CollectionStatus.core_status == CollectionState.COLLECTING.value + secondaryCollecting = CollectionStatus.secondary_status == CollectionState.COLLECTING.value + + query = session.query(CollectionStatus).filter(or_(primaryCollecting,secondaryCollecting)) + + collection_status_list = execute_session_query(query,'all') for status in collection_status_list: repo = status.repo From 4bfabd4cda97de6688b8ef30f1c1f6b9214ca44d Mon Sep 17 00:00:00 2001 From: Isaac Milarsky Date: Tue, 14 Feb 2023 13:23:49 -0600 Subject: [PATCH 3/6] syntax Signed-off-by: Isaac Milarsky --- augur/application/cli/backend.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index 61597844a7..008afe8b26 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -130,9 +130,12 @@ def start(disable_collection, development, port): repo.repo_name = None repo.repo_path = None repo.repo_status = "New" + + status.core_status = "Pending" + status.secondary_status = "Pending" - collection_status_list.update({CollectionStatus.core_status: "Pending"}) - collection_status_list.update({CollectionStatus.secondary_status: "Pending"}) + #collection_status_list.update({CollectionStatus.core_status: "Pending"}) + #collection_status_list.update({CollectionStatus.secondary_status: "Pending"}) session.commit() augur_collection_monitor.si().apply_async() From 10988ab31a793178d05b742948205b902085b5be Mon Sep 17 00:00:00 2001 From: Isaac Milarsky Date: Wed, 15 Feb 2023 11:00:10 
-0600 Subject: [PATCH 4/6] Changes to make secondary collection hook separate from primary Signed-off-by: Isaac Milarsky --- augur/tasks/start_tasks.py | 108 ++++++++++++++++++++++++++----------- 1 file changed, 76 insertions(+), 32 deletions(-) diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 3404bc679b..9b697dcf56 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -79,9 +79,6 @@ def core_task_success(repo_git): collection_status.core_data_last_collected = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') collection_status.core_task_id = None - #TODO: remove when secondary tasks are changed to start elsewhere. - collection_status.secondary_status = CollectionState.COLLECTING.value - session.commit() @celery.task @@ -134,8 +131,13 @@ def task_failed(request,exc,traceback): collectionRecord.core_status = CollectionStatus.ERROR session.commit() - # log traceback to error file - session.logger.error(f"Task {request.id} raised exception: {exc}\n{traceback}") + if collectionRecord.secondary_status == CollectionState.COLLECTING.value: + # set status to Error in db + collectionRecord.secondary_status = CollectionStatus.ERROR + session.commit() + + # log traceback to error file + session.logger.error(f"Task {request.id} raised exception: {exc}\n{traceback}") @@ -312,6 +314,17 @@ def non_repo_domain_tasks(): tasks.apply_async() +#Query db for CollectionStatus records that fit the desired condition.
+#Used to get CollectionStatus for different collection hooks +def get_collection_status_repo_git_from_filter(session,filter_condition,limit): + repo_status_list = session.query(CollectionStatus).filter(filter_condition).limit(limit).all() + + repo_ids = [repo.repo_id for repo in repo_status_list] + + repo_git_result = session.query(Repo).filter(Repo.repo_id.in_(tuple(repo_ids))).all() + + return [repo.repo_git for repo in repo_git_result] + @celery.task def augur_collection_monitor(): @@ -322,12 +335,10 @@ def augur_collection_monitor(): logger.info("Checking for repos to collect") - coreCollection = [prelim_phase, primary_repo_collect_phase] - #Get phase options from the config with DatabaseSession(logger, engine) as session: - max_repo_count = 50 + max_repo_primary_count = 50 days = 30 config = AugurConfig(logger, session) @@ -335,30 +346,22 @@ def augur_collection_monitor(): #Get list of enabled phases enabled_phase_names = [name for name, phase in phase_options.items() if phase == 1] - #enabled_phases = [phase for phase in coreCollection if phase.__name__ in enabled_phase_names] - enabled_phases = [] + #Primary collection hook. + primary_enabled_phases = [] #Primary jobs if prelim_phase.__name__ in enabled_phase_names: - enabled_phases.append(prelim_phase) + primary_enabled_phases.append(prelim_phase) if primary_repo_collect_phase.__name__ in enabled_phase_names: - enabled_phases.append(primary_repo_collect_phase) + primary_enabled_phases.append(primary_repo_collect_phase) #task success is scheduled no matter what the config says.
def core_task_success_gen(repo_git): return core_task_success.si(repo_git) - enabled_phases.append(core_task_success_gen) - - if secondary_repo_collect_phase.__name__ in enabled_phase_names: - enabled_phases.append(secondary_repo_collect_phase) - - def secondary_task_success_gen(repo_git): - return secondary_task_success.si(repo_git) - - enabled_phases.append(secondary_task_success_gen) + primary_enabled_phases.append(core_task_success_gen) active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.core_status == CollectionState.COLLECTING.value).all()) @@ -368,32 +371,73 @@ def secondary_task_success_gen(repo_git): never_collected = CollectionStatus.core_data_last_collected == None old_collection = CollectionStatus.core_data_last_collected <= cutoff_date - limit = max_repo_count-active_repo_count + limit = max_repo_primary_count-active_repo_count + + #Get repos for primary collection hook + repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(not_erroed, not_collecting, or_(never_collected, old_collection)),limit) + + logger.info(f"Starting primary collection on {len(repo_git_identifiers)} repos") + + logger.info(f"Primary collection starting for: {tuple(repo_git_identifiers)}") + + primary_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=primary_enabled_phases) + + #Start data collection and update the collectionStatus with the task_ids + for repo_git, task_id in primary_augur_collection.start_data_collection(): + + repo = session.query(Repo).filter(Repo.repo_git == repo_git).one() + + #set status in database to collecting + repoStatus = repo.collection_status[0] + repoStatus.core_task_id = task_id + #repoStatus.secondary_task_id = task_id + repoStatus.core_status = CollectionState.COLLECTING.value + session.commit() + + #Deal with secondary collection + secondary_enabled_phases = [] - repo_status_list = session.query(CollectionStatus).filter(and_(not_erroed, not_collecting, 
or_(never_collected, old_collection))).limit(limit).all() + if prelim_phase.__name__ in enabled_phase_names: + secondary_enabled_phases.append(prelim_phase) - repo_ids = [repo.repo_id for repo in repo_status_list] + if secondary_repo_collect_phase.__name__ in enabled_phase_names: + secondary_enabled_phases.append(secondary_repo_collect_phase) + + def secondary_task_success_gen(repo_git): + return secondary_task_success.si(repo_git) + + secondary_enabled_phases.append(secondary_task_success_gen) - repo_git_result = session.query(Repo).filter(Repo.repo_id.in_(tuple(repo_ids))).all() - repo_git_identifiers = [repo.repo_git for repo in repo_git_result] + max_repo_secondary_count = 30 + active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.secondary_status == CollectionState.COLLECTING.value).all()) + + cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) + not_erroed = CollectionStatus.secondary_status != str(CollectionState.ERROR.value) + not_collecting = CollectionStatus.secondary_status != str(CollectionState.COLLECTING.value) + never_collected = CollectionStatus.secondary_data_last_collected == None + old_collection = CollectionStatus.secondary_data_last_collected <= cutoff_date - logger.info(f"Starting collection on {len(repo_ids)} repos") + limit = max_repo_secondary_count-active_repo_count - logger.info(f"Collection starting for: {tuple(repo_git_identifiers)}") + repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(not_erroed, not_collecting, or_(never_collected, old_collection)),limit) - augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=enabled_phases) + logger.info(f"Starting secondary collection on {len(repo_git_identifiers)} repos") + + logger.info(f"Secondary collection starting for: {tuple(repo_git_identifiers)}") + + secondary_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=secondary_enabled_phases) #Start 
data collection and update the collectionStatus with the task_ids - for repo_git, task_id in augur_collection.start_data_collection(): + for repo_git, task_id in secondary_augur_collection.start_data_collection(): repo = session.query(Repo).filter(Repo.repo_git == repo_git).one() #set status in database to collecting repoStatus = repo.collection_status[0] - repoStatus.core_task_id = task_id + #repoStatus.core_task_id = task_id repoStatus.secondary_task_id = task_id - repoStatus.core_status = CollectionState.COLLECTING.value + repoStatus.secondary_status = CollectionState.COLLECTING.value session.commit() From 02d046c6dac7fd879449fc070d65df0ff49deb82 Mon Sep 17 00:00:00 2001 From: Isaac Milarsky Date: Wed, 15 Feb 2023 11:57:37 -0600 Subject: [PATCH 5/6] Missing subcondition Signed-off-by: Isaac Milarsky --- augur/tasks/start_tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 9b697dcf56..1b43785b95 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -417,10 +417,11 @@ def secondary_task_success_gen(repo_git): not_collecting = CollectionStatus.secondary_status != str(CollectionState.COLLECTING.value) never_collected = CollectionStatus.secondary_data_last_collected == None old_collection = CollectionStatus.secondary_data_last_collected <= cutoff_date + primary_collected = CollectionStatus.core_status == str(CollectionState.SUCCESS.value) limit = max_repo_secondary_count-active_repo_count - repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(not_erroed, not_collecting, or_(never_collected, old_collection)),limit) + repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(primary_collected,not_erroed, not_collecting, or_(never_collected, old_collection)),limit) logger.info(f"Starting secondary collection on {len(repo_git_identifiers)} repos") From 97cbaa9f4c472f35831a1070279d92e6da4f6c41 Mon Sep 17 00:00:00 2001 
From: Andrew Brain <61482022+ABrain7710@users.noreply.github.com> Date: Wed, 15 Feb 2023 15:13:10 -0600 Subject: [PATCH 6/6] Simplify query Signed-off-by: Andrew Brain <61482022+ABrain7710@users.noreply.github.com> --- augur/tasks/start_tasks.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 9b697dcf56..d2cfbf4970 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -319,11 +319,7 @@ def non_repo_domain_tasks(): def get_collection_status_repo_git_from_filter(session,filter_condition,limit): repo_status_list = session.query(CollectionStatus).filter(filter_condition).limit(limit).all() - repo_ids = [repo.repo_id for repo in repo_status_list] - - repo_git_result = session.query(Repo).filter(Repo.repo_id.in_(tuple(repo_ids))).all() - - return [repo.repo_git for repo in repo_git_result] + return [status.repo.repo_git for status in repo_status_list] @celery.task