diff --git a/augur/api/routes/util.py b/augur/api/routes/util.py index 460f216ef6..0bdd4fb1c2 100644 --- a/augur/api/routes/util.py +++ b/augur/api/routes/util.py @@ -38,7 +38,6 @@ def get_all_repos(): repo.repo_name, repo.description, repo.repo_git AS url, - repo.repo_status, a.commits_all_time, b.issues_all_time, c.pull_requests_all_time, @@ -80,7 +79,6 @@ def get_repos_in_repo_group(repo_group_id): repo.repo_name, repo.description, repo.repo_git AS url, - repo.repo_status, a.commits_all_time, b.issues_all_time, c.pull_requests_all_time diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index 008afe8b26..8f2099d0f2 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -116,28 +116,10 @@ def start(disable_collection, development, port): time.sleep(5) create_collection_status(logger) - + with DatabaseSession(logger) as session: - primaryCollecting = CollectionStatus.core_status == CollectionState.COLLECTING.value - secondaryCollecting = CollectionStatus.secondary_status == CollectionState.COLLECTING.value - - query = session.query(CollectionStatus).filter(or_(primaryCollecting,secondaryCollecting)) - - collection_status_list = execute_session_query(query,'all') - - for status in collection_status_list: - repo = status.repo - repo.repo_name = None - repo.repo_path = None - repo.repo_status = "New" - - status.core_status = "Pending" - status.secondary_status = "Pending" - - #collection_status_list.update({CollectionStatus.core_status: "Pending"}) - #collection_status_list.update({CollectionStatus.secondary_status: "Pending"}) - session.commit() - + clean_collection_status(session) + augur_collection_monitor.si().apply_async() celery_command = "celery -A augur.tasks.init.celery_app.celery_app beat -l debug" @@ -171,14 +153,7 @@ def start(disable_collection, development, port): celery_beat_process.terminate() try: - clear_redis_caches() - connection_string = "" - with DatabaseSession(logger) as session: - config = AugurConfig(logger, session) - connection_string = config.get_section("RabbitMQ")['connection_string'] - - clear_rabbitmq_messages(connection_string) - + cleanup_after_collection_halt(logger) except RedisConnectionError: pass @@ -191,13 +166,7 @@ def stop(): logger = logging.getLogger("augur.cli") _broadcast_signal_to_processes(given_logger=logger) - clear_redis_caches() - connection_string = "" - with DatabaseSession(logger) as session: - config = AugurConfig(logger, session) - connection_string = config.get_section("RabbitMQ")['connection_string'] - - clear_rabbitmq_messages(connection_string) + cleanup_after_collection_halt(logger) @cli.command('kill') def kill(): @@ -207,15 +176,18 @@ def kill(): logger = logging.getLogger("augur.cli") _broadcast_signal_to_processes(broadcast_signal=signal.SIGKILL, given_logger=logger) - clear_redis_caches() + cleanup_after_collection_halt(logger) +def cleanup_after_collection_halt(logger): + clear_redis_caches() connection_string = "" with DatabaseSession(logger) as session: config = AugurConfig(logger, session) connection_string = config.get_section("RabbitMQ")['connection_string'] - clear_rabbitmq_messages(connection_string) + clean_collection_status(session) + clear_rabbitmq_messages(connection_string) def clear_redis_caches(): """Clears the redis databases that celery and redis use.""" @@ -232,6 +204,22 @@ def clear_rabbitmq_messages(connection_string): rabbitmq_purge_command = f"sudo rabbitmqctl purge_queue celery -p {virtual_host_string}" 
subprocess.call(rabbitmq_purge_command.split(" ")) +#Make sure that database reflects collection status when processes are killed/stopped. +def clean_collection_status(session): + session.execute_sql(s.sql.text(""" + UPDATE augur_operations.collection_status + SET core_status='Pending' + WHERE core_status='Collecting'; + UPDATE augur_operations.collection_status + SET secondary_status='Pending' + WHERE secondary_status='Collecting'; + UPDATE augur_operations.collection_status + SET facade_status='Update', facade_task_id=NULL + WHERE facade_status LIKE '%Collecting%'; + UPDATE augur_operations.collection_status + SET facade_status='Pending' + WHERE facade_status='Failed Clone'; + """)) @cli.command('export-env') def export_env(config): @@ -260,7 +248,18 @@ def repo_reset(augur_app): """ Refresh repo collection to force data collection """ - augur_app.database.execute("UPDATE augur_data.repo SET repo_path = NULL, repo_name = NULL, repo_status = 'New'; TRUNCATE augur_data.commits CASCADE; ") + augur_app.database.execute(s.sql.text("""UPDATE augur_operations.collection_status + SET core_status='Pending'; + UPDATE augur_operations.collection_status + SET secondary_status='Pending'; + UPDATE augur_operations.collection_status + SET facade_status='Update' + WHERE facade_status='Collecting' OR facade_status='Success' OR facade_status='Error'; + UPDATE augur_operations.collection_status + SET facade_status='Pending' + WHERE facade_status='Failed Clone'; + TRUNCATE augur_data.commits CASCADE; + """)) logger.info("Repos successfully reset") diff --git a/augur/application/config.py b/augur/application/config.py index be5cd3ab9e..2eb29d8443 100644 --- a/augur/application/config.py +++ b/augur/application/config.py @@ -38,19 +38,17 @@ def get_development_flag(): }, "Facade": { "check_updates": 1, - "clone_repos": 1, "create_xlsx_summary_files": 1, "delete_marked_repos": 0, "fix_affiliations": 1, - "force_analysis": 1, "force_invalidate_caches": 1, - "force_updates": 1, "limited_run": 0, "multithreaded": 1, "nuke_stored_affiliations": 0, "pull_repos": 1, "rebuild_caches": 1, - "run_analysis": 1 + "run_analysis": 1, + "run_facade_contributors": 1 }, "Server": { "cache_expire": "3600", @@ -77,7 +75,7 @@ def get_development_flag(): "connection_string": "amqp://augur:password123@localhost:5672/augur_vhost" }, "Tasks": { - "collection_interval": 300 + "collection_interval": 60 }, "Message_Insights": { "insight_days": 30, @@ -102,6 +100,7 @@ def get_development_flag(): "prelim_phase": 1, "primary_repo_collect_phase": 1, "secondary_repo_collect_phase": 1, + "facade_phase": 1, "machine_learning_phase": 0 } } diff --git a/augur/application/db/models/augur_data.py b/augur/application/db/models/augur_data.py index 5365dbf37f..fcb7345ccb 100644 --- a/augur/application/db/models/augur_data.py +++ b/augur/application/db/models/augur_data.py @@ -814,14 +814,17 @@ class Repo(Base): ForeignKey("augur_data.repo_groups.repo_group_id"), nullable=False ) repo_git = Column(String, nullable=False) + + #TODO: repo_path and repo_name should be generated columns in postgresql repo_path = Column(String) repo_name = Column(String) repo_added = Column( TIMESTAMP(precision=0), nullable=False, server_default=text("CURRENT_TIMESTAMP") ) - repo_status = Column( - String, nullable=False, server_default=text("'New'::character varying") - ) + + #repo_status = Column( + # String, nullable=False, server_default=text("'New'::character varying") + #) repo_type = Column( String, server_default=text("''::character varying"), @@ -969,7 +972,6 @@ 
def insert(session, url: str, repo_group_id: int, tool_source):
         repo_data = {
             "repo_group_id": repo_group_id,
             "repo_git": url,
-            "repo_status": "New",
             "tool_source": tool_source,
             "tool_version": "1.0",
             "data_source": "Git"
diff --git a/augur/application/db/models/augur_operations.py b/augur/application/db/models/augur_operations.py
index ab7c7d45cd..38fb9d7536 100644
--- a/augur/application/db/models/augur_operations.py
+++ b/augur/application/db/models/augur_operations.py
@@ -933,6 +933,10 @@ class CollectionStatus(Base):
     secondary_task_id = Column(String)
     event_last_collected = Column(TIMESTAMP)
 
+    facade_status = Column(String,nullable=False, server_default=text("'Pending'"))
+    facade_data_last_collected = Column(TIMESTAMP)
+    facade_task_id = Column(String)
+
     repo = relationship("Repo", back_populates="collection_status")
 
     @staticmethod
diff --git a/augur/application/schema/alembic/versions/6_change_collectionstatus_table_to_keep_.py b/augur/application/schema/alembic/versions/6_change_collectionstatus_table_to_keep_.py
new file mode 100644
index 0000000000..240651b4fb
--- /dev/null
+++ b/augur/application/schema/alembic/versions/6_change_collectionstatus_table_to_keep_.py
@@ -0,0 +1,46 @@
+"""change CollectionStatus table to keep track of facade independently
+
+Revision ID: 6
+Revises: 5
+Create Date: 2023-02-16 12:45:57.486871
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+from sqlalchemy.sql import text
+
+# revision identifiers, used by Alembic.
+revision = '6'
+down_revision = '5'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+
+    op.drop_column('repo', 'repo_status', schema='augur_data')
+    op.add_column('collection_status', sa.Column('facade_status', sa.String(), server_default=sa.text("'Pending'"), nullable=False), schema='augur_operations')
+    op.add_column('collection_status', sa.Column('facade_data_last_collected', postgresql.TIMESTAMP(), nullable=True), schema='augur_operations')
+    op.add_column('collection_status', sa.Column('facade_task_id', sa.String(), nullable=True), schema='augur_operations')
+
+    #Add toggle for facade collection.
+    conn = op.get_bind()
+    result = conn.execute(text("""SELECT * FROM augur_operations.config WHERE section_name='Task_Routine';""")).fetchall()
+    if result:
+
+        conn.execute(text(f"""
+            INSERT INTO "augur_operations"."config" ("section_name", "setting_name", "value", "type") VALUES ('Task_Routine', 'facade_phase', '{1}', 'int');
+            INSERT INTO "augur_operations"."config" ("section_name", "setting_name", "value", "type") VALUES ('Facade', 'run_facade_contributors', '{1}', 'int');
+        """))
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust!
### + op.drop_column('collection_status', 'facade_task_id', schema='augur_operations') + op.drop_column('collection_status', 'facade_data_last_collected', schema='augur_operations') + op.drop_column('collection_status', 'facade_status', schema='augur_operations') + op.add_column('repo', sa.Column('repo_status', sa.VARCHAR(), server_default=sa.text("'New'::character varying"), autoincrement=False, nullable=False), schema='augur_data') + # ### end Alembic commands ### diff --git a/augur/tasks/git/dependency_tasks/tasks.py b/augur/tasks/git/dependency_tasks/tasks.py index 7e151a1b26..2df4ef6aa9 100644 --- a/augur/tasks/git/dependency_tasks/tasks.py +++ b/augur/tasks/git/dependency_tasks/tasks.py @@ -15,10 +15,12 @@ def process_dependency_metrics(repo_git): logger = logging.getLogger(process_dependency_metrics.__name__) with DatabaseSession(logger, engine) as session: + logger.info(f"repo_git: {repo_git}") query = session.query(Repo).filter(Repo.repo_git == repo_git) - repo = execute_session_query(query,'one') + try: + repo = execute_session_query(query,'one') deps_model(session, repo.repo_id) except Exception as e: session.logger.error(f"Could not complete deps_model!\n Reason: {e} \n Traceback: {''.join(traceback.format_exception(None, e, e.__traceback__))}") \ No newline at end of file diff --git a/augur/tasks/git/facade_tasks.py b/augur/tasks/git/facade_tasks.py index 8344c6f50a..e9927a3e5a 100644 --- a/augur/tasks/git/facade_tasks.py +++ b/augur/tasks/git/facade_tasks.py @@ -34,7 +34,7 @@ from augur.application.db import data_parse from augur.tasks.util.AugurUUID import GithubUUID, UnresolvableUUID -from augur.application.db.models import PullRequest, Message, PullRequestReview, PullRequestLabel, PullRequestReviewer, PullRequestEvent, PullRequestMeta, PullRequestAssignee, PullRequestReviewMessageRef, Issue, IssueEvent, IssueLabel, IssueAssignee, PullRequestMessageRef, IssueMessageRef, Contributor, Repo +from augur.application.db.models import PullRequest, Message, PullRequestReview, PullRequestLabel, PullRequestReviewer, PullRequestEvent, PullRequestMeta, PullRequestAssignee, PullRequestReviewMessageRef, Issue, IssueEvent, IssueLabel, IssueAssignee, PullRequestMessageRef, IssueMessageRef, Contributor, Repo, CollectionStatus from augur.tasks.github.util.github_paginator import GithubPaginator, hit_api from augur.tasks.github.util.gh_graphql_entities import PullRequest @@ -64,13 +64,18 @@ def facade_error_handler(request,exc,traceback): #Predefine facade collection with tasks @celery.task -def facade_analysis_init_facade_task(): +def facade_analysis_init_facade_task(repo_id): logger = logging.getLogger(facade_analysis_init_facade_task.__name__) with FacadeSession(logger) as session: session.update_status('Running analysis') session.log_activity('Info',f"Beginning analysis.") + update_project_status = s.sql.text("""UPDATE augur_operations.collection_status + SET facade_status='Collecting' WHERE + repo_id=:repo_id""").bindparams(repo_id=repo_id) + session.execute_sql(update_project_status) + @celery.task def grab_comitters(repo_id,platform="github"): @@ -208,10 +213,6 @@ def update_analysis_log(repos_id,status): for commit in trimmed_commits: trim_commit(session,repo_id,commit) - set_complete = s.sql.text("""UPDATE repo SET repo_status='Complete' WHERE repo_id=:repo_id and repo_status != 'Empty' - """).bindparams(repo_id=repo_id) - - session.execute_sql(set_complete) update_analysis_log(repo_id,'Commit trimming complete') @@ -358,13 +359,6 @@ def 
rebuild_unknown_affiliation_and_web_caches_facade_task(): with FacadeSession(logger) as session: rebuild_unknown_affiliation_and_web_caches(session) -@celery.task -def force_repo_analysis_facade_task(repo_git): - - logger = logging.getLogger(force_repo_analysis_facade_task.__name__) - - with FacadeSession(logger) as session: - force_repo_analysis(session,repo_git) @celery.task def git_repo_cleanup_facade_task(repo_git): @@ -382,21 +376,16 @@ def git_repo_initialize_facade_task(repo_git): with FacadeSession(logger) as session: git_repo_initialize(session, repo_git) -@celery.task -def check_for_repo_updates_facade_task(repo_git): +#@celery.task +#def check_for_repo_updates_facade_task(repo_git): +# +# from augur.tasks.init.celery_app import engine +# +# logger = logging.getLogger(check_for_repo_updates_facade_task.__name__) +# +# with FacadeSession(logger) as session: +# check_for_repo_updates(session, repo_git) - logger = logging.getLogger(check_for_repo_updates_facade_task.__name__) - - with FacadeSession(logger) as session: - check_for_repo_updates(session, repo_git) - -@celery.task -def force_repo_updates_facade_task(repo_git): - - logger = logging.getLogger(force_repo_updates_facade_task.__name__) - - with FacadeSession(logger) as session: - force_repo_updates(session, repo_git) @celery.task def git_repo_updates_facade_task(repo_git): @@ -436,7 +425,7 @@ def generate_analysis_sequence(logger,repo_git, session): concurrentTasks = int((-1 * (15/(len(repo_ids)+1))) + 15) logger.info(f"Scheduling concurrent layers {concurrentTasks} tasks at a time.") - analysis_sequence.append(facade_analysis_init_facade_task.si()) + analysis_sequence.append(facade_analysis_init_facade_task.si(repo_id)) analysis_sequence.append(grab_comitters.si(repo_id)) @@ -481,21 +470,38 @@ def generate_contributor_sequence(logger,repo_git, session): -def generate_facade_chain(logger,repo_git): +def facade_phase(repo_git): #raise NotImplemented + logger = logging.getLogger(git_repo_initialize_facade_task.__name__) logger.info("Generating facade sequence") with FacadeSession(logger) as session: + #Get the repo_id + repo_list = s.sql.text("""SELECT repo_id,repo_group_id,repo_path,repo_name FROM repo + WHERE repo_git=:value""").bindparams(value=repo_git) + repos = session.fetchall_data_from_sql_text(repo_list) + + start_date = session.get_setting('start_date') + + repo_ids = [repo['repo_id'] for repo in repos] + + repo_id = repo_ids.pop(0) + + #Get the collectionStatus + query = session.query(CollectionStatus).filter(CollectionStatus.repo_id == repo_id) + + status = execute_session_query(query,'one') # Figure out what we need to do limited_run = session.limited_run delete_marked_repos = session.delete_marked_repos pull_repos = session.pull_repos - clone_repos = session.clone_repos + #clone_repos = session.clone_repos check_updates = session.check_updates - force_updates = session.force_updates + #force_updates = session.force_updates run_analysis = session.run_analysis - force_analysis = session.force_analysis + #force_analysis = session.force_analysis + run_facade_contributors = session.run_facade_contributors nuke_stored_affiliations = session.nuke_stored_affiliations fix_affiliations = session.fix_affiliations force_invalidate_caches = session.force_invalidate_caches @@ -509,31 +515,29 @@ def generate_facade_chain(logger,repo_git): facade_sequence = [] - if not limited_run or (limited_run and delete_marked_repos): - 
facade_sequence.append(git_repo_cleanup_facade_task.si(repo_git))#git_repo_cleanup(session,repo_git_identifiers) + #Currently repos are never deleted + #if not limited_run or (limited_run and delete_marked_repos): + # facade_sequence.append(git_repo_cleanup_facade_task.si(repo_git))#git_repo_cleanup(session,repo_git_identifiers) - if not limited_run or (limited_run and clone_repos): + if 'Pending' in status.facade_status or 'Failed Clone' in status.facade_status: facade_sequence.append(git_repo_initialize_facade_task.si(repo_git))#git_repo_initialize(session,repo_git_identifiers) - if not limited_run or (limited_run and check_updates): - facade_sequence.append(check_for_repo_updates_facade_task.si(repo_git))#check_for_repo_updates(session,repo_git_identifiers) - - if force_updates: - facade_sequence.append(force_repo_updates_facade_task.si(repo_git)) + #TODO: alter this to work with current collection. + #if not limited_run or (limited_run and check_updates): + # facade_sequence.append(check_for_repo_updates_facade_task.si(repo_git))#check_for_repo_updates(session,repo_git_identifiers) if not limited_run or (limited_run and pull_repos): facade_sequence.append(git_repo_updates_facade_task.si(repo_git)) - if force_analysis: - facade_sequence.append(force_repo_analysis_facade_task.si(repo_git)) - #Generate commit analysis task order. - facade_sequence.extend(generate_analysis_sequence(logger,repo_git,session)) + if not limited_run or (limited_run and run_analysis): + facade_sequence.extend(generate_analysis_sequence(logger,repo_git,session)) #Generate contributor analysis task group. - facade_sequence.append(generate_contributor_sequence(logger,repo_git, session)) + if not limited_run or (limited_run and run_facade_contributors): + facade_sequence.append(generate_contributor_sequence(logger,repo_git,session)) + - logger.info(f"Facade sequence: {facade_sequence}") return chain(*facade_sequence) diff --git a/augur/tasks/git/util/facade_worker/facade_worker/facade01config.py b/augur/tasks/git/util/facade_worker/facade_worker/facade01config.py index a405aadcca..64a81e6709 100644 --- a/augur/tasks/git/util/facade_worker/facade_worker/facade01config.py +++ b/augur/tasks/git/util/facade_worker/facade_worker/facade01config.py @@ -123,11 +123,12 @@ def __init__(self,logger: Logger): self.limited_run = worker_options["limited_run"] self.delete_marked_repos = worker_options["delete_marked_repos"] self.pull_repos = worker_options["pull_repos"] - self.clone_repos = worker_options["clone_repos"] + #self.clone_repos = worker_options["clone_repos"] self.check_updates = worker_options["check_updates"] - self.force_updates = worker_options["force_updates"] + #self.force_updates = worker_options["force_updates"] self.run_analysis = worker_options["run_analysis"] - self.force_analysis = worker_options["force_analysis"] + #self.force_analysis = worker_options["force_analysis"] + self.run_facade_contributors = worker_options["run_facade_contributors"] self.nuke_stored_affiliations = worker_options["nuke_stored_affiliations"] self.fix_affiliations = worker_options["fix_affiliations"] self.force_invalidate_caches = worker_options["force_invalidate_caches"] diff --git a/augur/tasks/git/util/facade_worker/facade_worker/facade04postanalysiscleanup.py b/augur/tasks/git/util/facade_worker/facade_worker/facade04postanalysiscleanup.py index f1e4ea23eb..1b2d101115 100644 --- a/augur/tasks/git/util/facade_worker/facade_worker/facade04postanalysiscleanup.py +++ 
b/augur/tasks/git/util/facade_worker/facade_worker/facade04postanalysiscleanup.py @@ -40,6 +40,7 @@ from augur.application.db.util import execute_session_query from augur.application.db.models import * +#Will delete repos passed and cleanup associated commit data. def git_repo_cleanup(session,repo_git): # Clean up any git repos that are pending deletion @@ -50,7 +51,7 @@ def git_repo_cleanup(session,repo_git): query = session.query(Repo).filter( - Repo.repo_git == repo_git,Repo.repo_status == "Delete")#s.sql.text("""SELECT repo_id,repo_group_id,repo_path,repo_name FROM repo WHERE repo_status='Delete'""") + Repo.repo_git == repo_git)#s.sql.text("""SELECT repo_id,repo_group_id,repo_path,repo_name FROM repo WHERE repo_status='Delete'""") delete_repos = execute_session_query(query,'all')#session.fetchall_data_from_sql_text(query) diff --git a/augur/tasks/git/util/facade_worker/facade_worker/facade05repofetch.py b/augur/tasks/git/util/facade_worker/facade_worker/facade05repofetch.py index 48f0bfc346..f0ef2e46e5 100644 --- a/augur/tasks/git/util/facade_worker/facade_worker/facade05repofetch.py +++ b/augur/tasks/git/util/facade_worker/facade_worker/facade05repofetch.py @@ -41,51 +41,26 @@ from augur.application.db.models.augur_data import * from augur.application.db.util import execute_session_query, convert_orm_list_to_dict_list -def git_repo_initialize(session, repo_git,repo_group_id=None): +def git_repo_initialize(session, repo_git): # Select any new git repos so we can set up their locations and git clone - # Select any new git repos so we can set up their locations and git clone - new_repos = [] - if repo_group_id is None: - session.update_status('Fetching non-cloned repos') - session.log_activity('Info','Fetching non-cloned repos') + + session.update_status('Fetching non-cloned repos') + session.log_activity('Info','Fetching non-cloned repos') - query = s.sql.text("""SELECT repo_id,repo_group_id,repo_git FROM repo WHERE repo_status LIKE 'New%' - AND repo_git=:value""").bindparams(value=repo_git) - - #Get data as a list of dicts - new_repos = session.fetchall_data_from_sql_text(query)#list(cfg.cursor) - session.log_activity('Info', f'SPG new_repos is {new_repos}') - + #Get data as a list of dicts + #new_repos = session.fetchall_data_from_sql_text(query)#list(cfg.cursor) + row = Repo.get_by_repo_git(session, repo_git) - else: - session.update_status(f"Fetching repos with repo group id: {repo_group_id}") - session.log_activity('Info',f"Fetching repos with repo group id: {repo_group_id}") - #query = s.sql.text("""SELECT repo_id,repo_group_id,repo_git FROM repo WHERE repo_status LIKE 'New%'""") - - query = session.query(Repo).filter('New' in Repo.repo_status, Repo.repo_git == repo_git) - result = execute_session_query(query, 'all') + if row: - session.log_activity('Info',f'SPG result is {result}') + session.log_activity('Info',f"Fetching repos with repo group id: {row.repo_group_id}") - for repo in result: - repo_dict = repo.__dict__ - try: - del repo_dict['_sa_instance_state'] - except: - pass - - new_repos.append(repo_dict) + update_repo_log(session, row.repo_id,'Cloning') - for row in new_repos: - - session.log_activity('Info',f"Fetching repos with repo group id: {row['repo_group_id']}") - - update_repo_log(session, row['repo_id'],'Cloning') - - git = html.unescape(row['repo_git']) + git = html.unescape(row.repo_git) # Strip protocol from remote URL, set a unique path on the filesystem if git.find('://',0) > 0: @@ -100,7 +75,7 @@ def git_repo_initialize(session, repo_git,repo_group_id=None): 
# Get the full path to the directory where we'll clone the repo - repo_path = (f"{session.repo_base_directory}{row['repo_group_id']}/{repo_relative_path}") + repo_path = (f"{session.repo_base_directory}{row.repo_group_id}/{repo_relative_path}") session.log_activity('Info',f"Repo Path from facade05, line 86: {repo_path}") @@ -113,7 +88,7 @@ def git_repo_initialize(session, repo_git,repo_group_id=None): # Check if there will be a storage path collision query = s.sql.text("""SELECT NULL FROM repo WHERE CONCAT(repo_group_id,'/',repo_path,repo_name) = :repo_group_id - """).bindparams(repo_group_id=f"{row['repo_group_id']}/{repo_relative_path}{repo_name}") + """).bindparams(repo_group_id=f"{row.repo_group_id}/{repo_relative_path}{repo_name}") result = session.fetchall_data_from_sql_text(query) @@ -143,18 +118,24 @@ def git_repo_initialize(session, repo_git,repo_group_id=None): if return_code != 0: print("COULD NOT CREATE REPO DIRECTORY") - update_repo_log(session, row['repo_id'],'Failed (mkdir)') + update_repo_log(session, row.repo_id,'Failed (mkdir)') session.update_status(f"Failed (mkdir {repo_path})") session.log_activity('Error',f"Could not create repo directory: {repo_path}" ) raise Exception("Could not create git repo's prerequisite directories. " " Do you have write access?") - update_repo_log(session, row['repo_id'],'New (cloning)') + update_repo_log(session, row.repo_id,'New (cloning)') - query = s.sql.text("""UPDATE repo SET repo_status='New (Initializing)', repo_path=:pathParam, - repo_name=:nameParam WHERE repo_id=:idParam and repo_status != 'Empty' - """).bindparams(pathParam=repo_relative_path,nameParam=repo_name,idParam=row['repo_id']) + query = s.sql.text("""UPDATE repo SET repo_path=:pathParam, + repo_name=:nameParam WHERE repo_id=:idParam + """).bindparams(pathParam=repo_relative_path,nameParam=repo_name,idParam=row.repo_id) + + session.execute_sql(query) + + query = s.sql.text("""UPDATE augur_operations.collection_status + SET facade_status='Collecting (Initializing)' + WHERE repo_id=:idParam""").bindparams(idParam=row.repo_id) session.execute_sql(query) @@ -168,26 +149,27 @@ def git_repo_initialize(session, repo_git,repo_group_id=None): # Mark the entire project for an update, so that under normal # circumstances caches are rebuilt only once per waiting period. - update_project_status = s.sql.text("""UPDATE repo SET repo_status='Update' WHERE - repo_group_id=:repo_group_id AND repo_status != 'Empty' AND repo_id=:repo_id""").bindparams(repo_group_id=row['repo_group_id'], repo_id=row["repo_id"]) + update_project_status = s.sql.text("""UPDATE augur_operations.collection_status + SET facade_status='Update' WHERE + repo_id=:repo_id""").bindparams(repo_id=row.repo_id) session.execute_sql(update_project_status) # Since we just cloned the new repo, set it straight to analyze. 
- query = s.sql.text("""UPDATE repo SET repo_status='Analyze',repo_path=:repo_path, repo_name=:repo_name - WHERE repo_id=:repo_id and repo_status != 'Empty' - """).bindparams(repo_path=repo_relative_path,repo_name=repo_name,repo_id=row['repo_id']) + query = s.sql.text("""UPDATE repo SET repo_path=:repo_path, repo_name=:repo_name + WHERE repo_id=:repo_id + """).bindparams(repo_path=repo_relative_path,repo_name=repo_name,repo_id=row.repo_id) session.execute_sql(query) - update_repo_log(session, row['repo_id'],'Up-to-date') + update_repo_log(session, row.repo_id,'Up-to-date') session.log_activity('Info',f"Cloned {git}") else: # If cloning failed, log it and set the status back to new - update_repo_log(session, row['repo_id'],f"Failed ({return_code})") + update_repo_log(session, row.repo_id,f"Failed ({return_code})") - query = s.sql.text("""UPDATE repo SET repo_status='New (failed)' WHERE repo_id=:repo_id and repo_status !='Empty' - """).bindparams(repo_id=row['repo_id']) + query = s.sql.text("""UPDATE augur_operations.collection_status SET facade_status='Failed Clone' WHERE repo_id=:repo_id + """).bindparams(repo_id=row.repo_id) session.execute_sql(query) @@ -197,12 +179,14 @@ def git_repo_initialize(session, repo_git,repo_group_id=None): session.log_activity('Info', f"Fetching new repos (complete)") - + +#Deprecated functionality. No longer used +#Should be re-purposed in start_tasks when tasks are being scheduled def check_for_repo_updates(session,repo_git): # Check the last time a repo was updated and if it has been longer than the # update_frequency, mark its project for updating during the next analysis. - + raise NotImplementedError("This functionality is deprecated and won't work with present facade versions") session.update_status('Checking if any repos need to update') session.log_activity('Info','Checking repos to update') @@ -264,7 +248,9 @@ def check_for_repo_updates(session,repo_git): session.log_activity('Info','Checking repos to update (complete)') +#Deprecated. No longer used. def force_repo_updates(session,repo_git): + raise NotImplementedError("This functionality is deprecated and won't work with present facade versions") # Set the status of all non-new repos to "Update". @@ -278,7 +264,9 @@ def force_repo_updates(session,repo_git): session.log_activity('Info','Forcing repos to update (complete)') +#Deprecated. No longer used. 
def force_repo_analysis(session,repo_git): + raise NotImplementedError("This functionality is deprecated and won't work with present facade versions") session.update_status('Forcing all non-new repos to be analyzed') session.log_activity('Info','Forcing repos to be analyzed') @@ -302,7 +290,7 @@ def git_repo_updates(session,repo_git): #query = s.sql.text("""SELECT repo_id,repo_group_id,repo_git,repo_name,repo_path FROM repo WHERE # repo_status='Update'""") query = session.query(Repo).filter( - Repo.repo_git == repo_git,Repo.repo_status == 'Update') + Repo.repo_git == repo_git) result = execute_session_query(query, 'all') try: @@ -451,11 +439,6 @@ def git_repo_updates(session,repo_git): if return_code == 0: - set_to_analyze = s.sql.text("""UPDATE repo SET repo_status='Analyze' WHERE repo_id=:repo_id and repo_status != 'Empty AND repo_id=:repo_id' - """).bindparams(repo_id=row['repo_id']) - - session.execute_sql(set_to_analyze) - update_repo_log(session, row['repo_id'],'Up-to-date') session.log_activity('Verbose',f"Updated {row['repo_git']}") diff --git a/augur/tasks/github/detect_move/core.py b/augur/tasks/github/detect_move/core.py index 97c456c8d2..7183607d04 100644 --- a/augur/tasks/github/detect_move/core.py +++ b/augur/tasks/github/detect_move/core.py @@ -66,7 +66,6 @@ def ping_github_for_repo_move(session,repo, logger): 'repo_git': f"https://github.com/{owner}/{name}", 'repo_path': None, 'repo_name': None, - 'repo_status': 'New', 'description': f"(Originally hosted at {url}) {old_description}" } diff --git a/augur/tasks/init/celery_app.py b/augur/tasks/init/celery_app.py index 1a238ebbd0..ee9c9d642d 100644 --- a/augur/tasks/init/celery_app.py +++ b/augur/tasks/init/celery_app.py @@ -137,7 +137,7 @@ def setup_periodic_tasks(sender, **kwargs): sender.add_periodic_task(collection_interval, augur_collection_monitor.s()) #Do longer tasks less often - non_domain_collection_interval = collection_interval * 5 + non_domain_collection_interval = collection_interval * 300 logger.info(f"Scheduling non-repo-domain collection every {non_domain_collection_interval/60} minutes") sender.add_periodic_task(non_domain_collection_interval, non_repo_domain_tasks.s()) diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 8af4c78194..669c392fda 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -46,6 +46,8 @@ class CollectionState(Enum): PENDING = "Pending" ERROR = "Error" COLLECTING = "Collecting" + UPDATE = "Update" + FAILED_CLONE = "Failed Clone" """ @celery.task(bind=True) @@ -103,15 +105,45 @@ def secondary_task_success(repo_git): session.commit() +@celery.task +def facade_task_success(repo_git): + + from augur.tasks.init.celery_app import engine + + logger = logging.getLogger(facade_task_success.__name__) + + logger.info(f"Repo '{repo_git}' succeeded through facade task collection") + + with DatabaseSession(logger, engine) as session: + + repo = Repo.get_by_repo_git(session, repo_git) + if not repo: + raise Exception(f"Task with repo_git of {repo_git} but could not be found in Repo table") + + collection_status = repo.collection_status[0] + + collection_status.facade_status = CollectionState.SUCCESS.value + collection_status.facade_data_last_collected = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + collection_status.facade_task_id = None + + session.commit() + @celery.task def task_failed(request,exc,traceback): from augur.tasks.init.celery_app import engine logger = logging.getLogger(task_failed.__name__) + + # log traceback to error file + 
+    logger.error(f"Task {request.id} raised exception: {exc}\n{traceback}")
 
     with DatabaseSession(logger,engine) as session:
 
