Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch Augur-new's task scheduling mechanism #1975

Merged
merged 2 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions augur/tasks/github/events/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@


@celery.task
def collect_events(repo_id: int):
def collect_events(repo_git: str):

logger = logging.getLogger(collect_events.__name__)

# define GithubTaskSession to handle insertions, and store oauth keys
with GithubTaskSession(logger) as session:

repo_obj = session.query(Repo).filter(Repo.repo_id == repo_id).one()
repo_obj = session.query(Repo).filter(Repo.repo_git == repo_git).one()
repo_id = repo_obj.repo_id
repo_git = repo_obj.repo_git

owner, repo = get_owner_repo(repo_git)

logger.info(f"Collecting Github events for {owner}/{repo}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def request_dict_from_endpoint(session, url, timeout_wait=10):
err = process_dict_response(session.logger,response,response_data)

#If we get an error message that's not None
if err:
if err and err != GithubApiResult.NEW_RESULT:
continue

success = True
Expand Down
8 changes: 4 additions & 4 deletions augur/tasks/github/issues/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@


@celery.task
def collect_issues(repo_id: int) -> None:
def collect_issues(repo_git: str) -> None:

logger = logging.getLogger(collect_issues.__name__)
owner, repo = get_owner_repo(repo_git)

# define GithubTaskSession to handle insertions, and store oauth keys
with GithubTaskSession(logger) as session:

repo_obj = session.query(Repo).filter(Repo.repo_id == repo_id).one()
repo_obj = session.query(Repo).filter(Repo.repo_git == repo_git).one()
repo_id = repo_obj.repo_id
repo_git = repo_obj.repo_git
owner, repo = get_owner_repo(repo_git)


issue_data = retrieve_all_issue_data(repo_git, logger)

Expand Down
6 changes: 3 additions & 3 deletions augur/tasks/github/messages/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@


@celery.task
def collect_github_messages(repo_id: int) -> None:
def collect_github_messages(repo_git: str) -> None:

logger = logging.getLogger(collect_github_messages.__name__)

with GithubTaskSession(logger, engine) as session:

repo_git = session.query(Repo).filter(
Repo.repo_id == repo_id).one().repo_git
repo_id = session.query(Repo).filter(
Repo.repo_git == repo_git).one().repo_id

owner, repo = get_owner_repo(repo_git)
message_data = retrieve_all_pr_and_issue_messages(repo_git, logger)
Expand Down
6 changes: 3 additions & 3 deletions augur/tasks/github/pull_requests/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@


@celery.task
def collect_pull_requests(repo_id: int) -> None:
def collect_pull_requests(repo_git: str) -> None:

logger = logging.getLogger(collect_pull_requests.__name__)

with GithubTaskSession(logger, engine) as session:

repo_git = session.query(Repo).filter(
Repo.repo_id == repo_id).one().repo_git
repo_id = session.query(Repo).filter(
Repo.repo_git == repo_git).one().repo_id

owner, repo = get_owner_repo(repo_git)
pr_data = retrieve_all_pr_data(repo_git, logger)
Expand Down
5 changes: 2 additions & 3 deletions augur/tasks/github/util/github_paginator.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ def hit_api(key_manager, url: str, logger: logging.Logger, timeout: float = 10,
logger.info(f"Request timed out. Sleeping {round(timeout)} seconds and trying again...\n")
time.sleep(round(timeout))
return None
except httpx.ReadError:
logger.info(f"Request timed out. Sleeping {round(timeout)} seconds and trying again...\n")
logger.debug("Read error timeout")
except httpx.NetworkError:
logger.info(f"Network Error. Sleeping {round(timeout)} seconds and trying again...\n")
time.sleep(round(timeout))
return None

Expand Down
141 changes: 33 additions & 108 deletions augur/tasks/start_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from augur.tasks.git.facade_tasks import *
# from augur.tasks.data_analysis import *
from augur.tasks.init.celery_app import celery_app as celery
from celery.result import allow_join_result
from augur.application.logs import AugurLogger
from augur.application.db.session import DatabaseSession
from augur.tasks.init.celery_app import engine
Expand All @@ -26,13 +27,31 @@

#Predefine phases. For new phases edit this and the config to reflect.
#The domain of tasks ran should be very explicit.
class AugurTaskPhase(Enum):
"""All the differant phases of collection augur goes through in sequence"""
def prelim_phase(logger):
    """Build the celery group of preliminary tasks run before repo collection.

    :param logger: logger passed in by the task routine (unused here, kept
        for a uniform phase-function signature)
    :return: celery group containing the preliminary tasks
    """
    # Currently the only preliminary task is detecting repos that have
    # moved on GitHub.
    return group(detect_github_repo_move.si())

def repo_collect_phase(logger):
    """Build the celery canvas for the per-repo collection phase.

    For every repo in the database a chain is created: issues and pull
    requests are collected first, then events and messages second
    (presumably because events/messages reference the issue/PR rows —
    confirm against the task implementations). Contributor processing runs
    after all repo chains complete; facade commits, releases and repo-info
    collection run in parallel with the repo chains.

    :param logger: logger used for the database session
    :return: celery group representing the whole phase
    """
    # Store all tasks that take a repo as an argument.
    tasks_with_repo_domain = []

    # A chain is needed for each repo.
    with DatabaseSession(logger) as session:
        repos = session.query(Repo).all()

        for repo in repos:
            first_tasks_repo = group(collect_issues.si(repo.repo_git), collect_pull_requests.si(repo.repo_git))
            second_tasks_repo = group(collect_events.si(repo.repo_git), collect_github_messages.si(repo.repo_git))

            repo_chain = chain(first_tasks_repo, second_tasks_repo)
            tasks_with_repo_domain.append(repo_chain)

    return group(
        chain(group(*tasks_with_repo_domain), process_contributors.si()),
        facade_commits_model.si(),
        collect_releases.si(),
        collect_repo_info.si()
    )


class AugurTaskRoutine:
Expand All @@ -53,30 +72,9 @@ def __init__(self,disabled_collection_phases: List[str]=[], disabled_collection_

#Assemble default phases
#These will then be able to be overridden through the config.
preliminary_task_list = [detect_github_repo_move.si()]

preliminary_tasks = group(preliminary_task_list)
self.jobs_dict[AugurTaskPhase.PRELIMINARY] = preliminary_tasks

#store all tasks that taks a repo as an argument
tasks_with_repo_domain = []
#A chain is needed for each repo.
with DatabaseSession(self.logger) as session:
repos = session.query(Repo).all()

for repo in repos:
first_tasks_repo = group(collect_issues.si(repo.repo_id),collect_pull_requests.si(repo.repo_id))
second_tasks_repo = group(collect_events.si(repo.repo_id),collect_github_messages.si(repo.repo_id))

repo_chain = chain(first_tasks_repo,second_tasks_repo)
tasks_with_repo_domain.append(repo_chain)
self.jobs_dict[prelim_phase.__name__] = prelim_phase

self.jobs_dict[AugurTaskPhase.REPO_COLLECT] = group(
chain(group(*tasks_with_repo_domain),process_contributors.si()),
facade_commits_model.si(),
collect_releases.si(),
collect_repo_info.si()
)
self.jobs_dict[repo_collect_phase.__name__] = repo_collect_phase



Expand All @@ -89,10 +87,6 @@ def __getitem__(self,key: str) -> dict:
def __setitem__(self,key: str,newJobs):
"""Create a new collection job group with the name of the key specified.
"""
if not hasattr(newJobs, 'apply_async') or not callable(newJobs.apply_async):
self.logger.error("Collection groups must be of celery types that can be called with \'apply_async\'")
raise AttributeError

if key in self.disabled_collection_phases:
self.logger.error("Group has been disabled")
return
Expand All @@ -114,10 +108,12 @@ def start_data_collection(self):
self.logger.info(f"Enabled phases: {self.jobs_dict.keys()}")
augur_collection_list = []
for phaseName, job in self.jobs_dict.items():
augur_collection_list.append(job)

augur_collection = chain(*augur_collection_list)
augur_collection.apply_async()
self.logger.info(f"Starting phase {phaseName}")
#Call the function stored in the dict to return the object to call apply_async on
phaseResult = job(self.logger).apply_async()
with allow_join_result():
phaseResult.join()
#self.logger.info(f"Result of {phaseName} phase: {phaseResult.status}")


@celery.task
Expand All @@ -129,76 +125,5 @@ def start_task():

default_augur_collection.start_data_collection()

"""
logger.info(f"Collecting data for git and github...")

with DatabaseSession(logger) as session:

repos = session.query(Repo).all()

#task_list = []
augur_main_routine = AugurTaskRoutine()

augur_main_routine['facade'] = facade_commits_model.si()

issues_and_pr_list = [collect_issues.si(repo.repo_git) for repo in repos]
issues_and_pr_list.extend([collect_pull_requests.si(repo.repo_git) for repo in repos])

augur_main_routine['collect_issues_and_pull_requests'] = group(issues_and_pr_list)

augur_main_routine['collect_events'] = group([collect_events.si(repo.repo_git) for repo in repos])
augur_main_routine['collect_issue_and_pr_comments'] = group([collect_issue_and_pr_comments.si(repo.repo_git) for repo in repos])

augur_main_routine['process_contributors'] = process_contributors.si()


augur_main_routine.add_dependency_relationship(job='collect_events',depends_on='collect_issues_and_pull_requests')
augur_main_routine.add_dependency_relationship(job='collect_issue_and_pr_comments',depends_on='collect_issues_and_pull_requests')
augur_main_routine.add_dependency_relationship(job='process_contributors',depends_on='collect_events')
augur_main_routine.add_dependency_relationship(job='process_contributors',depends_on='collect_issue_and_pr_comments')

augur_main_routine.logger.info(augur_main_routine.dependency_relationships)
augur_main_routine.start_data_collection()
augur_main_routine.logger.info(augur_main_routine.dependency_relationships)
print('no cycle!')

# routine = AugurTaskRoutine()
# routine['start'] = chain(start_tasks_group,secondary_task_group)
# routine.start_data_collection()
"""

def create_github_task_chain(repo_git):
    """Build a celery chain of GitHub collection tasks for a single repo.

    Pull requests and issues are collected in the first group; events and
    messages are collected in the second group, after the first completes.

    :param repo_git: String, the git url of the repository
    :return: celery chain of the two task groups
    """
    primary_group = group(
        collect_pull_requests.si(repo_git),
        collect_issues.si(repo_git),
    )

    secondary_group = group(
        collect_events.si(repo_git),
        collect_github_messages.si(repo_git),
    )

    return chain(primary_group, secondary_group)


def get_owner_repo(git_url):
    """Gets the owner and repository names of a repository from a git url

    Handles an optional trailing slash and the conventional ``.git`` clone
    suffix, e.g. ``https://github.com/chaoss/augur.git`` ->
    ``("chaoss", "augur")``.

    :param git_url: String, the git url of a repository
    :return: Tuple, includes the owner and repository names in that order
    """
    # Drop any trailing slash first so the repo name is never an empty string.
    split = git_url.rstrip('/').split('/')

    owner = split[-2]
    repo = split[-1]

    # Strip the ".git" suffix if present.
    if repo.endswith('.git'):
        repo = repo[:-4]

    return owner, repo