
Mark 404'd Repos to be Ignored and Re-collect Errored Repos #2678

Merged · 15 commits merged on Feb 13, 2024
3 changes: 2 additions & 1 deletion augur/application/cli/backend.py
@@ -19,7 +19,8 @@
from datetime import datetime

from augur import instance_id
from augur.tasks.start_tasks import augur_collection_monitor, CollectionState, create_collection_status_records
from augur.tasks.util.collection_state import CollectionState
from augur.tasks.start_tasks import augur_collection_monitor, create_collection_status_records
from augur.tasks.git.facade_tasks import clone_repos
from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model
from augur.tasks.init.redis_connection import redis_connection
@@ -38,7 +39,7 @@


@click.group('server', short_help='Commands for controlling the backend API server & data collection workers')
def cli():

[pylint] augur/application/cli/backend.py:42 C0116: Missing function or method docstring (missing-function-docstring)
pass

@cli.command("start")
@@ -47,7 +48,7 @@
@click.option('--port')
@test_connection
@test_db_connection
def start(disable_collection, development, port):

[pylint] augur/application/cli/backend.py:51 R0912: Too many branches (15/12) (too-many-branches)
[pylint] augur/application/cli/backend.py:51 R0915: Too many statements (64/50) (too-many-statements)
"""Start Augur's backend server."""

try:
@@ -79,13 +80,13 @@
worker_vmem_cap = config.get_value("Celery", 'worker_process_vmem_cap')

gunicorn_command = f"gunicorn -c {gunicorn_location} -b {host}:{port} augur.api.server:app --log-file gunicorn.log"
server = subprocess.Popen(gunicorn_command.split(" "))

[pylint] augur/application/cli/backend.py:83 R1732: Consider using 'with' for resource-allocating operations (consider-using-with)

time.sleep(3)
logger.info('Gunicorn webserver started...')
logger.info(f'Augur is running at: {"http" if development else "https"}://{host}:{port}')

processes = start_celery_worker_processes(float(worker_vmem_cap), disable_collection)

[pylint] augur/application/cli/backend.py:89 W0621: Redefining name 'processes' from outer scope (line 367) (redefined-outer-name)

if os.path.exists("celerybeat-schedule.db"):
logger.info("Deleting old task schedule")
@@ -96,7 +97,7 @@
log_level = config.get_value("Logging", "log_level")
celery_beat_process = None
celery_command = f"celery -A augur.tasks.init.celery_app.celery_app beat -l {log_level.lower()}"
celery_beat_process = subprocess.Popen(celery_command.split(" "))

[pylint] augur/application/cli/backend.py:100 R1732: Consider using 'with' for resource-allocating operations (consider-using-with)
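Pylint's R1732 here flags the bare `Popen` calls. A minimal sketch of the pattern it suggests, with a stand-in command rather than the real gunicorn/celery invocations:

```python
import subprocess
import sys

# Stand-in command: run the current interpreter and print a marker.
cmd = [sys.executable, "-c", "print('ok')"]

# The context manager waits for the process and closes its pipes on exit,
# which is what R1732 is asking for.
with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
    output = proc.stdout.read().decode().strip()
```

In `backend.py` itself the warning is arguably noise: the gunicorn and celery processes are intentionally long-lived and are torn down later by the stop/shutdown paths, so wrapping them in `with` at the call site would block the CLI.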

if not disable_collection:

Expand Down Expand Up @@ -142,7 +143,7 @@
except RedisConnectionError:
pass

def start_celery_worker_processes(vmem_cap_ratio, disable_collection=False):

[pylint] augur/application/cli/backend.py:146 C0116: Missing function or method docstring (missing-function-docstring)

#Calculate process scaling based on how much memory is available on the system in bytes.
#Each celery process takes ~500MB or 500 * 1024^2 bytes
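The scaling rule those comments describe can be sketched as a pure function (the name and signature here are mine, not Augur's):

```python
def max_worker_estimate(available_bytes: int, vmem_cap_ratio: float) -> int:
    """Estimate how many ~500MB celery worker processes fit under the cap."""
    process_memory = 500 * 1024 ** 2  # ~500MB per worker process, in bytes
    return int((available_bytes * vmem_cap_ratio) // process_memory)

# e.g. 8 GiB of system memory with a 0.5 worker_process_vmem_cap
estimate = max_worker_estimate(8 * 1024 ** 3, 0.5)
```

With those numbers the cap allows 8 workers; the code below then subtracts the reserved frontend and scheduling processes from the estimate.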
@@ -161,7 +162,7 @@

frontend_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=1 -n frontend:{uuid.uuid4().hex}@%h -Q frontend"
max_process_estimate -= 1
process_list.append(subprocess.Popen(frontend_worker.split(" ")))

[pylint] augur/application/cli/backend.py:165 R1732: Consider using 'with' for resource-allocating operations (consider-using-with)
sleep_time += 6

if not disable_collection:
@@ -169,7 +170,7 @@
#2 processes are always reserved as a baseline.
scheduling_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=2 -n scheduling:{uuid.uuid4().hex}@%h -Q scheduling"
max_process_estimate -= 2
process_list.append(subprocess.Popen(scheduling_worker.split(" ")))

[pylint] augur/application/cli/backend.py:173 R1732: Consider using 'with' for resource-allocating operations (consider-using-with)
sleep_time += 6

#60% of estimate, Maximum value of 45
@@ -307,7 +308,7 @@
SET facade_status='Pending', facade_task_id=NULL
WHERE facade_status='Failed Clone' OR facade_status='Initializing';
"""))
#TODO: write timestamp for currently running repos.

[pylint] augur/application/cli/backend.py:311 W0511: TODO: write timestamp for currently running repos. (fixme)

def assign_orphan_repos_to_default_user(session):
query = s.sql.text("""
3 changes: 2 additions & 1 deletion augur/tasks/git/facade_tasks.py
@@ -31,7 +31,8 @@
from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import get_facade_weight_time_factor, get_repo_commit_count, update_facade_scheduling_fields, get_facade_weight_with_commit_count, facade_bulk_insert_commits

from augur.tasks.github.facade_github.tasks import *
from augur.tasks.util.collection_util import CollectionState, get_collection_status_repo_git_from_filter
from augur.tasks.util.collection_state import CollectionState
from augur.tasks.util.collection_util import get_collection_status_repo_git_from_filter
from augur.tasks.git.util.facade_worker.facade_worker.repofetch import GitCloneError, git_repo_initialize


33 changes: 18 additions & 15 deletions augur/tasks/github/detect_move/core.py
@@ -6,20 +6,24 @@
from augur.tasks.github.util.util import parse_json_response
import logging
from datetime import datetime
from enum import Enum
from augur.tasks.util.collection_state import CollectionState
from augur.application.db.util import execute_session_query

class CollectionState(Enum):
SUCCESS = "Success"
PENDING = "Pending"
ERROR = "Error"
COLLECTING = "Collecting"


def update_repo_with_dict(current_dict,new_dict,logger,db):

def update_repo_with_dict(repo,new_dict,logger,db):
"""
Update a repository record in the database using a dictionary tagged with
the appropriate table fields

Args:
repo: orm repo object to update
new_dict: dict of new values to add to the repo record
logger: logging object
db: db object
"""

to_insert = current_dict
to_insert = repo.__dict__
del to_insert['_sa_instance_state']
to_insert.update(new_dict)
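The merge in `update_repo_with_dict` can be exercised in isolation; a small sketch with a fake ORM object (`FakeRepo` is hypothetical, and the dict is copied here so the sketch does not mutate the object's `__dict__` the way the in-place version does):

```python
class FakeRepo:
    """Stand-in for the SQLAlchemy Repo model."""
    def __init__(self):
        self.repo_id = 1
        self.repo_name = "old-name"
        # SQLAlchemy attaches this bookkeeping attribute to mapped objects
        self._sa_instance_state = object()

repo = FakeRepo()
to_insert = dict(repo.__dict__)              # copy of the column values
del to_insert["_sa_instance_state"]          # drop SQLAlchemy internals
to_insert.update({"repo_name": "new-name"})  # overlay the new values
```

The result is a plain dict of column values suitable for an upsert, with the moved repo's new fields layered over the existing record.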

@@ -45,7 +49,6 @@ def ping_github_for_repo_move(augur_db, key_auth, repo, logger,collection_hook='

owner, name = get_owner_repo(repo.repo_git)
url = f"https://api.github.com/repos/{owner}/{name}"
current_repo_dict = repo.__dict__

attempts = 0
while attempts < 10:
@@ -68,7 +71,7 @@ def ping_github_for_repo_move(augur_db, key_auth, repo, logger,collection_hook='
'data_collection_date': datetime.today().strftime('%Y-%m-%dT%H:%M:%SZ')
}

update_repo_with_dict(current_repo_dict, repo_update_dict, logger, augur_db)
update_repo_with_dict(repo, repo_update_dict, logger, augur_db)

raise Exception(f"ERROR: Repo not found at requested host {repo.repo_git}")
elif attempts >= 10:
@@ -98,22 +101,22 @@ def ping_github_for_repo_move(augur_db, key_auth, repo, logger,collection_hook='
'description': f"(Originally hosted at {url}) {old_description}"
}

update_repo_with_dict(current_repo_dict, repo_update_dict, logger,augur_db)
update_repo_with_dict(repo, repo_update_dict, logger,augur_db)

statusQuery = augur_db.session.query(CollectionStatus).filter(CollectionStatus.repo_id == repo.repo_id)

collectionRecord = execute_session_query(statusQuery,'one')
if collection_hook == 'core':
collectionRecord.core_status = CollectionState.PENDING.value
collectionRecord.core_status = CollectionState.IGNORE.value
collectionRecord.core_task_id = None
collectionRecord.core_data_last_collected = datetime.today().strftime('%Y-%m-%dT%H:%M:%SZ')
elif collection_hook == 'secondary':
collectionRecord.secondary_status = CollectionState.PENDING.value
collectionRecord.secondary_status = CollectionState.IGNORE.value
collectionRecord.secondary_task_id = None
collectionRecord.secondary_data_last_collected = datetime.today().strftime('%Y-%m-%dT%H:%M:%SZ')

augur_db.session.commit()

raise Exception("ERROR: Repo has moved! Marked repo as pending and stopped collection")
raise Exception("ERROR: Repo has moved! Marked repo as IGNORE and stopped collection!")


21 changes: 8 additions & 13 deletions augur/tasks/init/celery_app.py
@@ -20,16 +20,7 @@
from augur.application.db.engine import get_database_string
from augur.tasks.init import get_redis_conn_values, get_rabbitmq_conn_string
from augur.application.db.models import CollectionStatus, Repo

class CollectionState(Enum):
SUCCESS = "Success"
PENDING = "Pending"
ERROR = "Error"
COLLECTING = "Collecting"
INITIALIZING = "Initializing"
UPDATE = "Update"
FAILED_CLONE = "Failed Clone"

from augur.tasks.util.collection_state import CollectionState

logger = logging.getLogger(__name__)

@@ -85,7 +76,7 @@ class CollectionState(Enum):
#Classes for tasks that take a repo_git as an argument.
class AugurCoreRepoCollectionTask(celery.Task):

def augur_handle_task_failure(self,exc,task_id,repo_git,logger_name,collection_hook='core'):
def augur_handle_task_failure(self,exc,task_id,repo_git,logger_name,collection_hook='core',after_fail=CollectionState.ERROR.value):
from augur.tasks.init.celery_app import engine

logger = AugurLogger(logger_name).get_logger()
@@ -104,7 +95,7 @@ def augur_handle_task_failure(self,exc,task_id,repo_git,logger_name,collection_h
prevStatus = getattr(repoStatus, f"{collection_hook}_status")

if prevStatus == CollectionState.COLLECTING.value or prevStatus == CollectionState.INITIALIZING.value:
[pylint] R1714: Consider merging these comparisons with 'in' by using 'prevStatus in (CollectionState.COLLECTING.value, CollectionState.INITIALIZING.value)'. Use a set instead if elements are hashable. (consider-using-in)
setattr(repoStatus, f"{collection_hook}_status", CollectionState.ERROR.value)
setattr(repoStatus, f"{collection_hook}_status", after_fail)
setattr(repoStatus, f"{collection_hook}_task_id", None)
session.commit()

@@ -129,6 +120,7 @@ def on_failure(self,exc,task_id,args,kwargs,einfo):
repo_git = args[0]
self.augur_handle_task_failure(exc,task_id,repo_git, "ml_task_failure", collection_hook='ml')


#task_cls='augur.tasks.init.celery_app:AugurCoreRepoCollectionTask'
celery_app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL, include=tasks)

@@ -209,7 +201,7 @@ def setup_periodic_tasks(sender, **kwargs):
"""
from celery.schedules import crontab
from augur.tasks.start_tasks import augur_collection_monitor, augur_collection_update_weights
[pylint] C0415: Import outside toplevel (augur.tasks.start_tasks.augur_collection_monitor, augur.tasks.start_tasks.augur_collection_update_weights) (import-outside-toplevel)
from augur.tasks.start_tasks import non_repo_domain_tasks
from augur.tasks.start_tasks import non_repo_domain_tasks, retry_errored_repos
[pylint] C0415: Import outside toplevel (augur.tasks.start_tasks.non_repo_domain_tasks, augur.tasks.start_tasks.retry_errored_repos) (import-outside-toplevel)
from augur.tasks.git.facade_tasks import clone_repos
from augur.tasks.db.refresh_materialized_views import refresh_materialized_views
from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model
@@ -234,6 +226,9 @@ def setup_periodic_tasks(sender, **kwargs):
logger.info(f"Scheduling update of collection weights on midnight each day")
[pylint] W1309: Using an f-string that does not have any interpolated variables (f-string-without-interpolation)
sender.add_periodic_task(crontab(hour=0, minute=0),augur_collection_update_weights.s())

logger.info(f"Setting 404 repos to be marked for retry on midnight each day")
[pylint] W1309: Using an f-string that does not have any interpolated variables (f-string-without-interpolation)
sender.add_periodic_task(crontab(hour=0, minute=0),retry_errored_repos.s())

logger.info(f"Scheduling contributor breadth every 30 days")
[pylint] W1309: Using an f-string that does not have any interpolated variables (f-string-without-interpolation)
thirty_days_in_seconds = 30*24*60*60
sender.add_periodic_task(thirty_days_in_seconds, contributor_breadth_model.s())
34 changes: 33 additions & 1 deletion augur/tasks/start_tasks.py
@@ -33,9 +33,9 @@
from augur.tasks.init.celery_app import celery_app as celery
[pylint] C0413: Import "from augur.tasks.init.celery_app import celery_app as celery" should be placed at the top of the module (wrong-import-position)
from augur.application.db.session import DatabaseSession
[pylint] C0413: Import "from augur.application.db.session import DatabaseSession" should be placed at the top of the module (wrong-import-position)
from logging import Logger
[pylint] C0413: Import "from logging import Logger" should be placed at the top of the module (wrong-import-position)
from enum import Enum
from augur.tasks.util.redis_list import RedisList
[pylint] C0413: Import "from augur.tasks.util.redis_list import RedisList" should be placed at the top of the module (wrong-import-position)
from augur.application.db.models import CollectionStatus, Repo
[pylint] C0413: Import "from augur.application.db.models import CollectionStatus, Repo" should be placed at the top of the module (wrong-import-position)
from augur.tasks.util.collection_state import CollectionState
[pylint] C0413: Import "from augur.tasks.util.collection_state import CollectionState" should be placed at the top of the module (wrong-import-position)
from augur.tasks.util.collection_util import *
[pylint] W0401: Wildcard import augur.tasks.util.collection_util (wildcard-import)
[pylint] C0413: Import "from augur.tasks.util.collection_util import *" should be placed at the top of the module (wrong-import-position)
from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import get_facade_weight_time_factor
[pylint] C0413: Import "from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import get_facade_weight_time_factor" should be placed at the top of the module (wrong-import-position)

@@ -328,9 +328,41 @@ def augur_collection_update_weights():
session.commit()
#git_update_commit_count_weight(repo_git)

@celery.task
def retry_errored_repos():
[pylint] C0116: Missing function or method docstring (missing-function-docstring)
"""
Periodic task to reset repositories that have errored and try again.
"""
from augur.tasks.init.celery_app import engine
logger = logging.getLogger(retry_errored_repos.__name__)

#TODO: Isaac needs to normalize the statuses to be abstract in the
[pylint] W0511: TODO: Isaac needs to normalize the status's to be abstract in the (fixme)
#collection_status table once augur dev is less unstable.
with DatabaseSession(logger,engine) as session:
query = s.sql.text(f"""UPDATE repo SET secondary_status = '{CollectionState.PENDING.value}'"""
f""" WHERE secondary_status = '{CollectionState.ERROR.value}' ;"""
f"""UPDATE repo SET core_status = '{CollectionState.PENDING.value}'"""
f""" WHERE core_status = '{CollectionState.ERROR.value}' ;"""
f"""UPDATE repo SET facade_status = '{CollectionState.PENDING.value}'"""
f""" WHERE facade_status = '{CollectionState.ERROR.value}' ;"""
f"""UPDATE repo SET ml_status = '{CollectionState.PENDING.value}'"""
f""" WHERE ml_status = '{CollectionState.ERROR.value}' ;"""
)

session.execute_sql(query)
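Since the enum values are strings, they must be single-quoted inside the SQL. A sketch that builds the four reset statements (the hook names and `repo` table/column names are taken from the diff above) makes the quoting easy to check in isolation:

```python
from enum import Enum

class CollectionState(Enum):
    PENDING = "Pending"
    ERROR = "Error"

# One reset statement per collection hook, with the enum's .value
# strings quoted as SQL string literals.
hooks = ("core", "secondary", "facade", "ml")
statements = [
    f"UPDATE repo SET {hook}_status = '{CollectionState.PENDING.value}'"
    f" WHERE {hook}_status = '{CollectionState.ERROR.value}';"
    for hook in hooks
]
```

An unquoted interpolation like `= {CollectionState.PENDING.value}` would expand to the bare word `Pending` and fail to parse as SQL.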



#Retry this task for every issue so that repos that were added manually get the chance to be added to the collection_status table.
@celery.task(autoretry_for=(Exception,), retry_backoff=True, retry_backoff_max=300, retry_jitter=True, max_retries=None)
def create_collection_status_records():
[pylint] C0116: Missing function or method docstring (missing-function-docstring)

"""
Automatic task that runs and checks for repos that haven't been given a collection_status
record corresponding to the state of their collection at the moment.

A special celery task that automatically retries itself and has no max retries.
"""

from augur.tasks.init.celery_app import engine
[pylint] C0415: Import outside toplevel (augur.tasks.init.celery_app.engine) (import-outside-toplevel)
logger = logging.getLogger(create_collection_status_records.__name__)

13 changes: 13 additions & 0 deletions augur/tasks/util/collection_state.py
@@ -0,0 +1,13 @@

[pylint] C0114: Missing module docstring (missing-module-docstring)
from enum import Enum

class CollectionState(Enum):
SUCCESS = "Success"
PENDING = "Pending"
ERROR = "Error"
COLLECTING = "Collecting"
INITIALIZING = "Initializing"
UPDATE = "Update"
FAILED_CLONE = "Failed Clone"
STANDBY = "Standby"
IGNORE = "Ignore"
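A small usage sketch of the shared enum (states copied from the new file above): the `.value` strings are what get stored in the `collection_status` columns, so code comparing against database rows goes through `.value`, and a raw status string can be mapped back to a member by value lookup.

```python
from enum import Enum

class CollectionState(Enum):
    SUCCESS = "Success"
    PENDING = "Pending"
    ERROR = "Error"
    COLLECTING = "Collecting"
    INITIALIZING = "Initializing"
    UPDATE = "Update"
    FAILED_CLONE = "Failed Clone"
    STANDBY = "Standby"
    IGNORE = "Ignore"

# A status string as it would be read back from the collection_status table:
status_from_db = "Ignore"
should_skip = status_from_db == CollectionState.IGNORE.value
# Enum lookup by value round-trips the stored string:
state = CollectionState(status_from_db)
```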
11 changes: 1 addition & 10 deletions augur/tasks/util/collection_util.py
@@ -24,18 +24,9 @@
from augur.tasks.github.util.github_task_session import GithubTaskManifest
from augur.application.db.session import DatabaseSession
from augur.tasks.util.worker_util import calculate_date_weight_from_timestamps
from augur.tasks.util.collection_state import CollectionState


# class syntax
class CollectionState(Enum):
SUCCESS = "Success"
PENDING = "Pending"
ERROR = "Error"
COLLECTING = "Collecting"
INITIALIZING = "Initializing"
UPDATE = "Update"
FAILED_CLONE = "Failed Clone"

def get_list_of_all_users(session):
[pylint] C0116: Missing function or method docstring (missing-function-docstring)

#Get a list of all users.
query = s.sql.text("""