-        query = session.query(CollectionStatus).filter(CollectionStatus.core_task_id == request.id)
+        core_id_match = CollectionStatus.core_task_id == request.id
+        secondary_id_match = CollectionStatus.secondary_task_id == request.id
+        facade_id_match = CollectionStatus.facade_task_id == request.id
+
+        query = session.query(CollectionStatus).filter(or_(core_id_match,secondary_id_match,facade_id_match))
 
         collectionRecord = execute_session_query(query,'one')
@@ -125,18 +157,23 @@ def task_failed(request,exc,traceback):
         except Exception as e:
             logger.error(f"Could not mutate request chain! \n Error: {e}")
 
-        if collectionRecord.core_status == CollectionState.COLLECTING.value:
+        if collectionRecord.core_task_id == request.id:
             # set status to Error in db
-            collectionRecord.core_status = CollectionStatus.ERROR
-            session.commit()
+            collectionRecord.core_status = CollectionState.ERROR.value
+            collectionRecord.core_task_id = None
+
-        if collectionRecord.secondary_status == CollectionState.COLLECTING.value:
+        if collectionRecord.secondary_task_id == request.id:
             # set status to Error in db
-            collectionRecord.secondary_status = CollectionStatus.ERROR
-            session.commit()
-
-        # log traceback to error file
-        session.logger.error(f"Task {request.id} raised exception: {exc}\n{traceback}")
+            collectionRecord.secondary_status = CollectionState.ERROR.value
+            collectionRecord.secondary_task_id = None
+
+
+        if collectionRecord.facade_task_id == request.id:
+            collectionRecord.facade_status = CollectionState.ERROR.value
+            collectionRecord.facade_task_id = None
+
+        session.commit()
@@ -174,9 +211,9 @@ def primary_repo_collect_phase(repo_git):
     )
 
     repo_task_group = group(
-        #repo_info_task,
+        repo_info_task,
        chain(primary_repo_jobs,secondary_repo_jobs,process_contributors.si()),
-        generate_facade_chain(logger,repo_git),
+        #facade_phase(logger,repo_git),
        collect_releases.si(repo_git)
     )
