#SPDX-License-Identifier: MIT
import logging

from celery import group, chain
import sqlalchemy as s

from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import trim_commits
from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import get_absolute_repo_path, get_parent_commits_set, get_existing_commits_set
from augur.tasks.git.util.facade_worker.facade_worker.analyzecommit import analyze_commit
from augur.tasks.git.util.facade_worker.facade_worker.utilitymethods import get_repo_commit_count, update_facade_scheduling_fields, get_facade_weight_with_commit_count, facade_bulk_insert_commits
from augur.tasks.git.util.facade_worker.facade_worker.rebuildcache import fill_empty_affiliations, invalidate_caches, nuke_affiliations, rebuild_unknown_affiliation_and_web_caches
from augur.tasks.git.util.facade_worker.facade_worker.postanalysiscleanup import git_repo_cleanup
from augur.tasks.github.facade_github.tasks import *
from augur.tasks.util.collection_state import CollectionState
from augur.tasks.util.collection_util import get_collection_status_repo_git_from_filter
from augur.tasks.git.util.facade_worker.facade_worker.repofetch import GitCloneError, git_repo_initialize, git_repo_updates
from augur.tasks.init.celery_app import celery_app as celery
from augur.tasks.init.celery_app import AugurFacadeRepoCollectionTask
from augur.application.db.models import Repo, CollectionStatus
from augur.tasks.git.dependency_tasks.tasks import process_dependency_metrics
from augur.tasks.git.dependency_libyear_tasks.tasks import process_libyear_dependency_metrics
from augur.tasks.git.scc_value_tasks.tasks import process_scc_value_metrics
from augur.tasks.github.util.github_task_session import *

# Define an error callback for chains in facade collection, so a failing
# facade task doesn't crash the whole program.
@celery.task
def facade_error_handler(request, exc, traceback):

    logger = logging.getLogger(facade_error_handler.__name__)
    logger.error(f"Task {request.id} raised exception: {exc}! \n {traceback}")

    print(f"chain: {request.chain}")

    # Make sure any further execution of tasks dependent on this one stops.
    try:
        # Replace the tasks queued ahead of this one in the chain with None.
        request.chain = None
    except AttributeError:
        pass # Task is not part of a chain; this is normal, so don't log it.
    except Exception as e:
        logger.error(f"Could not mutate request chain! \n Error: {e}")


# Predefine facade collection with tasks
@celery.task(base=AugurFacadeRepoCollectionTask)
def facade_analysis_init_facade_task(repo_git):

    logger = logging.getLogger(facade_analysis_init_facade_task.__name__)
    with FacadeSession(logger) as session:
        repo = session.query(Repo).filter(Repo.repo_git == repo_git).one()
        repo_id = repo.repo_id

        session.update_status('Running analysis')
        session.log_activity('Info', "Beginning analysis.")


@celery.task(base=AugurFacadeRepoCollectionTask)
def trim_commits_facade_task(repo_git):

    logger = logging.getLogger(trim_commits_facade_task.__name__)

    with FacadeSession(logger) as session:

        repo = session.query(Repo).filter(Repo.repo_git == repo_git).one()
        repo_id = repo.repo_id

        def update_analysis_log(repos_id, status):

            # Log a repo's analysis status
            log_message = s.sql.text("""INSERT INTO analysis_log (repos_id,status)
                VALUES (:repo_id,:status)""").bindparams(repo_id=repos_id, status=status)

            try:
                session.execute_sql(log_message)
            except Exception:
                # Logging is best-effort; never let it abort the analysis.
                pass

        session.inc_repos_processed()
        update_analysis_log(repo_id, "Beginning analysis.")

        # First we check to see if the previous analysis didn't complete
        get_status = s.sql.text("""SELECT working_commit FROM working_commits WHERE repos_id=:repo_id
            """).bindparams(repo_id=repo_id)

        try:
            working_commits = session.fetchall_data_from_sql_text(get_status)
        except Exception:
            working_commits = []

        # If there's a commit still there, the previous run was interrupted and
        # the commit data may be incomplete. It should be trimmed, just in case.
        commits_to_trim = [commit['working_commit'] for commit in working_commits]

        trim_commits(session, repo_id, commits_to_trim)

        # Start the main analysis
        update_analysis_log(repo_id, 'Collecting data')

        logger.info(f"Got past repo {repo_id}")


@celery.task(base=AugurFacadeRepoCollectionTask)
def trim_commits_post_analysis_facade_task(repo_git):

    logger = logging.getLogger(trim_commits_post_analysis_facade_task.__name__)

    with FacadeSession(logger) as session:
        repo = session.query(Repo).filter(Repo.repo_git == repo_git).one()
        repo_id = repo.repo_id

        start_date = session.get_setting('start_date')

        def update_analysis_log(repos_id, status):
            # Log a repo's analysis status
            log_message = s.sql.text("""INSERT INTO analysis_log (repos_id,status)
                VALUES (:repo_id,:status)""").bindparams(repo_id=repos_id, status=status)

            session.execute_sql(log_message)

        session.logger.info(f"Generating sequence for repo {repo_id}")

        query = session.query(Repo).filter(Repo.repo_id == repo_id)
        repo = execute_session_query(query, 'one')

        # Get the huge list of commits to process.
        absolute_path = get_absolute_repo_path(session.repo_base_directory, repo.repo_id, repo.repo_path, repo.repo_name)
        repo_loc = (f"{absolute_path}/.git")

        # Grab the parents of HEAD
        parent_commits = get_parent_commits_set(repo_loc, start_date)

        # Grab the existing commits from the database
        existing_commits = get_existing_commits_set(session, repo_id)

        # Find missing commits and add them
        missing_commits = parent_commits - existing_commits

        session.log_activity('Debug', f"Commits missing from repo {repo_id}: {len(missing_commits)}")

        # Find commits which are out of the analysis range
        trimmed_commits = existing_commits - parent_commits

        update_analysis_log(repo_id, 'Data collection complete')
        update_analysis_log(repo_id, 'Beginning to trim commits')

        session.log_activity('Debug', f"Commits to be trimmed from repo {repo_id}: {len(trimmed_commits)}")

        trim_commits(session, repo_id, trimmed_commits)

        update_analysis_log(repo_id, 'Commit trimming complete')
        update_analysis_log(repo_id, 'Complete')


@celery.task
def facade_analysis_end_facade_task():

    logger = logging.getLogger(facade_analysis_end_facade_task.__name__)
    with FacadeSession(logger) as session:
        session.log_activity('Info', 'Running analysis (complete)')


@celery.task
def facade_start_contrib_analysis_task():

    logger = logging.getLogger(facade_start_contrib_analysis_task.__name__)
    with FacadeSession(logger) as session:
        session.update_status('Updating Contributors')
        session.log_activity('Info', 'Updating Contributors with commits')


# Enable celery multithreading
@celery.task(base=AugurFacadeRepoCollectionTask)
def analyze_commits_in_parallel(repo_git, multithreaded: bool) -> None:
    """Take a large list of commit data to analyze and store in the database.

    Meant to be run in parallel with other instances of this task.
    """

    # Create a new session for the celery worker.
    logger = logging.getLogger(analyze_commits_in_parallel.__name__)
    with FacadeSession(logger) as session:
        repo = session.query(Repo).filter(Repo.repo_git == repo_git).one()
        repo_id = repo.repo_id

        start_date = session.get_setting('start_date')

        session.logger.info(f"Generating sequence for repo {repo_id}")

        query = session.query(Repo).filter(Repo.repo_id == repo_id)
        repo = execute_session_query(query, 'one')

        # Get the huge list of commits to process.
        absolute_path = get_absolute_repo_path(session.repo_base_directory, repo.repo_id, repo.repo_path, repo.repo_name)
        repo_loc = (f"{absolute_path}/.git")

        # Grab the parents of HEAD
        parent_commits = get_parent_commits_set(repo_loc, start_date)

        # Grab the existing commits from the database
        existing_commits = get_existing_commits_set(session, repo_id)

        # Find missing commits and add them
        missing_commits = parent_commits - existing_commits

        session.log_activity('Debug', f"Commits missing from repo {repo_id}: {len(missing_commits)}")

        if not len(missing_commits) or repo_id is None:
            return

        queue = list(missing_commits)

        logger.info("Got to analysis!")

        # Buffer analyzed records and insert them in batches of 1000, so a large
        # queue doesn't turn into one database round trip per commit.
        pendingCommitRecordsToInsert = []

        for count, commitTuple in enumerate(queue):
            quarterQueue = int(len(queue) / 4)

            if quarterQueue == 0:
                quarterQueue = 1 # prevent division by zero with integer math

            # Log progress when another quarter of the queue has been processed
            if (count + 1) % quarterQueue == 0:
                logger.info(f"Progress through current analysis queue is {(count / len(queue)) * 100}%")

            commitRecords = analyze_commit(session, repo_id, repo_loc, commitTuple)
            if len(commitRecords):
                pendingCommitRecordsToInsert.extend(commitRecords)
                if len(pendingCommitRecordsToInsert) >= 1000:
                    facade_bulk_insert_commits(session, pendingCommitRecordsToInsert)
                    pendingCommitRecordsToInsert = []

        # Flush whatever is left in the buffer.
        facade_bulk_insert_commits(session, pendingCommitRecordsToInsert)

        # Remove the working commits now that they have been fully analyzed.
        remove_commit = s.sql.text("""DELETE FROM working_commits
            WHERE repos_id = :repo_id AND working_commit IN :hashes
            """).bindparams(repo_id=repo_id, hashes=tuple(queue))
        session.execute_sql(remove_commit)

        logger.info("Analysis complete")
    return
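
# Note: generate_analysis_sequence below queues this task as
# analyze_commits_in_parallel.si(repo_git, True). As written, the body above
# never reads the multithreaded flag, so it only documents the caller's intent.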


@celery.task
def nuke_affiliations_facade_task():

    logger = logging.getLogger(nuke_affiliations_facade_task.__name__)
    with FacadeSession(logger) as session:
        nuke_affiliations(session)


@celery.task
def fill_empty_affiliations_facade_task():

    logger = logging.getLogger(fill_empty_affiliations_facade_task.__name__)
    with FacadeSession(logger) as session:
        fill_empty_affiliations(session)


@celery.task
def invalidate_caches_facade_task():

    logger = logging.getLogger(invalidate_caches_facade_task.__name__)
    with FacadeSession(logger) as session:
        invalidate_caches(session)


@celery.task
def rebuild_unknown_affiliation_and_web_caches_facade_task():

    logger = logging.getLogger(rebuild_unknown_affiliation_and_web_caches_facade_task.__name__)
    with FacadeSession(logger) as session:
        rebuild_unknown_affiliation_and_web_caches(session)


@celery.task
def git_repo_cleanup_facade_task(repo_git):

    logger = logging.getLogger(git_repo_cleanup_facade_task.__name__)
    with FacadeSession(logger) as session:
        git_repo_cleanup(session, repo_git)


# Retry this task indefinitely if it errors, backing off to at most five minutes
# between attempts. The only thing that schedules this task is the task itself,
# so if it stops running, no more clones will happen until the instance is restarted.
@celery.task(autoretry_for=(Exception,), retry_backoff=True, retry_backoff_max=300, retry_jitter=True, max_retries=None)
def clone_repos():
    logger = logging.getLogger(clone_repos.__name__)

    is_pending = CollectionStatus.facade_status == CollectionState.PENDING.value

    with FacadeSession(logger) as session:
        # Grab every repo that is still pending a clone (the limit is effectively unbounded).
        repo_git_identifiers = get_collection_status_repo_git_from_filter(session, is_pending, 999999)
        for repo_git in repo_git_identifiers:
            # set the repo to initializing
            repo = session.query(Repo).filter(Repo.repo_git == repo_git).one()
            repoStatus = repo.collection_status[0]
            setattr(repoStatus, "facade_status", CollectionState.INITIALIZING.value)
            session.commit()

            # clone the repo
            try:
                git_repo_initialize(session, repo_git)
                session.commit()

                # get the commit count
                commit_count = get_repo_commit_count(session, repo_git)
                facade_weight = get_facade_weight_with_commit_count(session, repo_git, commit_count)

                update_facade_scheduling_fields(session, repo_git, facade_weight, commit_count)

                # set the repo to update
                setattr(repoStatus, "facade_status", CollectionState.UPDATE.value)
                session.commit()
            except GitCloneError:
                # Continue to the next repo, since we can't calculate
                # commit_count or weight without the repo cloned.
                setattr(repoStatus, "facade_status", CollectionState.FAILED_CLONE.value)
                session.commit()
            except Exception as e:
                logger.error(f"Ran into unexpected issue when cloning repositories \n Error: {e}")
                # set the repo to error
                setattr(repoStatus, "facade_status", CollectionState.ERROR.value)
                session.commit()

    clone_repos.si().apply_async(countdown=60*5)
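
# Illustrative bootstrap (assumed to happen once elsewhere in Augur, e.g. at
# worker startup): a single initial dispatch starts the loop, after which the
# task keeps itself alive via the five-minute re-queue above.
#
#   clone_repos.si().apply_async()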


#@celery.task(bind=True)
#def check_for_repo_updates_facade_task(self, repo_git):
#
#    engine = self.app.engine
#
#    logger = logging.getLogger(check_for_repo_updates_facade_task.__name__)
#
#    with FacadeSession(logger) as session:
#        check_for_repo_updates(session, repo_git)


@celery.task(base=AugurFacadeRepoCollectionTask, bind=True)
def git_update_commit_count_weight(self, repo_git):

    engine = self.app.engine
    logger = logging.getLogger(git_update_commit_count_weight.__name__)

    # Change facade session to take in engine
    with FacadeSession(logger) as session:
        commit_count = get_repo_commit_count(session, repo_git)
        facade_weight = get_facade_weight_with_commit_count(session, repo_git, commit_count)

        update_facade_scheduling_fields(session, repo_git, facade_weight, commit_count)


@celery.task(base=AugurFacadeRepoCollectionTask)
def git_repo_updates_facade_task(repo_git):

    logger = logging.getLogger(git_repo_updates_facade_task.__name__)

    with FacadeSession(logger) as session:
        git_repo_updates(session, repo_git)


def generate_analysis_sequence(logger, repo_git, session):
    """Build the ordered list of analysis tasks for a single repo.

    The tasks in the sequence retrieve the list of commits which lead to HEAD
    and fill in any that are missing from the database. They then check whether
    any commits in the database are not in the list of parents, and prune them out.

    The last commit to be processed is also tracked, so that if the analysis
    is interrupted (possibly leaving partial data in the database for the
    commit being analyzed at the time) it can recover.
    """

    analysis_sequence = []

    repo_list = s.sql.text("""SELECT repo_id,repo_group_id,repo_path,repo_name FROM repo
        WHERE repo_git=:value""").bindparams(value=repo_git)
    repos = session.fetchall_data_from_sql_text(repo_list)

    start_date = session.get_setting('start_date')

    repo_ids = [repo['repo_id'] for repo in repos]
    repo_id = repo_ids.pop(0)

    analysis_sequence.append(facade_analysis_init_facade_task.si(repo_git))
    analysis_sequence.append(trim_commits_facade_task.si(repo_git))
    analysis_sequence.append(analyze_commits_in_parallel.si(repo_git, True))
    analysis_sequence.append(trim_commits_post_analysis_facade_task.si(repo_git))
    analysis_sequence.append(facade_analysis_end_facade_task.si())

    logger.info(f"Analysis sequence: {analysis_sequence}")
    return analysis_sequence
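
# The list returned above is spliced into facade_core_collection by facade_phase
# below, which wraps it (together with the repo-update and contributor tasks)
# into a single Celery chain.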


def facade_phase(repo_git):
    logger = logging.getLogger(facade_phase.__name__)
    logger.info("Generating facade sequence")
    with FacadeSession(logger) as session:
        # Get the repo_id
        repo_list = s.sql.text("""SELECT repo_id,repo_group_id,repo_path,repo_name FROM repo
            WHERE repo_git=:value""").bindparams(value=repo_git)
        repos = session.fetchall_data_from_sql_text(repo_list)

        start_date = session.get_setting('start_date')

        repo_ids = [repo['repo_id'] for repo in repos]
        repo_id = repo_ids.pop(0)

        # Get the collection status
        query = session.query(CollectionStatus).filter(CollectionStatus.repo_id == repo_id)
        status = execute_session_query(query, 'one')

        # Figure out what we need to do
        limited_run = session.limited_run
        run_analysis = session.run_analysis
        pull_repos = session.pull_repos
        #force_analysis = session.force_analysis
        run_facade_contributors = session.run_facade_contributors

        facade_sequence = []
        facade_core_collection = []

        if not limited_run or (limited_run and pull_repos):
            facade_core_collection.append(git_repo_updates_facade_task.si(repo_git))
            facade_core_collection.append(git_update_commit_count_weight.si(repo_git))

        # Generate the commit analysis task order.
        if not limited_run or (limited_run and run_analysis):
            facade_core_collection.extend(generate_analysis_sequence(logger, repo_git, session))

        # Generate the contributor analysis task group.
        if not limited_run or (limited_run and run_facade_contributors):
            facade_core_collection.append(insert_facade_contributors.si(repo_git))

        # These tasks need repos to be cloned by facade before they can work.
        facade_sequence.append(
            group(
                chain(*facade_core_collection),
                process_dependency_metrics.si(repo_git),
                process_libyear_dependency_metrics.si(repo_git),
                process_scc_value_metrics.si(repo_git)
            )
        )

        logger.info(f"Facade sequence: {facade_sequence}")
        return chain(*facade_sequence)
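
# Illustrative usage (assumed caller; the repo URL is hypothetical): the returned
# canvas is an ordinary Celery chain, so it can be dispatched with apply_async:
#
#   facade_phase("https://github.com/chaoss/augur").apply_async()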


def generate_non_repo_domain_facade_tasks(logger):
    logger.info("Generating facade sequence")
    with FacadeSession(logger) as session:

        # Figure out what we need to do
        limited_run = session.limited_run
        delete_marked_repos = session.delete_marked_repos
        pull_repos = session.pull_repos
        # clone_repos = session.clone_repos
        check_updates = session.check_updates
        # force_updates = session.force_updates
        run_analysis = session.run_analysis
        # force_analysis = session.force_analysis
        nuke_stored_affiliations = session.nuke_stored_affiliations
        fix_affiliations = session.fix_affiliations
        force_invalidate_caches = session.force_invalidate_caches
        rebuild_caches = session.rebuild_caches
        create_xlsx_summary_files = session.create_xlsx_summary_files
        multithreaded = session.multithreaded

        facade_sequence = []

        if nuke_stored_affiliations:
            logger.info("Nuke stored affiliations is deprecated.")
            # Deprecated because the UI component of facade where affiliations would be
            # nuked upon change no longer exists, and this information can easily be derived
            # from queries and materialized views in the current version of Augur.
            # This method was also a major performance bottleneck with little value.

        if not limited_run or (limited_run and fix_affiliations):
            logger.info("Fill empty affiliations is deprecated.")
            # Deprecated because the UI component of facade where affiliations would need
            # to be fixed upon change no longer exists, and this information can easily be derived
            # from queries and materialized views in the current version of Augur.
            # This method was also a major performance bottleneck with little value.

        if force_invalidate_caches:
            facade_sequence.append(invalidate_caches_facade_task.si().on_error(facade_error_handler.s()))

        if not limited_run or (limited_run and rebuild_caches):
            facade_sequence.append(rebuild_unknown_affiliation_and_web_caches_facade_task.si().on_error(facade_error_handler.s()))

        return facade_sequence
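
# Illustrative usage (assumed scheduler code, not part of this module): the list
# returned above holds plain task signatures, so a caller could run them in order:
#
#   chain(*generate_non_repo_domain_facade_tasks(logger)).apply_async()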