diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index be2582320e..008afe8b26 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -23,10 +23,12 @@ from augur.tasks.init.redis_connection import redis_connection from augur.application.db.models import Repo, CollectionStatus from augur.application.db.session import DatabaseSession +from augur.application.db.util import execute_session_query from augur.application.logs import AugurLogger from augur.application.config import AugurConfig from augur.application.cli import test_connection, test_db_connection import sqlalchemy as s +from sqlalchemy import or_, and_ logger = AugurLogger("augur", reset_logfiles=True).get_logger() @@ -116,17 +118,24 @@ def start(disable_collection, development, port): create_collection_status(logger) with DatabaseSession(logger) as session: - collection_status_list = session.query(CollectionStatus).filter(CollectionStatus.core_status == CollectionState.COLLECTING.value - or CollectionStatus.secondary_status == CollectionState.COLLECTING.value) + primaryCollecting = CollectionStatus.core_status == CollectionState.COLLECTING.value + secondaryCollecting = CollectionStatus.secondary_status == CollectionState.COLLECTING.value + + query = session.query(CollectionStatus).filter(or_(primaryCollecting,secondaryCollecting)) + + collection_status_list = execute_session_query(query,'all') for status in collection_status_list: repo = status.repo repo.repo_name = None repo.repo_path = None repo.repo_status = "New" + + status.core_status = "Pending" + status.secondary_status = "Pending" - collection_status_list.update({CollectionStatus.core_status: "Pending"}) - collection_status_list.update({CollectionStatus.secondary_status: "Pending"}) + #collection_status_list.update({CollectionStatus.core_status: "Pending"}) + #collection_status_list.update({CollectionStatus.secondary_status: "Pending"}) session.commit() 
augur_collection_monitor.si().apply_async() diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 5f31c753a6..1bff697086 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -79,9 +79,6 @@ def core_task_success(repo_git): collection_status.core_data_last_collected = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') collection_status.core_task_id = None - #TODO: remove when secondary tasks are changed to start elsewhere. - collection_status.secondary_status = CollectionState.COLLECTING.value - session.commit() @celery.task @@ -134,8 +131,13 @@ def task_failed(request,exc,traceback): collectionRecord.core_status = CollectionStatus.ERROR session.commit() - # log traceback to error file - session.logger.error(f"Task {request.id} raised exception: {exc}\n{traceback}") + if collectionRecord.secondary_status == CollectionState.COLLECTING.value: + # set status to Error in db + collectionRecord.secondary_status = CollectionStatus.ERROR + session.commit() + + # log traceback to error file + session.logger.error(f"Task {request.id} raised exception: {exc}\n{traceback}") @@ -287,6 +289,16 @@ def non_repo_domain_tasks(): #Get list of enabled phases enabled_phase_names = [name for name, phase in phase_options.items() if phase == 1] + #Disable augur from running these tasks more than once unless requested + query = s.sql.text(""" + UPDATE augur_operations.config + SET value=0 + WHERE section_name='Task_Routine' + AND setting_name='machine_learning_phase' + """) + + session.execute_sql(query) + enabled_tasks = [] enabled_tasks.extend(generate_non_repo_domain_facade_tasks(logger)) @@ -302,6 +314,13 @@ def non_repo_domain_tasks(): tasks.apply_async() +#Query db for CollectionStatus records that fit the desired condition. 
+#Used to get CollectionStatus for different collection hooks +def get_collection_status_repo_git_from_filter(session,filter_condition,limit): + repo_status_list = session.query(CollectionStatus).filter(filter_condition).limit(limit).all() + + return [status.repo.repo_git for status in repo_status_list] + @celery.task def augur_collection_monitor(): @@ -312,12 +331,10 @@ def augur_collection_monitor(): logger.info("Checking for repos to collect") - coreCollection = [prelim_phase, primary_repo_collect_phase] - #Get phase options from the config with DatabaseSession(logger, engine) as session: - max_repo_count = 50 + max_repo_primary_count = 50 days = 30 config = AugurConfig(logger, session) @@ -325,30 +342,22 @@ def augur_collection_monitor(): #Get list of enabled phases enabled_phase_names = [name for name, phase in phase_options.items() if phase == 1] - #enabled_phases = [phase for phase in coreCollection if phase.__name__ in enabled_phase_names] - enabled_phases = [] + #Primary collection hook. + primary_enabled_phases = [] #Primary jobs if prelim_phase.__name__ in enabled_phase_names: - enabled_phases.append(prelim_phase) + primary_enabled_phases.append(prelim_phase) if primary_repo_collect_phase.__name__ in enabled_phase_names: - enabled_phases.append(primary_repo_collect_phase) + primary_enabled_phases.append(primary_repo_collect_phase) #task success is scheduled no matter what the config says. 
def core_task_success_gen(repo_git): return core_task_success.si(repo_git) - enabled_phases.append(core_task_success_gen) - - if secondary_repo_collect_phase.__name__ in enabled_phase_names: - enabled_phases.append(secondary_repo_collect_phase) - - def secondary_task_success_gen(repo_git): - return secondary_task_success.si(repo_git) - - enabled_phases.append(secondary_task_success_gen) + primary_enabled_phases.append(core_task_success_gen) active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.core_status == CollectionState.COLLECTING.value).all()) @@ -358,32 +367,74 @@ def secondary_task_success_gen(repo_git): never_collected = CollectionStatus.core_data_last_collected == None old_collection = CollectionStatus.core_data_last_collected <= cutoff_date - limit = max_repo_count-active_repo_count + limit = max_repo_primary_count-active_repo_count + + #Get repos for primary collection hook + repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(not_erroed, not_collecting, or_(never_collected, old_collection)),limit) + + logger.info(f"Starting primary collection on {len(repo_git_identifiers)} repos") + + logger.info(f"Primary collection starting for: {tuple(repo_git_identifiers)}") + + primary_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=primary_enabled_phases) + + #Start data collection and update the collectionStatus with the task_ids + for repo_git, task_id in primary_augur_collection.start_data_collection(): + + repo = session.query(Repo).filter(Repo.repo_git == repo_git).one() + + #set status in database to collecting + repoStatus = repo.collection_status[0] + repoStatus.core_task_id = task_id + #repoStatus.secondary_task_id = task_id + repoStatus.core_status = CollectionState.COLLECTING.value + session.commit() + + #Deal with secondary collection + secondary_enabled_phases = [] - repo_status_list = session.query(CollectionStatus).filter(and_(not_erroed, not_collecting, 
or_(never_collected, old_collection))).limit(limit).all() + if prelim_phase.__name__ in enabled_phase_names: + secondary_enabled_phases.append(prelim_phase) - repo_ids = [repo.repo_id for repo in repo_status_list] + if secondary_repo_collect_phase.__name__ in enabled_phase_names: + secondary_enabled_phases.append(secondary_repo_collect_phase) + + def secondary_task_success_gen(repo_git): + return secondary_task_success.si(repo_git) + + secondary_enabled_phases.append(secondary_task_success_gen) - repo_git_result = session.query(Repo).filter(Repo.repo_id.in_(tuple(repo_ids))).all() - repo_git_identifiers = [repo.repo_git for repo in repo_git_result] + max_repo_secondary_count = 30 + active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.secondary_status == CollectionState.COLLECTING.value).all()) + + cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) + not_erroed = CollectionStatus.secondary_status != str(CollectionState.ERROR.value) + not_collecting = CollectionStatus.secondary_status != str(CollectionState.COLLECTING.value) + never_collected = CollectionStatus.secondary_data_last_collected == None + old_collection = CollectionStatus.secondary_data_last_collected <= cutoff_date + primary_collected = CollectionStatus.core_status == str(CollectionState.SUCCESS.value) - logger.info(f"Starting collection on {len(repo_ids)} repos") + limit = max_repo_secondary_count-active_repo_count - logger.info(f"Collection starting for: {tuple(repo_git_identifiers)}") + repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(primary_collected,not_erroed, not_collecting, or_(never_collected, old_collection)),limit) - augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=enabled_phases) + logger.info(f"Starting secondary collection on {len(repo_git_identifiers)} repos") + + logger.info(f"Secondary collection starting for: {tuple(repo_git_identifiers)}") + + secondary_augur_collection 
= AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=secondary_enabled_phases) #Start data collection and update the collectionStatus with the task_ids - for repo_git, task_id in augur_collection.start_data_collection(): + for repo_git, task_id in secondary_augur_collection.start_data_collection(): repo = session.query(Repo).filter(Repo.repo_git == repo_git).one() #set status in database to collecting repoStatus = repo.collection_status[0] - repoStatus.core_task_id = task_id + #repoStatus.core_task_id = task_id repoStatus.secondary_task_id = task_id - repoStatus.core_status = CollectionState.COLLECTING.value + repoStatus.secondary_status = CollectionState.COLLECTING.value session.commit()