@@ -239,8 +276,6 @@ def start_data_collection(self):
         self.logger.info(f"Enabled phases: {list(self.jobs_dict.keys())}")
         augur_collection_list = []
-
-
 
         for repo_git in self.repos:
@@ -266,12 +301,23 @@ def start_data_collection(self):
             #yield the value of the task_id to the calling method so that the proper collectionStatus field can be updated
             yield repo_git, task_id
 
+def get_enabled_phase_names_from_config(logger, session):
+
+    config = AugurConfig(logger, session)
+    phase_options = config.get_section("Task_Routine")
+
+    #Get list of enabled phases
+    enabled_phase_names = [name for name, phase in phase_options.items() if phase == 1]
+
+    return enabled_phase_names
+
+
 
 @celery.task
 def non_repo_domain_tasks():
 
     from augur.tasks.init.celery_app import engine
-
     logger = logging.getLogger(non_repo_domain_tasks.__name__)
 
     logger.info("Executing non-repo domain tasks")
@@ -279,14 +325,7 @@ def non_repo_domain_tasks():
     enabled_phase_names = []
     with DatabaseSession(logger, engine) as session:
 
-        max_repo_count = 500
-        days = 30
-
-        config = AugurConfig(logger, session)
-        phase_options = config.get_section("Task_Routine")
-
-        #Get list of enabled phases
-        enabled_phase_names = [name for name, phase in phase_options.items() if phase == 1]
+        enabled_phase_names = get_enabled_phase_names_from_config(session.logger, session)
 
         #Disable augur from running these tasks more than once unless requested
         query = s.sql.text("""
@@ -321,120 +360,170 @@ def get_collection_status_repo_git_from_filter(session,filter_condition,limit):
 
     return
[status.repo.repo_git for status in repo_status_list] -@celery.task -def augur_collection_monitor(): +def start_primary_collection(session,max_repo,days): - from augur.tasks.init.celery_app import engine + #Get list of enabled phases + enabled_phase_names = get_enabled_phase_names_from_config(session.logger, session) - logger = logging.getLogger(augur_collection_monitor.__name__) + #Primary collection hook. + primary_enabled_phases = [] - logger.info("Checking for repos to collect") + #Primary jobs + if prelim_phase.__name__ in enabled_phase_names: + primary_enabled_phases.append(prelim_phase) + + if primary_repo_collect_phase.__name__ in enabled_phase_names: + primary_enabled_phases.append(primary_repo_collect_phase) - #Get phase options from the config - with DatabaseSession(logger, engine) as session: + #task success is scheduled no matter what the config says. + def core_task_success_gen(repo_git): + return core_task_success.si(repo_git) + + primary_enabled_phases.append(core_task_success_gen) + + active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.core_status == CollectionState.COLLECTING.value).all()) - max_repo_primary_count = 50 - days = 30 + cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) + not_erroed = CollectionStatus.core_status != str(CollectionState.ERROR.value) + not_collecting = CollectionStatus.core_status != str(CollectionState.COLLECTING.value) + never_collected = CollectionStatus.core_data_last_collected == None + old_collection = CollectionStatus.core_data_last_collected <= cutoff_date - config = AugurConfig(logger, session) - phase_options = config.get_section("Task_Routine") + limit = max_repo-active_repo_count - #Get list of enabled phases - enabled_phase_names = [name for name, phase in phase_options.items() if phase == 1] + #Get repos for primary collection hook + repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(not_erroed, not_collecting, or_(never_collected, old_collection)),limit) - #Primary collection hook. - primary_enabled_phases = [] + session.logger.info(f"Starting primary collection on {len(repo_git_identifiers)} repos") - #Primary jobs - if prelim_phase.__name__ in enabled_phase_names: - primary_enabled_phases.append(prelim_phase) - - if primary_repo_collect_phase.__name__ in enabled_phase_names: - primary_enabled_phases.append(primary_repo_collect_phase) + session.logger.info(f"Primary collection starting for: {tuple(repo_git_identifiers)}") - #task success is scheduled no matter what the config says. 
- def core_task_success_gen(repo_git): - return core_task_success.si(repo_git) - - primary_enabled_phases.append(core_task_success_gen) + primary_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=primary_enabled_phases) + + #Start data collection and update the collectionStatus with the task_ids + for repo_git, task_id in primary_augur_collection.start_data_collection(): - active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.core_status == CollectionState.COLLECTING.value).all()) + repo = session.query(Repo).filter(Repo.repo_git == repo_git).one() - cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) - not_erroed = CollectionStatus.core_status != str(CollectionState.ERROR.value) - not_collecting = CollectionStatus.core_status != str(CollectionState.COLLECTING.value) - never_collected = CollectionStatus.core_data_last_collected == None - old_collection = CollectionStatus.core_data_last_collected <= cutoff_date + #set status in database to collecting + repoStatus = repo.collection_status[0] + repoStatus.core_task_id = task_id + #repoStatus.secondary_task_id = task_id + repoStatus.core_status = CollectionState.COLLECTING.value + session.commit() - limit = max_repo_primary_count-active_repo_count +def start_secondary_collection(session,max_repo,days): - #Get repos for primary collection hook - repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(not_erroed, not_collecting, or_(never_collected, old_collection)),limit) + #Get list of enabled phases + enabled_phase_names = get_enabled_phase_names_from_config(session.logger, session) - logger.info(f"Starting primary collection on {len(repo_git_identifiers)} repos") + #Deal with secondary collection + secondary_enabled_phases = [] - logger.info(f"Primary collection starting for: {tuple(repo_git_identifiers)}") + if prelim_phase.__name__ in enabled_phase_names: + secondary_enabled_phases.append(prelim_phase) - primary_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=primary_enabled_phases) + if secondary_repo_collect_phase.__name__ in enabled_phase_names: + secondary_enabled_phases.append(secondary_repo_collect_phase) - #Start data collection and update the collectionStatus with the task_ids - for repo_git, task_id in primary_augur_collection.start_data_collection(): - - repo = session.query(Repo).filter(Repo.repo_git == repo_git).one() - - #set status in database to collecting - repoStatus = repo.collection_status[0] - repoStatus.core_task_id = task_id - #repoStatus.secondary_task_id = task_id - repoStatus.core_status = CollectionState.COLLECTING.value - session.commit() + def secondary_task_success_gen(repo_git): + return secondary_task_success.si(repo_git) + + secondary_enabled_phases.append(secondary_task_success_gen) + + active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.secondary_status == CollectionState.COLLECTING.value).all()) + + cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) + not_erroed = CollectionStatus.secondary_status != str(CollectionState.ERROR.value) + not_collecting = CollectionStatus.secondary_status != str(CollectionState.COLLECTING.value) + never_collected = CollectionStatus.secondary_data_last_collected == None + old_collection = CollectionStatus.secondary_data_last_collected <= cutoff_date + primary_collected = CollectionStatus.core_status == str(CollectionState.SUCCESS.value) + + limit = max_repo-active_repo_count + + 
repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(primary_collected,not_erroed, not_collecting, or_(never_collected, old_collection)),limit) + + session.logger.info(f"Starting secondary collection on {len(repo_git_identifiers)} repos") + + session.logger.info(f"Secondary collection starting for: {tuple(repo_git_identifiers)}") + + secondary_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=secondary_enabled_phases) + + #Start data collection and update the collectionStatus with the task_ids + for repo_git, task_id in secondary_augur_collection.start_data_collection(): - #Deal with secondary collection - secondary_enabled_phases = [] + repo = session.query(Repo).filter(Repo.repo_git == repo_git).one() - if prelim_phase.__name__ in enabled_phase_names: - secondary_enabled_phases.append(prelim_phase) + #set status in database to collecting + repoStatus = repo.collection_status[0] + repoStatus.secondary_task_id = task_id + repoStatus.secondary_status = CollectionState.COLLECTING.value + session.commit() - if secondary_repo_collect_phase.__name__ in enabled_phase_names: - secondary_enabled_phases.append(secondary_repo_collect_phase) +def start_facade_collection(session,max_repo,days): - def secondary_task_success_gen(repo_git): - return secondary_task_success.si(repo_git) + #Get list of enabled phases + enabled_phase_names = get_enabled_phase_names_from_config(session.logger, session) - secondary_enabled_phases.append(secondary_task_success_gen) + #Deal with secondary collection + facade_enabled_phases = [] + if prelim_phase.__name__ in enabled_phase_names: + facade_enabled_phases.append(prelim_phase) - max_repo_secondary_count = 30 - active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.secondary_status == CollectionState.COLLECTING.value).all()) + if facade_phase.__name__ in enabled_phase_names: + facade_enabled_phases.append(facade_phase) - cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) - not_erroed = CollectionStatus.secondary_status != str(CollectionState.ERROR.value) - not_collecting = CollectionStatus.secondary_status != str(CollectionState.COLLECTING.value) - never_collected = CollectionStatus.secondary_data_last_collected == None - old_collection = CollectionStatus.secondary_data_last_collected <= cutoff_date - primary_collected = CollectionStatus.core_status == str(CollectionState.SUCCESS.value) + def facade_task_success_gen(repo_git): + return facade_task_success.si(repo_git) - limit = max_repo_secondary_count-active_repo_count + facade_enabled_phases.append(facade_task_success_gen) - repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(primary_collected,not_erroed, not_collecting, or_(never_collected, old_collection)),limit) + active_repo_count = len(session.query(CollectionStatus).filter(CollectionStatus.facade_task_id != None).all()) - logger.info(f"Starting secondary collection on {len(repo_git_identifiers)} repos") + cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days) + not_erroed = CollectionStatus.facade_status != str(CollectionState.ERROR.value) + not_collecting = CollectionStatus.facade_task_id == None + never_collected = CollectionStatus.facade_data_last_collected == None + old_collection = CollectionStatus.facade_data_last_collected <= cutoff_date - logger.info(f"Secondary collection starting for: {tuple(repo_git_identifiers)}") + limit = max_repo-active_repo_count - secondary_augur_collection = 
AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=secondary_enabled_phases) + repo_git_identifiers = get_collection_status_repo_git_from_filter(session,and_(not_erroed, not_collecting, or_(never_collected, old_collection)),limit) - #Start data collection and update the collectionStatus with the task_ids - for repo_git, task_id in secondary_augur_collection.start_data_collection(): - - repo = session.query(Repo).filter(Repo.repo_git == repo_git).one() - - #set status in database to collecting - repoStatus = repo.collection_status[0] - #repoStatus.core_task_id = task_id - repoStatus.secondary_task_id = task_id - repoStatus.secondary_status = CollectionState.COLLECTING.value - session.commit() + session.logger.info(f"Starting facade collection on {len(repo_git_identifiers)} repos") + + session.logger.info(f"Facade collection starting for: {tuple(repo_git_identifiers)}") + + facade_augur_collection = AugurTaskRoutine(session,repos=repo_git_identifiers,collection_phases=facade_enabled_phases) + + #Start data collection and update the collectionStatus with the task_ids + for repo_git, task_id in facade_augur_collection.start_data_collection(): + + repo = session.query(Repo).filter(Repo.repo_git == repo_git).one() + + #set status in database to collecting + repoStatus = repo.collection_status[0] + repoStatus.facade_task_id = task_id + session.commit() + +@celery.task +def augur_collection_monitor(): + + from augur.tasks.init.celery_app import engine + + logger = logging.getLogger(augur_collection_monitor.__name__) + + logger.info("Checking for repos to collect") + + with DatabaseSession(logger, engine) as session: + + start_primary_collection(session, max_repo=50, days=30) + + start_secondary_collection(session, max_repo=30, days=30) + + start_facade_collection(session, max_repo=30, days=30) diff --git a/augur/util/repo_load_controller.py b/augur/util/repo_load_controller.py index 934e435a70..0ded0268c4 100644 --- a/augur/util/repo_load_controller.py +++ b/augur/util/repo_load_controller.py @@ -233,7 +233,6 @@ def generate_repo_query(self, source, count, **kwargs): select = """ DISTINCT(augur_data.repo.repo_id), augur_data.repo.description, augur_data.repo.repo_git AS url, - augur_data.repo.repo_status, a.commits_all_time, b.issues_all_time, rg_name, diff --git a/tests/test_applicaton/test_repo_load_controller/helper.py b/tests/test_applicaton/test_repo_load_controller/helper.py index 640819eb2c..11ac16640f 100644 --- a/tests/test_applicaton/test_repo_load_controller/helper.py +++ b/tests/test_applicaton/test_repo_load_controller/helper.py @@ -91,9 +91,9 @@ def add_keys_to_test_db(test_db_engine): ######## Helper Functions to get insert statements ################# -def get_repo_insert_statement(repo_id, rg_id, repo_url="place holder url", repo_status="New"): +def get_repo_insert_statement(repo_id, rg_id, repo_url="place holder url"): - return """INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_added", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "repo_archived_date_collected", "repo_archived", "tool_source", "tool_version", "data_source", "data_collection_date") VALUES ({}, {}, '{}', NULL, NULL, '2022-08-15 21:08:07', '{}', '', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLI', '1.0', 'Git', '2022-08-15 21:08:07');""".format(repo_id, rg_id, repo_url, repo_status) + return """INSERT INTO "augur_data"."repo" ("repo_id", 
"repo_group_id", "repo_git", "repo_path", "repo_name", "repo_added", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "repo_archived_date_collected", "repo_archived", "tool_source", "tool_version", "data_source", "data_collection_date") VALUES ({}, {}, '{}', NULL, NULL, '2022-08-15 21:08:07', '', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLI', '1.0', 'Git', '2022-08-15 21:08:07');""".format(repo_id, rg_id, repo_url) def get_user_repo_insert_statement(repo_id, group_id): diff --git a/tests/test_applicaton/test_repo_load_controller/util.py b/tests/test_applicaton/test_repo_load_controller/util.py index b77cdb8bfe..1283e7580e 100644 --- a/tests/test_applicaton/test_repo_load_controller/util.py +++ b/tests/test_applicaton/test_repo_load_controller/util.py @@ -76,9 +76,9 @@ def add_keys_to_test_db(test_db_engine): ######## Helper Functions to get insert statements ################# -def get_repo_insert_statement(repo_id, rg_id, repo_url="place holder url", repo_status="New"): +def get_repo_insert_statement(repo_id, rg_id, repo_url="place holder url"): - return """INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_added", "repo_status", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "repo_archived_date_collected", "repo_archived", "tool_source", "tool_version", "data_source", "data_collection_date") VALUES ({}, {}, '{}', NULL, NULL, '2022-08-15 21:08:07', '{}', '', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLI', '1.0', 'Git', '2022-08-15 21:08:07');""".format(repo_id, rg_id, repo_url, repo_status) + return """INSERT INTO "augur_data"."repo" ("repo_id", "repo_group_id", "repo_git", "repo_path", "repo_name", "repo_added", "repo_type", "url", "owner_id", "description", "primary_language", "created_at", "forked_from", "updated_at", "repo_archived_date_collected", "repo_archived", "tool_source", "tool_version", "data_source", "data_collection_date") VALUES ({}, {}, '{}', NULL, NULL, '2022-08-15 21:08:07', '', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLI', '1.0', 'Git', '2022-08-15 21:08:07');""".format(repo_id, rg_id, repo_url) def get_repo_group_insert_statement(rg_id):