Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Secondary collection update #2181

Merged
merged 7 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 13 additions & 4 deletions augur/application/cli/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
from augur.tasks.init.redis_connection import redis_connection
from augur.application.db.models import Repo, CollectionStatus
from augur.application.db.session import DatabaseSession
from augur.application.db.util import execute_session_query
from augur.application.logs import AugurLogger
from augur.application.config import AugurConfig
from augur.application.cli import test_connection, test_db_connection
import sqlalchemy as s
from sqlalchemy import or_, and_


logger = AugurLogger("augur", reset_logfiles=True).get_logger()
Expand Down Expand Up @@ -116,17 +118,24 @@ def start(disable_collection, development, port):
create_collection_status(logger)

with DatabaseSession(logger) as session:
collection_status_list = session.query(CollectionStatus).filter(CollectionStatus.core_status == CollectionState.COLLECTING.value
or CollectionStatus.secondary_status == CollectionState.COLLECTING.value)
primaryCollecting = CollectionStatus.core_status == CollectionState.COLLECTING.value
secondaryCollecting = CollectionStatus.secondary_status == CollectionState.COLLECTING.value

query = session.query(CollectionStatus).filter(or_(primaryCollecting,secondaryCollecting))

collection_status_list = execute_session_query(query,'all')

for status in collection_status_list:
repo = status.repo
repo.repo_name = None
repo.repo_path = None
repo.repo_status = "New"

status.core_status = "Pending"
status.secondary_status = "Pending"

collection_status_list.update({CollectionStatus.core_status: "Pending"})
collection_status_list.update({CollectionStatus.secondary_status: "Pending"})
#collection_status_list.update({CollectionStatus.core_status: "Pending"})
#collection_status_list.update({CollectionStatus.secondary_status: "Pending"})
session.commit()

augur_collection_monitor.si().apply_async()
Expand Down
115 changes: 83 additions & 32 deletions augur/tasks/start_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ def core_task_success(repo_git):
collection_status.core_data_last_collected = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
collection_status.core_task_id = None

#TODO: remove when secondary tasks are changed to start elsewhere.
collection_status.secondary_status = CollectionState.COLLECTING.value

session.commit()

@celery.task
Expand Down Expand Up @@ -134,8 +131,13 @@ def task_failed(request,exc,traceback):
collectionRecord.core_status = CollectionStatus.ERROR
session.commit()

# log traceback to error file
session.logger.error(f"Task {request.id} raised exception: {exc}\n{traceback}")
if collectionRecord.secondary_status == CollectionState.COLLECTING.value:
# set status to Error in db
collectionRecord.secondary_status = CollectionStatus.ERROR
session.commit()

# log traceback to error file
session.logger.error(f"Task {request.id} raised exception: {exc}\n{traceback}")



Expand Down Expand Up @@ -287,6 +289,16 @@ def non_repo_domain_tasks():
#Get list of enabled phases
enabled_phase_names = [name for name, phase in phase_options.items() if phase == 1]

#Disable augur from running these tasks more than once unless requested
query = s.sql.text("""
UPDATE augur_operations.config
SET value=0
WHERE section_name='Task_Routine'
AND setting_name='machine_learning_phase'
""")

session.execute_sql(query)

enabled_tasks = []

enabled_tasks.extend(generate_non_repo_domain_facade_tasks(logger))
Expand All @@ -302,6 +314,13 @@ def non_repo_domain_tasks():
tasks.apply_async()


#Query db for CollectionStatus records that fit the desired condition.
#Used to get CollectionStatus for differant collection hooks
def get_collection_status_repo_git_from_filter(session,filter_condition,limit):
repo_status_list = session.query(CollectionStatus).filter(filter_condition).limit(limit).all()

return [status.repo.repo_git for status in repo_status_list]


@celery.task
def augur_collection_monitor():
Expand All @@ -312,43 +331,33 @@ def augur_collection_monitor():

logger.info("Checking for repos to collect")

coreCollection = [prelim_phase, primary_repo_collect_phase]

#Get phase options from the config
with DatabaseSession(logger, engine) as session:

max_repo_count = 50
max_repo_primary_count = 50
days = 30

config = AugurConfig(logger, session)
phase_options = config.get_section("Task_Routine")

#Get list of enabled phases
enabled_phase_names = [name for name, phase in phase_options.items() if phase == 1]
#enabled_phases = [phase for phase in coreCollection if phase.__name__ in enabled_phase_names]

enabled_phases = []
#Primary collection hook.
primary_enabled_phases = []

#Primary jobs
if prelim_phase.__name__ in enabled_phase_names:
enabled_phases.append(prelim_phase)
primary_enabled_phases.append(prelim_phase)

if primary_repo_collect_phase.__name__ in enabled_phase_names:
enabled_phases.append(primary_repo_collect_phase)
primary_enabled_phases.append(primary_repo_collect_phase)

#task success is scheduled no matter what the config says.
def core_task_success_gen(repo_git):
return core_task_success.si(repo_git)

enabled_phases.append(core_task_success_gen)

if secondary_repo_collect_phase.__name__ in enabled_phase_names:
enabled_phases.append(secondary_repo_collect_phase)

def secondary_task_success_gen(repo_git):
return secondary_task_success.si(repo_git)

enabled_phases.append(secondary_task_success_gen)
primary_enabled_phases.append(core_task_success_gen)

active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.core_status == CollectionState.COLLECTING.value).all())

Expand All @@ -358,32 +367,74 @@ def secondary_task_success_gen(repo_git):
never_collected = CollectionStatus.core_data_last_collected == None
old_collection = CollectionStatus.core_data_last_collected <= cutoff_date

limit = max_repo_count-active_repo_count
limit = max_repo_primary_count-active_repo_count

#Get repos for primary collection hook
repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(not_erroed, not_collecting, or_(never_collected, old_collection)),limit)

logger.info(f"Starting primary collection on {len(repo_git_identifiers)} repos")

logger.info(f"Primary collection starting for: {tuple(repo_git_identifiers)}")

primary_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=primary_enabled_phases)

#Start data collection and update the collectionStatus with the task_ids
for repo_git, task_id in primary_augur_collection.start_data_collection():

repo = session.query(Repo).filter(Repo.repo_git == repo_git).one()

#set status in database to collecting
repoStatus = repo.collection_status[0]
repoStatus.core_task_id = task_id
#repoStatus.secondary_task_id = task_id
repoStatus.core_status = CollectionState.COLLECTING.value
session.commit()

#Deal with secondary collection
secondary_enabled_phases = []

repo_status_list = session.query(CollectionStatus).filter(and_(not_erroed, not_collecting, or_(never_collected, old_collection))).limit(limit).all()
if prelim_phase.__name__ in enabled_phase_names:
secondary_enabled_phases.append(prelim_phase)

repo_ids = [repo.repo_id for repo in repo_status_list]
if secondary_repo_collect_phase.__name__ in enabled_phase_names:
secondary_enabled_phases.append(secondary_repo_collect_phase)

def secondary_task_success_gen(repo_git):
return secondary_task_success.si(repo_git)

secondary_enabled_phases.append(secondary_task_success_gen)

repo_git_result = session.query(Repo).filter(Repo.repo_id.in_(tuple(repo_ids))).all()

repo_git_identifiers = [repo.repo_git for repo in repo_git_result]
max_repo_secondary_count = 30
active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.secondary_status == CollectionState.COLLECTING.value).all())

cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days)
not_erroed = CollectionStatus.secondary_status != str(CollectionState.ERROR.value)
not_collecting = CollectionStatus.secondary_status != str(CollectionState.COLLECTING.value)
never_collected = CollectionStatus.secondary_data_last_collected == None
old_collection = CollectionStatus.secondary_data_last_collected <= cutoff_date
primary_collected = CollectionStatus.core_status == str(CollectionState.SUCCESS.value)

logger.info(f"Starting collection on {len(repo_ids)} repos")
limit = max_repo_secondary_count-active_repo_count

logger.info(f"Collection starting for: {tuple(repo_git_identifiers)}")
repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(primary_collected,not_erroed, not_collecting, or_(never_collected, old_collection)),limit)

augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=enabled_phases)
logger.info(f"Starting secondary collection on {len(repo_git_identifiers)} repos")

logger.info(f"Secondary collection starting for: {tuple(repo_git_identifiers)}")

secondary_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=secondary_enabled_phases)

#Start data collection and update the collectionStatus with the task_ids
for repo_git, task_id in augur_collection.start_data_collection():
for repo_git, task_id in secondary_augur_collection.start_data_collection():

repo = session.query(Repo).filter(Repo.repo_git == repo_git).one()

#set status in database to collecting
repoStatus = repo.collection_status[0]
repoStatus.core_task_id = task_id
#repoStatus.core_task_id = task_id
repoStatus.secondary_task_id = task_id
repoStatus.core_status = CollectionState.COLLECTING.value
repoStatus.secondary_status = CollectionState.COLLECTING.value
session.commit()


Expand Down