diff --git a/README.md b/README.md
index eb9890d4c9..ab5342dac7 100644
--- a/README.md
+++ b/README.md
@@ -1,21 +1,21 @@
-# Augur NEW Release v0.44.0
+# Augur NEW Release v0.44.2
 
 [![first-timers-only](https://img.shields.io/badge/first--timers--only-friendly-blue.svg?style=flat-square)](https://www.firsttimersonly.com/) We follow the [First Timers Only](https://www.firsttimersonly.com/) philosophy of tagging issues for first timers only, and walking one newcomer through the resolution process weekly. [You can find these issues tagged with "first timers only" on our issues list.](https://github.com/chaoss/augur/labels/first-timers-only).
 [![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
 [![Build Docker images](https://github.com/chaoss/augur/actions/workflows/build_docker.yml/badge.svg)](https://github.com/chaoss/augur/actions/workflows/build_docker.yml)
 [![Hits-of-Code](https://hitsofcode.com/github/chaoss/augur?branch=main)](https://hitsofcode.com/github/chaoss/augur/view?branch=main)
 [![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/2788/badge)](https://bestpractices.coreinfrastructure.org/projects/2788)
 
 ## NEW RELEASE ALERT!
-[If you want to jump right in, updated docker build/compose and bare metal installation instructions are available here](docs/new-install.md)
+### [If you want to jump right in, updated docker build/compose and bare metal installation instructions are available here](docs/new-install.md)
 
-Augur is now releasing a dramatically improved new version to the main branch. It is also available here: https://github.com/chaoss/augur/releases/tag/v0.44.0
+Augur is now releasing a dramatically improved new version to the main branch. It is also available here: https://github.com/chaoss/augur/releases/tag/v0.44.2
 
 - The `main` branch is a stable version of our new architecture, which features:
-    - Dramatic improvement in the speed of large scale data collection (10,000+ repos). All data is obtained for 10k+ repos within a week
+    - Dramatic improvement in the speed of large scale data collection (100,000+ repos). All data is obtained for 100k+ repos within 2 weeks.
     - A new job management architecture that uses Celery and Redis to manage queues, and enables users to run a Flower job monitoring dashboard
     - Materialized views to increase the snappiness of API’s and Frontends on large scale data
     - Changes to primary keys, which now employ a UUID strategy that ensures unique keys across all Augur instances
-    - Support for https://github.com/chaoss/sandiego-rh dashboards (view a sample here: https://eightknot.osci.io/). (beautification coming soon!)
+    - Support for https://github.com/oss-aspen/8knot dashboards (view a sample here: https://eightknot.osci.io/). (beautification coming soon!)
     - Data collection completeness assurance enabled by a structured, relational data set that is easily compared with platform API Endpoints
 - The next release of the new version will include a hosted version of Augur where anyone can create an account and add repos “they care about”. If the hosted instance already has a requested organization or repository it will be added to a user’s view. If its a new repository or organization, the user will be notified that collection will take (time required for the scale of repositories added).
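The README hunk above advertises the new Celery/Redis job management architecture with a Flower monitoring dashboard. As a rough, hedged illustration of that general pattern only — the module name, broker/backend URLs, and task body below are hypothetical and are not Augur's actual task definitions — a Celery worker wired to a Redis queue looks roughly like this:

```python
# Minimal sketch of the Celery + Redis pattern described in the release notes.
# All names and URLs here are assumptions for illustration, not Augur's config.
from celery import Celery

app = Celery(
    "collection_sketch",
    broker="redis://localhost:6379/0",   # Redis holds the task queue
    backend="redis://localhost:6379/0",  # task results are stored in Redis too
)

@app.task
def collect_repo(repo_git: str) -> str:
    # Placeholder job body; a real collection task would call the platform API
    # and write results to the database.
    return f"collected {repo_git}"

# Queue a job from any process that can reach the broker:
#   collect_repo.delay("https://github.com/chaoss/augur")
# Run a worker, and (with the flower package installed) the monitoring dashboard:
#   celery -A collection_sketch worker --loglevel=info
#   celery -A collection_sketch flower
```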
diff --git a/augur/api/routes/pull_request_reports.py b/augur/api/routes/pull_request_reports.py index c5e936af5e..02f6e235cd 100644 --- a/augur/api/routes/pull_request_reports.py +++ b/augur/api/routes/pull_request_reports.py @@ -1345,10 +1345,8 @@ def mean_days_between_PR_comments(): plot_width = 950 p1 = figure(x_axis_type="datetime", - title="{}: Mean {} Between Comments by Month Closed for {} Pull Requests".format(repo_dict[repo_id], - time_unit, - description), - plot_width=plot_width, x_range=(pr_all[x_axis].min(), pr_all[x_axis].max()), plot_height=500, + title="{}: Mean {} Between Comments by Month Closed for {} Pull Requests".format(repo_dict[repo_id], time_unit, description), + plot_width=plot_width, x_range=(data_dict["All"][x_axis].min(), data_dict["All"][x_axis].max()), plot_height=500, toolbar_location=None) colors = Category20[10][6:] color_index = 0 @@ -1379,11 +1377,9 @@ def mean_days_between_PR_comments(): possible_maximums.append( max(driver_df_mean.loc[driver_df_mean[line_group] == line_group_value][y_axis].dropna())) for repo, num_outliers in num_outliers_repo_map.items(): - # FIXME repo_name is not defined - if repo_name == repo: - p1.add_layout( - Title(text="** {} outliers for {} were removed".format(num_outliers, repo), align="center"), - "below") + p1.add_layout( + Title(text="** {} outliers for {} were removed".format(num_outliers, repo), align="center"), + "below") p1.grid.grid_line_alpha = 0.3 p1.xaxis.axis_label = 'Month Closed' diff --git a/augur/api/view/init.py b/augur/api/view/init.py index fd98e12338..210dc60e08 100644 --- a/augur/api/view/init.py +++ b/augur/api/view/init.py @@ -1,6 +1,7 @@ from pathlib import Path from .server import Environment -import logging, sqlite3, secrets, hashlib, yaml +from augur.application.logs import AugurLogger +import logging, secrets, yaml env = Environment() @@ -52,7 +53,7 @@ def update_from(old): current_settings["version"] = version write_settings(current_settings) - logging.info(f"Configuration updated from {to_version_string(old)} to {to_version_string(version)}") + logger.info(f"Configuration updated from {to_version_string(old)} to {to_version_string(version)}") def compare_versions(old, new): if old["major"] < new["major"]: @@ -141,7 +142,5 @@ def compare_versions(old, new): # Initialize logging def init_logging(): - format = "%(asctime)s: %(message)s" global logger - logger = logging.getLogger("augur view") - logger.setLevel("DEBUG") + logger = AugurLogger("augur_view", reset_logfiles=True).get_logger() diff --git a/augur/api/view/utils.py b/augur/api/view/utils.py index e6926dc3b4..82ac3ea7de 100644 --- a/augur/api/view/utils.py +++ b/augur/api/view/utils.py @@ -1,10 +1,18 @@ from pathlib import Path from concurrent.futures import ThreadPoolExecutor -from flask import render_template, flash, url_for +from flask import render_template, flash, url_for, Flask from .init import * from .server import User +from ..server import app, db_session +from augur.application.config import AugurConfig import urllib.request, urllib.error, json, os, math, yaml, urllib3, time, logging, re +init_logging() + +from .init import logger + +config = AugurConfig(logger, db_session) + def parse_url(url): from urllib.parse import urlparse @@ -47,7 +55,7 @@ def is_status_ok(): if "status" in response: return request.url except Exception as e: - logging.error(f"Error during serving URL verification: {str(e)}") + logger.error(f"Error during serving URL verification: {str(e)}") return False @@ -88,9 +96,9 @@ def loadSettings(): if not 
configFilePath.is_file(): init_settings() with open(configFile, 'w') as file: - logging.info(f"Generating default configuration file: {configFile}") + logger.info(f"Generating default configuration file: {configFile}") yaml.dump(settings, file) - logging.info("Default configuration file successfully generated.") + logger.info("Default configuration file successfully generated.") else: with open(configFilePath) as file: settings = yaml.load(file, Loader=yaml.FullLoader) @@ -103,7 +111,7 @@ def loadSettings(): else: try: cachePath.mkdir(parents=True) - logging.info("cache directory initialized") + logger.info("cache directory initialized") except Exception as err: raise Exception(f"Cannot initialize caching: could not create cache directory [{cachePath}]") @@ -120,17 +128,16 @@ def loadSettings(): """ ---------------------------------------------------------------- """ -def getSetting(key): - if key == "serving": - return "http://127.0.0.1:5000/api/unstable" - return settings[key] - -init_logging() +def getSetting(key, section = "View"): + if section == "View": + if key == "serving": + return "http://127.0.0.1:5000/api/unstable" + return settings[key] + else: + return config.get_value(section, key) loadSettings() -from .init import logger - User.api = getSetting("serving") User.logger = logger @@ -149,16 +156,16 @@ def loadReports(): image['id'] = id = id + 1 return True except Exception as err: - logging.error(f"An exception occurred reading reports endpoints from [{getSetting('reports')}]:") - logging.error(err) + logger.error(f"An exception occurred reading reports endpoints from [{getSetting('reports')}]:") + logger.error(err) try: with open(getSetting("reports"), 'w') as file: - logging.info("Attempting to generate default reports.yml") + logger.info("Attempting to generate default reports.yml") yaml.dump(reports, file) - logging.info("Default reports file successfully generated.") + logger.info("Default reports file successfully generated.") except Exception as ioErr: - logging.error("Error creating default report configuration:") - logging.error(ioErr) + logger.error("Error creating default report configuration:") + logger.error(ioErr) return False if not loadReports(): @@ -176,11 +183,11 @@ def cacheFileExists(filename): if(cache_file_age > getSetting('cache_expiry')): try: cache_file.unlink() - logging.info(f"Cache file {filename} removed due to expiry") + logger.info(f"Cache file {filename} removed due to expiry") return False except Exception as e: - logging.error("Error: cache file age exceeds expiry limit, but an exception occurred while attempting to remove") - logging.error(e) + logger.error("Error: cache file age exceeds expiry limit, but an exception occurred while attempting to remove") + logger.error(e) return True else: return False @@ -220,7 +227,7 @@ def toCacheURL(endpoint): def requestJson(endpoint, cached = True): filename = toCacheFilepath(endpoint) requestURL = getSetting('serving') + "/" + endpoint - logging.info(f'requesting json from: {endpoint}') + logger.info(f'requesting json from: {endpoint}') try: if cached and cacheFileExists(filename): with open(filename) as f: @@ -239,8 +246,8 @@ def requestJson(endpoint, cached = True): cache_files_requested.remove(filename) return data except Exception as err: - logging.error("An exception occurred while fulfilling a json request") - logging.error(err) + logger.error("An exception occurred while fulfilling a json request") + logger.error(err) return False, str(err) """ 
---------------------------------------------------------------- @@ -257,8 +264,8 @@ def requestPNG(endpoint): cache_files_requested.remove(filename) return toCacheURL(endpoint) except Exception as err: - logging.error("An exception occurred while fulfilling a png request") - logging.error(err) + logger.error("An exception occurred while fulfilling a png request") + logger.error(err) """ ---------------------------------------------------------------- """ @@ -269,20 +276,26 @@ def download(url, cmanager, filename, image_cache, image_id, repo_id = None): if cacheFileExists(filename): image_cache[image_id]['exists'] = True return - response = cmanager.request('GET', url) + try: + response = cmanager.request('GET', url) + except Exception as e: + logger.error("Could not make request: " + str(e)) + raise e + if "json" in response.headers['Content-Type']: - logging.warn(f"repo {repo_id}: unexpected json response in image request") - logging.warn(f" response: {response.data.decode('utf-8')}") + logger.warn(f"repo {repo_id}: unexpected json response in image request") + logger.warn(f" response: {response.data.decode('utf-8')}") image_cache[image_id]['exists'] = False return if response and response.status == 200: image_cache[image_id]['exists'] = True try: with open(filename, 'wb') as f: + logger.info("Writing image: " + filename) f.write(response.data) except Exception as err: - logging.error("An exception occurred writing a cache file to disk") - logging.error(err) + logger.error("An exception occurred writing a cache file to disk") + logger.error(err) """ ---------------------------------------------------------------- """ @@ -295,6 +308,9 @@ def requestReports(repo_id): report_requests[repo_id] = {} report_requests[repo_id]['complete'] = False + host = getSetting("host", "Server") + port = getSetting("port", "Server") + """ ---------- If the report definition could not be loaded, we cannot determine what files to request from the backend to compose the report. Returning here @@ -319,7 +335,7 @@ def requestReports(repo_id): # Where should the downloaded image be stored (in cache) filename = toCacheFilename(f"{image['url']}?repo_id={repo_id}") # Where are we downloading the image from - image_url = url_for(image['url'], repo_id = repo_id) + image_url = f"{host}:{port}" + url_for(image['url'], repo_id = repo_id) # f"{getSetting('serving')}/{image['url']}?repo_id={repo_id}" # Add a request for this image to the thread pool using the download function diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index 008afe8b26..dfbc8281ff 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -93,7 +93,7 @@ def start(disable_collection, development, port): time.sleep(3) logger.info('Gunicorn webserver started...') - logger.info(f'Augur is running at: http://127.0.0.1:{port}') + logger.info(f'Augur is running at: {"http" if development else "https"}://{host}:{port}') scheduling_worker_process = None core_worker_process = None diff --git a/augur/application/db/models/augur_data.py b/augur/application/db/models/augur_data.py index 5365dbf37f..e1234cda2d 100644 --- a/augur/application/db/models/augur_data.py +++ b/augur/application/db/models/augur_data.py @@ -858,7 +858,7 @@ def get_by_repo_git(session, repo_git): return session.query(Repo).filter(Repo.repo_git == repo_git).first() @staticmethod - def is_valid_github_repo(session, url: str) -> bool: + def is_valid_github_repo(gh_session, url: str) -> bool: """Determine whether repo url is valid. 
Args: @@ -871,7 +871,7 @@ def is_valid_github_repo(session, url: str) -> bool: REPO_ENDPOINT = "https://api.github.com/repos/{}/{}" - if not session.oauths.list_of_keys: + if not gh_session.oauths.list_of_keys: return False, {"status": "No valid github api keys to retrieve data with"} owner, repo = Repo.parse_github_repo_url(url) @@ -882,7 +882,7 @@ def is_valid_github_repo(session, url: str) -> bool: attempts = 0 while attempts < 10: - result = hit_api(session.oauths, url, logger) + result = hit_api(gh_session.oauths, url, logger) # if result is None try again if not result: diff --git a/augur/application/db/models/augur_operations.py b/augur/application/db/models/augur_operations.py index ab7c7d45cd..bba812c947 100644 --- a/augur/application/db/models/augur_operations.py +++ b/augur/application/db/models/augur_operations.py @@ -9,12 +9,14 @@ import logging import secrets +import importlib -from augur.application.db.models import Repo +from augur.application.db.models import Repo, RepoGroup from augur.application.db.session import DatabaseSession from augur.application.db.models.base import Base -DEFAULT_REPO_GROUP_ID = 1 +schema_6_revision = importlib.import_module('augur.application.schema.alembic.versions.6_add_repo_group_for_frontend_repos') +FRONTEND_REPO_GROUP_NAME = schema_6_revision.repo_group_name logger = logging.getLogger(__name__) def retrieve_org_repos(session, url: str) -> List[str]: @@ -410,8 +412,10 @@ def remove_group(self, group_name): return result def add_repo(self, group_name, repo_url): + + from augur.tasks.github.util.github_task_session import GithubTaskSession - with DatabaseSession(logger) as session: + with GithubTaskSession(logger) as session: result = UserRepo.add(session, repo_url, self.user_id, group_name) return result @@ -425,7 +429,9 @@ def remove_repo(self, session, group_name, repo_id): def add_org(self, group_name, org_url): - with DatabaseSession(logger) as session: + from augur.tasks.github.util.github_task_session import GithubTaskSession + + with GithubTaskSession(logger) as session: result = UserRepo.add_org_repos(session, org_url, self.user_id, group_name) return result @@ -724,7 +730,11 @@ def add(session, url: List[str], user_id: int, group_name=None, group_id=None, v if not result[0]: return False, {"status": result[1]["status"], "repo_url": url} - repo_id = Repo.insert(session, url, DEFAULT_REPO_GROUP_ID, "Frontend") + frontend_repo_group = session.query(RepoGroup).filter(RepoGroup.rg_name == FRONTEND_REPO_GROUP_NAME).first() + if not frontend_repo_group: + return False, {"status": "Could not find repo group with name 'Frontend Repos'", "repo_url": url} + + repo_id = Repo.insert(session, url, frontend_repo_group.repo_group_id, "Frontend") if not repo_id: return False, {"status": "Repo insertion failed", "repo_url": url} diff --git a/augur/application/schema/alembic/versions/6_add_repo_group_for_frontend_repos.py b/augur/application/schema/alembic/versions/6_add_repo_group_for_frontend_repos.py new file mode 100644 index 0000000000..eebc821d85 --- /dev/null +++ b/augur/application/schema/alembic/versions/6_add_repo_group_for_frontend_repos.py @@ -0,0 +1,36 @@ +"""Add repo group for frontend repos + +Revision ID: 6 +Revises: 5 +Create Date: 2023-02-17 14:52:35.095070 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql +from sqlalchemy.sql import text + + +# revision identifiers, used by Alembic. 
+revision = '6' +down_revision = '5' +branch_labels = None +depends_on = None + +repo_group_name = "Frontend Repos" + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + conn = op.get_bind() + conn.execute(f"""INSERT INTO "augur_data"."repo_groups" ("rg_name", "rg_description", "rg_website", "rg_recache", "rg_last_modified", "rg_type", "tool_source", "tool_version", "data_source", "data_collection_date") VALUES ('{repo_group_name}', 'DO NOT DELETE OR FRONTEND REPOS WILL BREAK', '', 0, '2023-02-17 15:00:00', NULL, NULL, NULL, NULL, NULL);""") + + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + conn = op.get_bind() + conn.execute(text(f"""DELETE FROM "augur_data"."repo_groups" WHERE rg_name='{repo_group_name}';""")) + # ### end Alembic commands ### diff --git a/augur/tasks/data_analysis/clustering_worker/tasks.py b/augur/tasks/data_analysis/clustering_worker/tasks.py index be3d199637..3e2398294e 100644 --- a/augur/tasks/data_analysis/clustering_worker/tasks.py +++ b/augur/tasks/data_analysis/clustering_worker/tasks.py @@ -23,7 +23,6 @@ from augur.application.db.session import DatabaseSession from augur.application.config import AugurConfig from augur.application.db.models import Repo, RepoClusterMessage, RepoTopic, TopicWord -from augur.application.db.engine import DatabaseEngine from augur.application.db.util import execute_session_query @@ -42,11 +41,11 @@ def clustering_task(): repos = execute_session_query(query, 'all') - for repo in repos: - clustering_model(repo.repo_git, logger, engine) + for repo in repos: + clustering_model(repo.repo_git, logger, engine, session) -def clustering_model(repo_git: str,logger,engine) -> None: +def clustering_model(repo_git: str,logger,engine, session) -> None: logger.info(f"Starting clustering analysis for {repo_git}") @@ -62,19 +61,17 @@ def clustering_model(repo_git: str,logger,engine) -> None: tool_version = '0.2.0' data_source = 'Augur Collected Messages' - with DatabaseSession(logger, engine) as session: - - config = AugurConfig(logger, session) + config = AugurConfig(logger, session) - query = session.query(Repo).filter(Repo.repo_git == repo_git) - repo_id = execute_session_query(query, 'one').repo_id + query = session.query(Repo).filter(Repo.repo_git == repo_git) + repo_id = execute_session_query(query, 'one').repo_id - num_clusters = config.get_value("Clustering_Task", 'num_clusters') - max_df = config.get_value("Clustering_Task", 'max_df') - max_features = config.get_value("Clustering_Task", 'max_features') - min_df = config.get_value("Clustering_Task", 'min_df') + num_clusters = config.get_value("Clustering_Task", 'num_clusters') + max_df = config.get_value("Clustering_Task", 'max_df') + max_features = config.get_value("Clustering_Task", 'max_features') + min_df = config.get_value("Clustering_Task", 'min_df') - logger.info(f"Min df: {min_df}. Max df: {max_df}") + logger.info(f"Min df: {min_df}. Max df: {max_df}") logger.info("If you did not install NLTK libraries when you installed Augur, this will fail. 
") #nltk.download('all') @@ -124,15 +121,14 @@ def clustering_model(repo_git: str,logger,engine) -> None: """ ) # result = db.execute(delete_points_SQL, repo_id=repo_id, min_date=min_date) - with DatabaseEngine(connection_pool_size=1) as engine: - msg_df_cur_repo = pd.read_sql(get_messages_for_repo_sql, engine, params={"repo_id": repo_id}) + msg_df_cur_repo = pd.read_sql(get_messages_for_repo_sql, engine, params={"repo_id": repo_id}) logger.info(msg_df_cur_repo.head()) logger.debug(f"Repo message df size: {len(msg_df_cur_repo.index)}") # check if dumped pickle file exists, if exists no need to train the model if not os.path.exists(MODEL_FILE_NAME): logger.info("clustering model not trained. Training the model.........") - train_model(logger, max_df, min_df, max_features, ngram_range, num_clusters, num_topics, num_words_per_topic, tool_source, tool_version, data_source) + train_model(logger, engine, session, max_df, min_df, max_features, ngram_range, num_clusters, num_topics, num_words_per_topic, tool_source, tool_version, data_source) else: model_stats = os.stat(MODEL_FILE_NAME) model_age = (time.time() - model_stats.st_mtime) @@ -140,7 +136,7 @@ def clustering_model(repo_git: str,logger,engine) -> None: logger.debug(f'model age is: {model_age}') if model_age > 2000000: logger.info("clustering model to old. Retraining the model.........") - train_model(logger, max_df, min_df, max_features, ngram_range, num_clusters, num_topics, num_words_per_topic, tool_source, tool_version, data_source) + train_model(logger, engine, session, max_df, min_df, max_features, ngram_range, num_clusters, num_topics, num_words_per_topic, tool_source, tool_version, data_source) else: logger.info("using pre-trained clustering model....") @@ -180,14 +176,13 @@ def clustering_model(repo_git: str,logger,engine) -> None: 'tool_version': tool_version, 'data_source': data_source } - with DatabaseSession(logger, engine) as session: - repo_cluster_messages_obj = RepoClusterMessage(**record) - session.add(repo_cluster_messages_obj) - session.commit() + repo_cluster_messages_obj = RepoClusterMessage(**record) + session.add(repo_cluster_messages_obj) + session.commit() - # result = db.execute(repo_cluster_messages_table.insert().values(record)) - logging.info( - "Primary key inserted into the repo_cluster_messages table: {}".format(repo_cluster_messages_obj.msg_cluster_id)) + # result = db.execute(repo_cluster_messages_table.insert().values(record)) + logging.info( + "Primary key inserted into the repo_cluster_messages table: {}".format(repo_cluster_messages_obj.msg_cluster_id)) try: logger.debug('pickling') lda_model = pickle.load(open("lda_model", "rb")) @@ -208,22 +203,21 @@ def clustering_model(repo_git: str,logger,engine) -> None: prediction = lda_model.transform(count_matrix_cur_repo) logger.debug('for loop for vocab') - with DatabaseSession(logger, engine) as session: - for i, prob_vector in enumerate(prediction): - # repo_id = msg_df.loc[i]['repo_id'] - for i, prob in enumerate(prob_vector): - record = { - 'repo_id': int(repo_id), - 'topic_id': i + 1, - 'topic_prob': prob, - 'tool_source': tool_source, - 'tool_version': tool_version, - 'data_source': data_source - } - - repo_topic_object = RepoTopic(**record) - session.add(repo_topic_object) - session.commit() + for i, prob_vector in enumerate(prediction): + # repo_id = msg_df.loc[i]['repo_id'] + for i, prob in enumerate(prob_vector): + record = { + 'repo_id': int(repo_id), + 'topic_id': i + 1, + 'topic_prob': prob, + 'tool_source': tool_source, + 'tool_version': 
tool_version, + 'data_source': data_source + } + + repo_topic_object = RepoTopic(**record) + session.add(repo_topic_object) + session.commit() # result = db.execute(repo_topic_table.insert().values(record)) except Exception as e: @@ -271,7 +265,7 @@ def preprocess_and_tokenize(text): stems = [stemmer.stem(t) for t in tokens] return stems -def train_model(logger, max_df, min_df, max_features, ngram_range, num_clusters, num_topics, num_words_per_topic, tool_source, tool_version, data_source): +def train_model(logger, engine, session, max_df, min_df, max_features, ngram_range, num_clusters, num_topics, num_words_per_topic, tool_source, tool_version, data_source): def visualize_labels_PCA(features, labels, annotations, num_components, title): labels_color_map = {-1: "red"} for label in labels: @@ -314,8 +308,7 @@ def visualize_labels_PCA(features, labels, annotations, num_components, title): AND prmr.msg_id=m.msg_id """ ) - with DatabaseEngine(connection_pool_size=1) as engine: - msg_df_all = pd.read_sql(get_messages_sql, engine, params={}) + msg_df_all = pd.read_sql(get_messages_sql, engine, params={}) # select only highly active repos logger.debug("Selecting highly active repos") @@ -382,33 +375,32 @@ def visualize_labels_PCA(features, labels, annotations, num_components, title): # twid = self.db.execute(key_sequence_words_sql) # logger.info("twid variable is: {}".format(twid)) # insert topic list into database - with DatabaseSession(logger, engine) as session: - topic_id = 1 - for topic in topic_list: - # twid = self.get_max_id('topic_words', 'topic_words_id') + 1 + topic_id = 1 + for topic in topic_list: + # twid = self.get_max_id('topic_words', 'topic_words_id') + 1 + # logger.info("twid variable is: {}".format(twid)) + for i in topic.argsort()[:-num_words_per_topic - 1:-1]: + # twid+=1 + # logger.info("in loop incremented twid variable is: {}".format(twid)) # logger.info("twid variable is: {}".format(twid)) - for i in topic.argsort()[:-num_words_per_topic - 1:-1]: - # twid+=1 - # logger.info("in loop incremented twid variable is: {}".format(twid)) - # logger.info("twid variable is: {}".format(twid)) - record = { - # 'topic_words_id': twid, - # 'word_prob': word_prob[i], - 'topic_id': int(topic_id), - 'word': feature_names[i], - 'tool_source': tool_source, - 'tool_version': tool_version, - 'data_source': data_source - } - - topic_word_obj = TopicWord(**record) - session.add(topic_word_obj) - session.commit() - - # result = db.execute(topic_words_table.insert().values(record)) - logger.info( - "Primary key inserted into the topic_words table: {}".format(topic_word_obj.topic_words_id)) - topic_id += 1 + record = { + # 'topic_words_id': twid, + # 'word_prob': word_prob[i], + 'topic_id': int(topic_id), + 'word': feature_names[i], + 'tool_source': tool_source, + 'tool_version': tool_version, + 'data_source': data_source + } + + topic_word_obj = TopicWord(**record) + session.add(topic_word_obj) + session.commit() + + # result = db.execute(topic_words_table.insert().values(record)) + logger.info( + "Primary key inserted into the topic_words table: {}".format(topic_word_obj.topic_words_id)) + topic_id += 1 # insert topic list into database diff --git a/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py b/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py index 1695dc935b..f8b6a9a585 100644 --- a/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py +++ 
b/augur/tasks/data_analysis/contributor_breadth_worker/contributor_breadth_worker.py @@ -7,7 +7,6 @@ from augur.application.db.session import DatabaseSession from augur.tasks.github.util.github_paginator import GithubPaginator from augur.application.db.models import ContributorRepo -from augur.application.db.engine import DatabaseEngine ### This worker scans all the platform users in Augur, and pulls their platform activity ### logs. Those are then used to analyze what repos each is working in (which will include repos not @@ -87,8 +86,7 @@ def contributor_breadth_model() -> None: WHERE 1 = 1 """) - with DatabaseEngine(connection_pool_size=1) as engine: - current_event_ids = json.loads(pd.read_sql(dup_query, engine, params={}).to_json(orient="records")) + current_event_ids = json.loads(pd.read_sql(dup_query, engine, params={}).to_json(orient="records")) #Convert list of dictionaries to regular list of 'event_ids'. #The only values that the sql query returns are event_ids so diff --git a/augur/tasks/data_analysis/discourse_analysis/tasks.py b/augur/tasks/data_analysis/discourse_analysis/tasks.py index 57ae59d77a..ba4a94b859 100644 --- a/augur/tasks/data_analysis/discourse_analysis/tasks.py +++ b/augur/tasks/data_analysis/discourse_analysis/tasks.py @@ -10,7 +10,6 @@ from augur.tasks.init.celery_app import celery_app as celery from augur.application.db.session import DatabaseSession from augur.application.db.models import Repo, DiscourseInsight -from augur.application.db.engine import DatabaseEngine from augur.application.db.util import execute_session_query #import os, sys, time, requests, json diff --git a/augur/tasks/data_analysis/insight_worker/tasks.py b/augur/tasks/data_analysis/insight_worker/tasks.py index 71470a9cb3..bae95faf55 100644 --- a/augur/tasks/data_analysis/insight_worker/tasks.py +++ b/augur/tasks/data_analysis/insight_worker/tasks.py @@ -17,7 +17,6 @@ from augur.application.db.session import DatabaseSession from augur.application.config import AugurConfig from augur.application.db.models import Repo, ChaossMetricStatus, RepoInsight, RepoInsightsRecord -from augur.application.db.engine import DatabaseEngine from augur.application.db.util import execute_session_query warnings.filterwarnings('ignore') @@ -34,11 +33,11 @@ def insight_task(): repos = execute_session_query(query, 'all') - for repo in repos: - insight_model(repo.repo_git, logger, engine) + for repo in repos: + insight_model(repo.repo_git, logger, engine, session) -def insight_model(repo_git: str,logger,engine) -> None: +def insight_model(repo_git: str,logger,engine,session) -> None: refresh = True send_insights = True @@ -49,19 +48,17 @@ def insight_model(repo_git: str,logger,engine) -> None: metrics = {"issues-new": "issues", "code-changes": "commit_count", "code-changes-lines": "added", "reviews": "pull_requests", "contributors-new": "new_contributors"} - with DatabaseSession(logger, engine) as session: - - config = AugurConfig(logger, session) + config = AugurConfig(logger, session) - query = session.query(Repo).filter(Repo.repo_git == repo_git) - repo_id = execute_session_query(query, 'one').repo_id + query = session.query(Repo).filter(Repo.repo_git == repo_git) + repo_id = execute_session_query(query, 'one').repo_id - anomaly_days = config.get_value('Insight_Task', 'anomaly_days') - training_days = config.get_value('Insight_Task', 'training_days') - contamination = config.get_value('Insight_Task', 'contamination') - confidence = config.get_value('Insight_Task', 'confidence_interval') / 100 - api_host = 
config.get_value('Server', 'host') - api_port = config.get_value('Server', 'port') + anomaly_days = config.get_value('Insight_Task', 'anomaly_days') + training_days = config.get_value('Insight_Task', 'training_days') + contamination = config.get_value('Insight_Task', 'contamination') + confidence = config.get_value('Insight_Task', 'confidence_interval') / 100 + api_host = config.get_value('Server', 'host') + api_port = config.get_value('Server', 'port') logger.info("Discovering insights for repo {}\n".format(repo_git)) @@ -314,12 +311,10 @@ def classify_anomalies(df, metric): logger.info("error occurred while storing datapoint: {}\n".format(repr(e))) break -def confidence_interval_insights(logger): +def confidence_interval_insights(logger, engine): """ Anomaly detection method based on confidence intervals """ - from augur.tasks.init.celery_app import engine - # Update table of endpoints before we query them all logger.info("Discovering insights for task with entry info: {}".format(entry_info)) @@ -479,10 +474,9 @@ def is_unique_key(key): "data_source": data_source } - with DatabaseSession(logger, engine) as session: - repo_insight_obj = RepoInsightsRecord(**record) - session.add(repo_insight_obj) - session.commit() + repo_insight_obj = RepoInsightsRecord(**record) + session.add(repo_insight_obj) + session.commit() logger.info("Primary key inserted into the repo_insights_records table: {}\n".format(repo_insight_obj.ri_id)) @@ -510,10 +504,9 @@ def is_unique_key(key): "tool_version": tool_version, "data_source": data_source } - with DatabaseSession(logger, engine) as session: - repo_insight_obj = RepoInsight(**data_point) - session.add(repo_insight_obj) - session.commit() + repo_insight_obj = RepoInsight(**data_point) + session.add(repo_insight_obj) + session.commit() logger.info("Primary key inserted into the repo_insights table: " + str( repo_insight_obj.ri_id)) @@ -530,7 +523,7 @@ def is_unique_key(key): else: logger.info("Key: {} has empty raw_values, should not have key here".format(key)) -def send_insight(insight, units_from_mean, logger): +def send_insight(insight, units_from_mean, logger, engine): try: repoSQL = s.sql.text(""" SELECT repo_git, rg_name @@ -538,8 +531,7 @@ def send_insight(insight, units_from_mean, logger): WHERE repo_id = {} """.format(insight['repo_id'])) - with DatabaseEngine(connection_pool_size=1) as engine: - repo = pd.read_sql(repoSQL, engine, params={}).iloc[0] + repo = pd.read_sql(repoSQL, engine, params={}).iloc[0] begin_date = datetime.datetime.now() - datetime.timedelta(days=anomaly_days) dict_date = insight['ri_date'].strftime("%Y-%m-%d %H:%M:%S") @@ -577,8 +569,7 @@ def clear_insights(repo_id, new_endpoint, new_field, logger): AND ri_field = '{}' """.format(repo_id, new_endpoint, new_field) try: - with DatabaseEngine(1) as engine: - result = engine.execute(deleteSQL) + result = engine.execute(deleteSQL) except Exception as e: logger.info("Error occured deleting insight slot: {}".format(e)) @@ -595,8 +586,7 @@ def clear_insights(repo_id, new_endpoint, new_field, logger): AND ri_field = '{}' """.format(repo_id, new_endpoint, new_field) try: - with DatabaseEngine(connection_pool_size=1) as engine: - result = engine.execute(deleteSQL) + result = engine.execute(deleteSQL) except Exception as e: logger.info("Error occured deleting insight slot: {}".format(e)) @@ -616,8 +606,7 @@ def clear_insight(repo_id, new_score, new_metric, new_field, logger): AND ri_field = '{}' ORDER BY ri_score DESC """.format(repo_id, new_metric, new_field)) - with 
DatabaseEngine(connection_pool_size=1) as engine: - rec = json.loads(pd.read_sql(recordSQL, engine, params={}).to_json(orient='records')) + rec = json.loads(pd.read_sql(recordSQL, engine, params={}).to_json(orient='records')) logger.info("recordsql: {}, \n{}".format(recordSQL, rec)) # If new score is higher, continue with deletion if len(rec) > 0: @@ -638,8 +627,7 @@ def clear_insight(repo_id, new_score, new_metric, new_field, logger): AND ri_field = '{}' """.format(record['repo_id'], record['ri_metric'], record['ri_field']) try: - with DatabaseEngine(connection_pool_size=1) as engine: - result = engine.execute(deleteSQL) + result = engine.execute(deleteSQL) except Exception as e: logger.info("Error occured deleting insight slot: {}".format(e)) else: @@ -653,8 +641,7 @@ def clear_insight(repo_id, new_score, new_metric, new_field, logger): WHERE repo_id = {} ORDER BY ri_score ASC """.format(repo_id)) - with DatabaseEngine(connection_pool_size=1) as engine: - ins = json.loads(pd.read_sql(insightSQL, engine, params={}).to_json(orient='records')) + ins = json.loads(pd.read_sql(insightSQL, engine, params={}).to_json(orient='records')) logger.info("This repos insights: {}".format(ins)) # Determine if inisghts need to be deleted based on if there are more insights than we want stored, @@ -692,8 +679,7 @@ def clear_insight(repo_id, new_score, new_metric, new_field, logger): AND ri_metric = '{}' """.format(insight['repo_id'], insight['ri_metric']) try: - with DatabaseEngine(connection_pool_size=1) as engine: - result = engine.execute(deleteSQL) + result = engine.execute(deleteSQL) except Exception as e: logger.info("Error occured deleting insight slot: {}".format(e)) @@ -710,9 +696,7 @@ def confidence_interval(data, logger, timeperiod='week', confidence=.95, ): logger.info("H: {}".format(h)) return m, m - h, m + h -def update_metrics(api_host, api_port, tool_source, tool_version, logger): - - from augur.tasks.init.celery_app import engine +def update_metrics(api_host, api_port, tool_source, tool_version, logger, session, engine): logger.info("Preparing to update metrics ...\n\n" + "Hitting endpoint: http://{}:{}/api/unstable/metrics/status ...\n".format( @@ -725,7 +709,7 @@ def update_metrics(api_host, api_port, tool_source, tool_version, logger): # Duplicate checking ... 
need_insertion = filter_duplicates({'cm_api_endpoint_repo': "endpoint"}, ['chaoss_metric_status'], - active_metrics, logger) + active_metrics, logger, engine) logger.info("Count of contributors needing insertion: " + str(len(need_insertion)) + "\n") for metric in need_insertion: @@ -745,16 +729,15 @@ def update_metrics(api_host, api_port, tool_source, tool_version, logger): "tool_version": tool_version, "data_source": metric['data_source'] } - with DatabaseSession(logger, engine) as session: - cms_tuple = ChaossMetricStatus(**cms_tuple) - session.add(cms_tuple) - session.commit() + cms_tuple = ChaossMetricStatus(**cms_tuple) + session.add(cms_tuple) + session.commit() - logger.info("Primary key inserted into the metrics table: {}\n".format(cms_tuple.cms_id)) + logger.info("Primary key inserted into the metrics table: {}\n".format(cms_tuple.cms_id)) logger.info("Inserted metric: " + metric['display_name'] + "\n") -def filter_duplicates(cols, tables, og_data, logger): +def filter_duplicates(cols, tables, og_data, logger, engine): need_insertion = [] table_str = tables[0] @@ -765,8 +748,7 @@ def filter_duplicates(cols, tables, og_data, logger): colSQL = s.sql.text(""" SELECT {} FROM {} """.format(col, table_str)) - with DatabaseEngine(connection_pool_size=1) as engine: - values = pd.read_sql(colSQL, engine, params={}) + values = pd.read_sql(colSQL, engine, params={}) for obj in og_data: if values.isin([obj[cols[col]]]).any().any(): diff --git a/augur/tasks/data_analysis/message_insights/tasks.py b/augur/tasks/data_analysis/message_insights/tasks.py index 2339ee0e92..689ce54355 100644 --- a/augur/tasks/data_analysis/message_insights/tasks.py +++ b/augur/tasks/data_analysis/message_insights/tasks.py @@ -15,7 +15,6 @@ from augur.application.db.session import DatabaseSession from augur.application.config import AugurConfig from augur.application.db.models import Repo, MessageAnalysis, MessageAnalysisSummary -from augur.application.db.engine import DatabaseEngine from augur.application.db.util import execute_session_query #SPDX-License-Identifier: MIT @@ -33,12 +32,12 @@ def message_insight_task(): repos = execute_session_query(query, 'all') - for repo in repos: - message_insight_model(repo.repo_git, logger, engine) + for repo in repos: + message_insight_model(repo.repo_git, logger, engine, session) -def message_insight_model(repo_git: str,logger,engine) -> None: +def message_insight_model(repo_git: str,logger,engine, session) -> None: full_train = True begin_date = '' @@ -50,15 +49,13 @@ def message_insight_model(repo_git: str,logger,engine) -> None: now = datetime.datetime.utcnow() run_id = int(now.timestamp())+5 - with DatabaseSession(logger, engine) as session: - - config = AugurConfig(logger, session) + config = AugurConfig(logger, session) - query = session.query(Repo).filter(Repo.repo_git == repo_git) - repo_id = execute_session_query(query, 'one').repo_id + query = session.query(Repo).filter(Repo.repo_git == repo_git) + repo_id = execute_session_query(query, 'one').repo_id - models_dir = os.path.join(ROOT_AUGUR_DIRECTORY, "tasks", "data_analysis", "message_insights", config.get_value("Message_Insights", 'models_dir')) - insight_days = config.get_value("Message_Insights", 'insight_days') + models_dir = os.path.join(ROOT_AUGUR_DIRECTORY, "tasks", "data_analysis", "message_insights", config.get_value("Message_Insights", 'models_dir')) + insight_days = config.get_value("Message_Insights", 'insight_days') # Any initial database instructions, like finding the last tuple inserted or generate the 
next ID value @@ -100,7 +97,7 @@ def message_insight_model(repo_git: str,logger,engine) -> None: logger.debug(f'{begin_date}') # Assign new run_id for every run - run_id = get_max_id('message_analysis', 'worker_run_id', logger) + run_id = get_max_id('message_analysis', 'worker_run_id', logger, engine) logger.info(f'Last analyzed msg_id of repo {repo_id} is {df_past["msg_id"].iloc[-1]}') logger.info(f'Fetching recent messages from {begin_date} of repo {repo_id}...\n') @@ -198,34 +195,32 @@ def message_insight_model(repo_git: str,logger,engine) -> None: logger.info('Begin message_analysis data insertion...') logger.info(f'{df_message.shape[0]} data records to be inserted') - with DatabaseSession(logger, engine) as session: - - for row in df_message.itertuples(index=False): - try: - msg = { - "msg_id": row.msg_id, - "worker_run_id": run_id, - "sentiment_score": row.sentiment_score, - "reconstruction_error": row.rec_err, - "novelty_flag": row.novel_label, - "feedback_flag": None, - "tool_source": tool_source, - "tool_version": tool_version, - "data_source": data_source, - } - - message_analysis_object = MessageAnalysis(**msg) - session.add(message_analysis_object) - session.commit() - - # result = create_database_engine().execute(message_analysis_table.insert().values(msg)) - logger.info( - f'Primary key inserted into the message_analysis table: {message_analysis_object.msg_analysis_id}') - # logger.info( - # f'Inserted data point {results_counter} with msg_id {row.msg_id} and timestamp {row.msg_timestamp}') - except Exception as e: - logger.error(f'Error occurred while storing datapoint {repr(e)}') - break + for row in df_message.itertuples(index=False): + try: + msg = { + "msg_id": row.msg_id, + "worker_run_id": run_id, + "sentiment_score": row.sentiment_score, + "reconstruction_error": row.rec_err, + "novelty_flag": row.novel_label, + "feedback_flag": None, + "tool_source": tool_source, + "tool_version": tool_version, + "data_source": data_source, + } + + message_analysis_object = MessageAnalysis(**msg) + session.add(message_analysis_object) + session.commit() + + # result = create_database_engine().execute(message_analysis_table.insert().values(msg)) + logger.info( + f'Primary key inserted into the message_analysis table: {message_analysis_object.msg_analysis_id}') + # logger.info( + # f'Inserted data point {results_counter} with msg_id {row.msg_id} and timestamp {row.msg_timestamp}') + except Exception as e: + logger.error(f'Error occurred while storing datapoint {repr(e)}') + break logger.info('Data insertion completed\n') @@ -406,7 +401,7 @@ def message_insight_model(repo_git: str,logger,engine) -> None: logger.info(f'Insights dict: {insights}') # Send insights to Auggie - send_insight(repo_id, insights, logger) + send_insight(repo_id, insights, logger, engine) else: if df_message.empty: @@ -415,7 +410,7 @@ def message_insight_model(repo_git: str,logger,engine) -> None: logger.warning("Insufficient data to analyze") -def send_insight(repo_id, insights, logger): +def send_insight(repo_id, insights, logger, engine): try: repoSQL = s.sql.text(""" SELECT repo_git, rg_name @@ -423,8 +418,7 @@ def send_insight(repo_id, insights, logger): WHERE repo_id = {} """.format(repo_id)) - with DatabaseEngine(connection_pool_size=1) as engine: - repo = pd.read_sql(repoSQL, engine, params={}).iloc[0] + repo = pd.read_sql(repoSQL, engine, params={}).iloc[0] to_send = { 'message_insight': True, @@ -443,7 +437,7 @@ def send_insight(repo_id, insights, logger): logger.info("Sending insight to Auggie failed: 
{}\n".format(e)) -def get_max_id(table, column, logger, default=25150): +def get_max_id(table, column, logger, engine, default=25150): """ Gets the max value (usually used for id/pk's) of any Integer column of any table @@ -459,8 +453,7 @@ def get_max_id(table, column, logger, default=25150): SELECT max({0}.{1}) AS {1} FROM {0} """.format(table, column)) - with DatabaseEngine(connection_pool_size=1) as engine: - rs = pd.read_sql(max_id_sql, engine, params={}) + rs = pd.read_sql(max_id_sql, engine, params={}) if rs.iloc[0][column] is not None: max_id = int(rs.iloc[0][column]) + 1 logger.info("Found max id for {} column in the {} table: {}\n".format(column, table, max_id)) diff --git a/augur/tasks/data_analysis/pull_request_analysis_worker/tasks.py b/augur/tasks/data_analysis/pull_request_analysis_worker/tasks.py index 2fe970311d..07810e8b3a 100644 --- a/augur/tasks/data_analysis/pull_request_analysis_worker/tasks.py +++ b/augur/tasks/data_analysis/pull_request_analysis_worker/tasks.py @@ -12,7 +12,6 @@ from augur.application.db.session import DatabaseSession from augur.application.config import AugurConfig from augur.application.db.models import Repo, PullRequestAnalysis -from augur.application.db.engine import DatabaseEngine from augur.application.db.util import execute_session_query # from sklearn.metrics import (confusion_matrix, f1_score, precision_score, recall_score) diff --git a/augur/tasks/git/facade_tasks.py b/augur/tasks/git/facade_tasks.py index 1b1a90f6dd..430f14fbf5 100644 --- a/augur/tasks/git/facade_tasks.py +++ b/augur/tasks/git/facade_tasks.py @@ -1,3 +1,5 @@ +#SPDX-License-Identifier: MIT + import sys import json import time @@ -47,8 +49,6 @@ @celery.task def facade_error_handler(request,exc,traceback): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(facade_error_handler.__name__) logger.error(f"Task {request.id} raised exception: {exc}! 
\n {traceback}") @@ -68,8 +68,6 @@ def facade_error_handler(request,exc,traceback): @celery.task def facade_analysis_init_facade_task(): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(facade_analysis_init_facade_task.__name__) with FacadeSession(logger) as session: session.update_status('Running analysis') @@ -91,8 +89,6 @@ def grab_comitters(repo_id,platform="github"): @celery.task def trim_commits_facade_task(repo_id): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(trim_commits_facade_task.__name__) with FacadeSession(logger) as session: @@ -142,8 +138,6 @@ def update_analysis_log(repos_id,status): @celery.task def trim_commits_post_analysis_facade_task(repo_id): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(trim_commits_post_analysis_facade_task.__name__) @@ -230,18 +224,15 @@ def update_analysis_log(repos_id,status): @celery.task def facade_analysis_end_facade_task(): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(facade_analysis_end_facade_task.__name__) - FacadeSession(logger).log_activity('Info','Running analysis (complete)') + with FacadeSession(logger) as session: + session.log_activity('Info','Running analysis (complete)') @celery.task def facade_start_contrib_analysis_task(): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(facade_start_contrib_analysis_task.__name__) with FacadeSession(logger) as session: session.update_status('Updating Contributors') @@ -254,8 +245,6 @@ def analyze_commits_in_parallel(repo_id, multithreaded: bool)-> None: """Take a large list of commit data to analyze and store in the database. Meant to be run in parallel with other instances of this task. """ - from augur.tasks.init.celery_app import engine - #create new session for celery thread. 
logger = logging.getLogger(analyze_commits_in_parallel.__name__) with FacadeSession(logger) as session: @@ -343,8 +332,6 @@ def analyze_commits_in_parallel(repo_id, multithreaded: bool)-> None: @celery.task def nuke_affiliations_facade_task(): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(nuke_affiliations_facade_task.__name__) with FacadeSession(logger) as session: @@ -353,8 +340,6 @@ def nuke_affiliations_facade_task(): @celery.task def fill_empty_affiliations_facade_task(): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(fill_empty_affiliations_facade_task.__name__) with FacadeSession(logger) as session: fill_empty_affiliations(session) @@ -362,8 +347,6 @@ def fill_empty_affiliations_facade_task(): @celery.task def invalidate_caches_facade_task(): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(invalidate_caches_facade_task.__name__) with FacadeSession(logger) as session: @@ -372,8 +355,6 @@ def invalidate_caches_facade_task(): @celery.task def rebuild_unknown_affiliation_and_web_caches_facade_task(): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(rebuild_unknown_affiliation_and_web_caches_facade_task.__name__) with FacadeSession(logger) as session: @@ -382,8 +363,6 @@ def rebuild_unknown_affiliation_and_web_caches_facade_task(): @celery.task def force_repo_analysis_facade_task(repo_git): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(force_repo_analysis_facade_task.__name__) with FacadeSession(logger) as session: @@ -392,8 +371,6 @@ def force_repo_analysis_facade_task(repo_git): @celery.task def git_repo_cleanup_facade_task(repo_git): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(git_repo_cleanup_facade_task.__name__) with FacadeSession(logger) as session: @@ -402,8 +379,6 @@ def git_repo_cleanup_facade_task(repo_git): @celery.task def git_repo_initialize_facade_task(repo_git): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(git_repo_initialize_facade_task.__name__) with FacadeSession(logger) as session: @@ -412,8 +387,6 @@ def git_repo_initialize_facade_task(repo_git): @celery.task def check_for_repo_updates_facade_task(repo_git): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(check_for_repo_updates_facade_task.__name__) with FacadeSession(logger) as session: @@ -422,8 +395,6 @@ def check_for_repo_updates_facade_task(repo_git): @celery.task def force_repo_updates_facade_task(repo_git): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(force_repo_updates_facade_task.__name__) with FacadeSession(logger) as session: @@ -432,8 +403,6 @@ def force_repo_updates_facade_task(repo_git): @celery.task def git_repo_updates_facade_task(repo_git): - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(git_repo_updates_facade_task.__name__) with FacadeSession(logger) as session: @@ -599,11 +568,21 @@ def generate_non_repo_domain_facade_tasks(logger): facade_sequence = [] if nuke_stored_affiliations: - facade_sequence.append(nuke_affiliations_facade_task.si().on_error(facade_error_handler.s()))#nuke_affiliations(session.cfg) + #facade_sequence.append(nuke_affiliations_facade_task.si().on_error(facade_error_handler.s()))#nuke_affiliations(session.cfg) + logger.info("Nuke stored affiliations is deprecated.") + # deprecated because the UI component of facade where affiliations would be + # nuked upon change no 
longer exists, and this information can easily be derived + # from queries and materialized views in the current version of Augur. + # This method is also a major performance bottleneck with little value. #session.logger.info(session.cfg) if not limited_run or (limited_run and fix_affiliations): - facade_sequence.append(fill_empty_affiliations_facade_task.si().on_error(facade_error_handler.s()))#fill_empty_affiliations(session) + #facade_sequence.append(fill_empty_affiliations_facade_task.si().on_error(facade_error_handler.s()))#fill_empty_affiliations(session) + logger.info("Fill empty affiliations is deprecated.") + # deprecated because the UI component of facade where affiliations would need + # to be fixed upon change no longer exists, and this information can easily be derived + # from queries and materialized views in the current version of Augur. + # This method is also a major performance bottleneck with little value. if force_invalidate_caches: facade_sequence.append(invalidate_caches_facade_task.si().on_error(facade_error_handler.s()))#invalidate_caches(session.cfg) diff --git a/augur/tasks/git/util/facade_worker/facade_worker/__init__.py b/augur/tasks/git/util/facade_worker/facade_worker/__init__.py index 697b69d329..1753083d3f 100644 --- a/augur/tasks/git/util/facade_worker/facade_worker/__init__.py +++ b/augur/tasks/git/util/facade_worker/facade_worker/__init__.py @@ -1,6 +1,6 @@ #SPDX-License-Identifier: MIT """augur_worker_github - Augur Worker that collects GitHub data""" -__version__ = '1.2.4' +__version__ = '1.3.0' __author__ = 'Augur Team ' __all__ = [] diff --git a/augur/tasks/git/util/facade_worker/facade_worker/facade01config.py b/augur/tasks/git/util/facade_worker/facade_worker/facade01config.py index a405aadcca..352d30f1c3 100644 --- a/augur/tasks/git/util/facade_worker/facade_worker/facade01config.py +++ b/augur/tasks/git/util/facade_worker/facade_worker/facade01config.py @@ -305,7 +305,7 @@ def __init__(self, logger: Logger): sys.exit(1) self.tool_source = '\'Facade \'' - self.tool_version = '\'1.2.4\'' + self.tool_version = '\'1.3.0\'' self.data_source = '\'Git Log\'' self.worker_options = worker_options diff --git a/augur/tasks/git/util/facade_worker/setup.py b/augur/tasks/git/util/facade_worker/setup.py index 5bc4f99af1..298baff49d 100644 --- a/augur/tasks/git/util/facade_worker/setup.py +++ b/augur/tasks/git/util/facade_worker/setup.py @@ -14,7 +14,7 @@ def read(filename): setup( name="facade_worker", - version="1.2.4", + version="1.3.0", url="https://github.com/chaoss/augur", license='MIT', author="Augurlabs", diff --git a/augur/tasks/github/contributors/tasks.py b/augur/tasks/github/contributors/tasks.py index 0304abca45..7de6a9ba37 100644 --- a/augur/tasks/github/contributors/tasks.py +++ b/augur/tasks/github/contributors/tasks.py @@ -45,7 +45,7 @@ def process_contributors(): url = f"https://api.github.com/users/{contributor_dict['cntrb_login']}" - data = retrieve_dict_data(url, session) + data = retrieve_dict_data(url, session.oauths, logger) if data is None: print(f"Unable to get contributor data for: {contributor_dict['cntrb_login']}") @@ -65,12 +65,12 @@ def process_contributors(): -def retrieve_dict_data(url: str, session): +def retrieve_dict_data(url: str, key_auth, logger): num_attempts = 0 while num_attempts <= 10: - response = hit_api(session.oauths, url, session.logger) + response = hit_api(key_auth, url, logger) # increment attempts if response is None: @@ -83,14 +83,14 @@ def retrieve_dict_data(url: str, session): if "message" in page_data: if 
page_data['message'] == "Not Found": - session.logger.info( + logger.info( "Github repo was not found or does not exist for endpoint: " f"{response.url}\n" ) break elif "You have exceeded a secondary rate limit. Please wait a few minutes before you try again" in page_data['message']: - session.logger.info('\n\n\n\nSleeping for 100 seconds due to secondary rate limit issue.\n\n\n\n') + logger.info('\n\n\n\nSleeping for 100 seconds due to secondary rate limit issue.\n\n\n\n') time.sleep(100) continue diff --git a/augur/tasks/github/detect_move/core.py b/augur/tasks/github/detect_move/core.py index c4ee89dad1..97c456c8d2 100644 --- a/augur/tasks/github/detect_move/core.py +++ b/augur/tasks/github/detect_move/core.py @@ -15,10 +15,10 @@ class CollectionState(Enum): COLLECTING = "Collecting" -def extract_owner_and_repo_from_endpoint(session,url): - response_from_gh = hit_api(session.oauths, url, session.logger) +def extract_owner_and_repo_from_endpoint(key_auth, url, logger): + response_from_gh = hit_api(key_auth, url, logger) - page_data = parse_json_response(session.logger, response_from_gh) + page_data = parse_json_response(logger, response_from_gh) full_repo_name = page_data['full_name'] @@ -26,7 +26,7 @@ def extract_owner_and_repo_from_endpoint(session,url): return splits[0], splits[-1] -def ping_github_for_repo_move(session,repo): +def ping_github_for_repo_move(session,repo, logger): owner, name = get_owner_repo(repo.repo_git) url = f"https://api.github.com/repos/{owner}/{name}" @@ -50,7 +50,7 @@ def ping_github_for_repo_move(session,repo): session.logger.info(f"Repo found at url: {url}") return - owner, name = extract_owner_and_repo_from_endpoint(session, response_from_gh.headers['location']) + owner, name = extract_owner_and_repo_from_endpoint(session, response_from_gh.headers['location'], logger) current_repo_dict = repo.__dict__ del current_repo_dict['_sa_instance_state'] diff --git a/augur/tasks/github/detect_move/tasks.py b/augur/tasks/github/detect_move/tasks.py index 2edb141dd6..251880b791 100644 --- a/augur/tasks/github/detect_move/tasks.py +++ b/augur/tasks/github/detect_move/tasks.py @@ -21,6 +21,6 @@ def detect_github_repo_move(repo_git : str) -> None: query = session.query(Repo).filter(Repo.repo_git == repo_git) repo = execute_session_query(query, 'one') logger.info(f"Pinging repo: {repo_git}") - ping_github_for_repo_move(session, repo) + ping_github_for_repo_move(session, repo, logger) except Exception as e: logger.error(f"Could not check repo source for {repo_git}\n Reason: {e} \n Traceback: {''.join(traceback.format_exception(None, e, e.__traceback__))}") \ No newline at end of file diff --git a/augur/tasks/github/events/tasks.py b/augur/tasks/github/events/tasks.py index 6966642ee1..f909934b46 100644 --- a/augur/tasks/github/events/tasks.py +++ b/augur/tasks/github/events/tasks.py @@ -19,11 +19,9 @@ def collect_events(repo_git: str): from augur.tasks.init.celery_app import engine - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(collect_events.__name__) - with DatabaseSession(logger, engine) as session: + with GithubTaskSession(logger, engine) as session: try: @@ -37,11 +35,11 @@ def collect_events(repo_git: str): url = f"https://api.github.com/repos/{owner}/{repo}/issues/events" - event_data = retrieve_all_event_data(repo_git, logger) + event_data = retrieve_all_event_data(repo_git, logger, session.oauths) if event_data: - process_events(event_data, f"{owner}/{repo}: Event task", repo_id, logger) + process_events(event_data, f"{owner}/{repo}: 
Event task", repo_id, logger, session) else: logger.info(f"{owner}/{repo} has no events") @@ -49,7 +47,7 @@ def collect_events(repo_git: str): logger.error(f"Could not collect events for {repo_git}\n Reason: {e} \n Traceback: {''.join(traceback.format_exception(None, e, e.__traceback__))}") -def retrieve_all_event_data(repo_git: str, logger): +def retrieve_all_event_data(repo_git: str, logger, key_auth): from augur.tasks.init.celery_app import engine @@ -58,11 +56,9 @@ def retrieve_all_event_data(repo_git: str, logger): logger.info(f"Collecting Github events for {owner}/{repo}") url = f"https://api.github.com/repos/{owner}/{repo}/issues/events" - - with GithubTaskSession(logger, engine) as session: - - # returns an iterable of all issues at this url (this essentially means you can treat the issues variable as a list of the issues) - events = GithubPaginator(url, session.oauths, logger) + + # returns an iterable of all issues at this url (this essentially means you can treat the issues variable as a list of the issues) + events = GithubPaginator(url, key_auth, logger) num_pages = events.get_num_pages() @@ -83,7 +79,7 @@ def retrieve_all_event_data(repo_git: str, logger): return all_data -def process_events(events, task_name, repo_id, logger): +def process_events(events, task_name, repo_id, logger, session): from augur.tasks.init.celery_app import engine @@ -95,85 +91,83 @@ def process_events(events, task_name, repo_id, logger): issue_event_dicts = [] contributors = [] - with DatabaseSession(logger, engine) as session: - - not_mapable_event_count = 0 - event_len = len(events) - for event in events: - - event, contributor = process_github_event_contributors(logger, event, tool_source, tool_version, data_source) - - # event_mapping_data is the pr or issue data needed to relate the event to an issue or pr - event_mapping_data = event["issue"] + not_mapable_event_count = 0 + event_len = len(events) + for event in events: + + event, contributor = process_github_event_contributors(logger, event, tool_source, tool_version, data_source) + + # event_mapping_data is the pr or issue data needed to relate the event to an issue or pr + event_mapping_data = event["issue"] + + if event_mapping_data is None: + not_mapable_event_count += 1 + continue + + if 'pull_request' in list(event_mapping_data.keys()): + pr_url = event_mapping_data["pull_request"]["url"] + + try: + query = session.query(PullRequest).filter(PullRequest.pr_url == pr_url) + related_pr = execute_session_query(query, 'one') + except s.orm.exc.NoResultFound: + logger.info(f"{task_name}: Could not find related pr") + logger.info(f"{task_name}: We were searching for: {pr_url}") + # TODO: Add table to log all errors + logger.info(f"{task_name}: Skipping") + continue - if event_mapping_data is None: - not_mapable_event_count += 1 + pr_event_dicts.append( + extract_pr_event_data(event, related_pr.pull_request_id, platform_id, repo_id, + tool_source, tool_version, data_source) + ) + + else: + issue_url = event_mapping_data["url"] + + try: + query = session.query(Issue).filter(Issue.issue_url == issue_url) + related_issue = execute_session_query(query, 'one') + except s.orm.exc.NoResultFound: + logger.info(f"{task_name}: Could not find related pr") + logger.info( + f"{task_name}: We were searching for: {issue_url}") + # TODO: Add table to log all errors + logger.info(f"{task_name}: Skipping") continue - - if 'pull_request' in list(event_mapping_data.keys()): - pr_url = event_mapping_data["pull_request"]["url"] - - try: - query = 
session.query(PullRequest).filter(PullRequest.pr_url == pr_url) - related_pr = execute_session_query(query, 'one') - except s.orm.exc.NoResultFound: - logger.info(f"{task_name}: Could not find related pr") - logger.info(f"{task_name}: We were searching for: {pr_url}") - # TODO: Add table to log all errors - logger.info(f"{task_name}: Skipping") - continue - - pr_event_dicts.append( - extract_pr_event_data(event, related_pr.pull_request_id, platform_id, repo_id, - tool_source, tool_version, data_source) - ) - else: - issue_url = event_mapping_data["url"] - - try: - query = session.query(Issue).filter(Issue.issue_url == issue_url) - related_issue = execute_session_query(query, 'one') - except s.orm.exc.NoResultFound: - logger.info(f"{task_name}: Could not find related pr") - logger.info( - f"{task_name}: We were searching for: {issue_url}") - # TODO: Add table to log all errors - logger.info(f"{task_name}: Skipping") - continue - - issue_event_dicts.append( - extract_issue_event_data(event, related_issue.issue_id, platform_id, repo_id, - tool_source, tool_version, data_source) - ) - - # add contributor to list after porcessing the event, - # so if it fails processing for some reason the contributor is not inserted - # NOTE: contributor is none when there is no contributor data on the event - if contributor: - contributors.append(contributor) + issue_event_dicts.append( + extract_issue_event_data(event, related_issue.issue_id, platform_id, repo_id, + tool_source, tool_version, data_source) + ) + + # add contributor to list after porcessing the event, + # so if it fails processing for some reason the contributor is not inserted + # NOTE: contributor is none when there is no contributor data on the event + if contributor: + contributors.append(contributor) - # remove contributors that were found in the data more than once - contributors = remove_duplicate_dicts(contributors) + # remove contributors that were found in the data more than once + contributors = remove_duplicate_dicts(contributors) - session.insert_data(contributors, Contributor, ["cntrb_id"]) + session.insert_data(contributors, Contributor, ["cntrb_id"]) - issue_events_len = len(issue_event_dicts) - pr_events_len = len(pr_event_dicts) - if event_len != (issue_events_len + pr_events_len): + issue_events_len = len(issue_event_dicts) + pr_events_len = len(pr_event_dicts) + if event_len != (issue_events_len + pr_events_len): - unassigned_events = event_len - issue_events_len - pr_events_len + unassigned_events = event_len - issue_events_len - pr_events_len - logger.error(f"{task_name}: {event_len} events were processed, but {pr_events_len} pr events were found and related to a pr, and {issue_events_len} issue events were found and related to an issue. {not_mapable_event_count} events were not related to a pr or issue due to the api returning insufficient data. For some reason {unassigned_events} events were not able to be processed even when the api returned sufficient data. This is usually because pull requests or issues have not been collected, and the events are skipped because they cannot be related to a pr or issue") + logger.error(f"{task_name}: {event_len} events were processed, but {pr_events_len} pr events were found and related to a pr, and {issue_events_len} issue events were found and related to an issue. {not_mapable_event_count} events were not related to a pr or issue due to the api returning insufficient data. 
For some reason {unassigned_events} events were not able to be processed even when the api returned sufficient data. This is usually because pull requests or issues have not been collected, and the events are skipped because they cannot be related to a pr or issue") - logger.info(f"{task_name}: Inserting {len(pr_event_dicts)} pr events and {len(issue_event_dicts)} issue events") + logger.info(f"{task_name}: Inserting {len(pr_event_dicts)} pr events and {len(issue_event_dicts)} issue events") - # TODO: Could replace this with "id" but it isn't stored on the table for some reason - pr_event_natural_keys = ["node_id"] - session.insert_data(pr_event_dicts, PullRequestEvent, pr_event_natural_keys) + # TODO: Could replace this with "id" but it isn't stored on the table for some reason + pr_event_natural_keys = ["node_id"] + session.insert_data(pr_event_dicts, PullRequestEvent, pr_event_natural_keys) - issue_event_natural_keys = ["issue_id", "issue_event_src_id"] - session.insert_data(issue_event_dicts, IssueEvent, issue_event_natural_keys) + issue_event_natural_keys = ["issue_id", "issue_event_src_id"] + session.insert_data(issue_event_dicts, IssueEvent, issue_event_natural_keys) # TODO: Should we skip an event if there is no contributor to resolve it o diff --git a/augur/tasks/github/issues/tasks.py b/augur/tasks/github/issues/tasks.py index 5413aabc9d..093cb924e0 100644 --- a/augur/tasks/github/issues/tasks.py +++ b/augur/tasks/github/issues/tasks.py @@ -41,11 +41,11 @@ def collect_issues(repo_git : str) -> None: owner, repo = get_owner_repo(repo_git) - issue_data = retrieve_all_issue_data(repo_git, logger) + issue_data = retrieve_all_issue_data(repo_git, logger, session.oauths) if issue_data: - process_issues(issue_data, f"{owner}/{repo}: Issue task", repo_id, logger) + process_issues(issue_data, f"{owner}/{repo}: Issue task", repo_id, logger, session) else: logger.info(f"{owner}/{repo} has no issues") @@ -54,11 +54,7 @@ def collect_issues(repo_git : str) -> None: -def retrieve_all_issue_data(repo_git, logger) -> None: - - from augur.tasks.init.celery_app import engine - - print(f"Eventlet engine id: {id(engine)}") +def retrieve_all_issue_data(repo_git, logger, key_auth) -> None: owner, repo = get_owner_repo(repo_git) @@ -66,12 +62,9 @@ def retrieve_all_issue_data(repo_git, logger) -> None: url = f"https://api.github.com/repos/{owner}/{repo}/issues?state=all" - - with GithubTaskSession(logger, engine) as session: - - # returns an iterable of all issues at this url (this essentially means you can treat the issues variable as a list of the issues) - # Reference the code documenation for GithubPaginator for more details - issues = GithubPaginator(url, session.oauths, logger) + # returns an iterable of all issues at this url (this essentially means you can treat the issues variable as a list of the issues) + # Reference the code documenation for GithubPaginator for more details + issues = GithubPaginator(url, key_auth, logger) # this is defined so we can decrement it each time # we come across a pr, so at the end we can log how @@ -96,9 +89,7 @@ def retrieve_all_issue_data(repo_git, logger) -> None: return all_data -def process_issues(issues, task_name, repo_id, logger) -> None: - - from augur.tasks.init.celery_app import engine +def process_issues(issues, task_name, repo_id, logger, session) -> None: # get repo_id or have it passed tool_source = "Issue Task" @@ -147,64 +138,62 @@ def process_issues(issues, task_name, repo_id, logger) -> None: print("No issues found while processing") return - 
with DatabaseSession(logger, engine) as session: - - # remove duplicate contributors before inserting - contributors = remove_duplicate_dicts(contributors) - - # insert contributors from these issues - logger.info(f"{task_name}: Inserting {len(contributors)} contributors") - session.insert_data(contributors, Contributor, ["cntrb_id"]) - + # remove duplicate contributors before inserting + contributors = remove_duplicate_dicts(contributors) + + # insert contributors from these issues + logger.info(f"{task_name}: Inserting {len(contributors)} contributors") + session.insert_data(contributors, Contributor, ["cntrb_id"]) + + + # insert the issues into the issues table. + # issue_urls are gloablly unique across github so we are using it to determine whether an issue we collected is already in the table + # specified in issue_return_columns is the columns of data we want returned. This data will return in this form; {"issue_url": url, "issue_id": id} + logger.info(f"{task_name}: Inserting {len(issue_dicts)} issues") + issue_natural_keys = ["repo_id", "gh_issue_id"] + issue_return_columns = ["issue_url", "issue_id"] + issue_string_columns = ["issue_title", "issue_body"] + try: + issue_return_data = session.insert_data(issue_dicts, Issue, issue_natural_keys, return_columns=issue_return_columns, string_fields=issue_string_columns) + except IntegrityError as e: + logger.error(f"Ran into integrity error:{e} \n Offending data: \n{issue_dicts}") + + if development: + raise e + # loop through the issue_return_data so it can find the labels and + # assignees that corelate to the issue that was inserted labels + issue_label_dicts = [] + issue_assignee_dicts = [] + for data in issue_return_data: + + issue_url = data["issue_url"] + issue_id = data["issue_id"] - # insert the issues into the issues table. - # issue_urls are gloablly unique across github so we are using it to determine whether an issue we collected is already in the table - # specified in issue_return_columns is the columns of data we want returned. This data will return in this form; {"issue_url": url, "issue_id": id} - logger.info(f"{task_name}: Inserting {len(issue_dicts)} issues") - issue_natural_keys = ["repo_id", "gh_issue_id"] - issue_return_columns = ["issue_url", "issue_id"] - issue_string_columns = ["issue_title", "issue_body"] try: - issue_return_data = session.insert_data(issue_dicts, Issue, issue_natural_keys, return_columns=issue_return_columns, string_fields=issue_string_columns) - except IntegrityError as e: - logger.error(f"Ran into integrity error:{e} \n Offending data: \n{issue_dicts}") + other_issue_data = issue_mapping_data[issue_url] + except KeyError as e: + logger.info(f"{task_name}: Cold not find other issue data. This should never happen. 
Error: {e}") - if development: - raise e - # loop through the issue_return_data so it can find the labels and - # assignees that corelate to the issue that was inserted labels - issue_label_dicts = [] - issue_assignee_dicts = [] - for data in issue_return_data: - issue_url = data["issue_url"] - issue_id = data["issue_id"] + # add the issue id to the lables and assignees, then add them to a list of dicts that will be inserted soon + dict_key = "issue_id" + issue_label_dicts += add_key_value_pair_to_dicts(other_issue_data["labels"], "issue_id", issue_id) + issue_assignee_dicts += add_key_value_pair_to_dicts(other_issue_data["assignees"], "issue_id", issue_id) - try: - other_issue_data = issue_mapping_data[issue_url] - except KeyError as e: - logger.info(f"{task_name}: Cold not find other issue data. This should never happen. Error: {e}") + logger.info(f"{task_name}: Inserting other issue data of lengths: Labels: {len(issue_label_dicts)} - Assignees: {len(issue_assignee_dicts)}") - # add the issue id to the lables and assignees, then add them to a list of dicts that will be inserted soon - dict_key = "issue_id" - issue_label_dicts += add_key_value_pair_to_dicts(other_issue_data["labels"], "issue_id", issue_id) - issue_assignee_dicts += add_key_value_pair_to_dicts(other_issue_data["assignees"], "issue_id", issue_id) + # inserting issue labels + # we are using label_src_id and issue_id to determine if the label is already in the database. + issue_label_natural_keys = ['label_src_id', 'issue_id'] + issue_label_string_fields = ["label_text", "label_description"] + session.insert_data(issue_label_dicts, IssueLabel, + issue_label_natural_keys, string_fields=issue_label_string_fields) - - logger.info(f"{task_name}: Inserting other issue data of lengths: Labels: {len(issue_label_dicts)} - Assignees: {len(issue_assignee_dicts)}") - - # inserting issue labels - # we are using label_src_id and issue_id to determine if the label is already in the database. - issue_label_natural_keys = ['label_src_id', 'issue_id'] - issue_label_string_fields = ["label_text", "label_description"] - session.insert_data(issue_label_dicts, IssueLabel, - issue_label_natural_keys, string_fields=issue_label_string_fields) - - # inserting issue assignees - # we are using issue_assignee_src_id and issue_id to determine if the label is already in the database. - issue_assignee_natural_keys = ['issue_assignee_src_id', 'issue_id'] - session.insert_data(issue_assignee_dicts, IssueAssignee, issue_assignee_natural_keys) + # inserting issue assignees + # we are using issue_assignee_src_id and issue_id to determine if the label is already in the database. 
+ issue_assignee_natural_keys = ['issue_assignee_src_id', 'issue_id'] + session.insert_data(issue_assignee_dicts, IssueAssignee, issue_assignee_natural_keys) diff --git a/augur/tasks/github/messages/tasks.py b/augur/tasks/github/messages/tasks.py index f60859acd0..d77367f5c9 100644 --- a/augur/tasks/github/messages/tasks.py +++ b/augur/tasks/github/messages/tasks.py @@ -23,11 +23,9 @@ def collect_github_messages(repo_git: str) -> None: from augur.tasks.init.celery_app import engine - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(collect_github_messages.__name__) - with DatabaseSession(logger, engine) as session: + with GithubTaskSession(logger, engine) as session: try: @@ -35,11 +33,11 @@ def collect_github_messages(repo_git: str) -> None: Repo.repo_git == repo_git).one().repo_id owner, repo = get_owner_repo(repo_git) - message_data = retrieve_all_pr_and_issue_messages(repo_git, logger) + message_data = retrieve_all_pr_and_issue_messages(repo_git, logger, session.oauths) if message_data: - process_messages(message_data, f"{owner}/{repo}: Message task", repo_id, logger) + process_messages(message_data, f"{owner}/{repo}: Message task", repo_id, logger, session) else: logger.info(f"{owner}/{repo} has no messages") @@ -48,9 +46,7 @@ def collect_github_messages(repo_git: str) -> None: -def retrieve_all_pr_and_issue_messages(repo_git: str, logger) -> None: - - from augur.tasks.init.celery_app import engine +def retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth) -> None: owner, repo = get_owner_repo(repo_git) @@ -64,10 +60,9 @@ def retrieve_all_pr_and_issue_messages(repo_git: str, logger) -> None: url = f"https://api.github.com/repos/{owner}/{repo}/issues/comments" # define database task session, that also holds authentication keys the GithubPaginator needs - with GithubTaskSession(logger, engine) as session: - # returns an iterable of all issues at this url (this essentially means you can treat the issues variable as a list of the issues) - messages = GithubPaginator(url, session.oauths, logger) + # returns an iterable of all issues at this url (this essentially means you can treat the issues variable as a list of the issues) + messages = GithubPaginator(url, key_auth, logger) num_pages = messages.get_num_pages() all_data = [] @@ -90,9 +85,7 @@ def retrieve_all_pr_and_issue_messages(repo_git: str, logger) -> None: return all_data -def process_messages(messages, task_name, repo_id, logger): - - from augur.tasks.init.celery_app import engine +def process_messages(messages, task_name, repo_id, logger, session): tool_source = "Pr comment task" tool_version = "2.0" @@ -109,120 +102,118 @@ def process_messages(messages, task_name, repo_id, logger): if len(messages) == 0: logger.info(f"{task_name}: No messages to process") - with DatabaseSession(logger, engine) as session: - - for message in messages: + for message in messages: - related_pr_of_issue_found = False + related_pr_of_issue_found = False - # this adds the cntrb_id to the message data - # the returned contributor will be added to the contributors list later, if the related issue or pr are found - # this logic is used so we don't insert a contributor when the related message isn't inserted - message, contributor = process_github_comment_contributors(message, tool_source, tool_version, data_source) + # this adds the cntrb_id to the message data + # the returned contributor will be added to the contributors list later, if the related issue or pr are found + # this logic is used so we don't insert a 
contributor when the related message isn't inserted + message, contributor = process_github_comment_contributors(message, tool_source, tool_version, data_source) - if is_issue_message(message["html_url"]): + if is_issue_message(message["html_url"]): - try: - query = session.query(Issue).filter(Issue.issue_url == message["issue_url"]) - related_issue = execute_session_query(query, 'one') - related_pr_of_issue_found = True + try: + query = session.query(Issue).filter(Issue.issue_url == message["issue_url"]) + related_issue = execute_session_query(query, 'one') + related_pr_of_issue_found = True - except s.orm.exc.NoResultFound: - logger.info(f"{task_name}: Could not find related pr") - logger.info( - f"{task_name}: We were searching for: {message['id']}") - logger.info(f"{task_name}: Skipping") - continue + except s.orm.exc.NoResultFound: + logger.info(f"{task_name}: Could not find related pr") + logger.info( + f"{task_name}: We were searching for: {message['id']}") + logger.info(f"{task_name}: Skipping") + continue - issue_id = related_issue.issue_id + issue_id = related_issue.issue_id - issue_message_ref_data = extract_needed_issue_message_ref_data(message, issue_id, repo_id, tool_source, tool_version, data_source) + issue_message_ref_data = extract_needed_issue_message_ref_data(message, issue_id, repo_id, tool_source, tool_version, data_source) - message_ref_mapping_data.append( - { - "platform_msg_id": message["id"], - "msg_ref_data": issue_message_ref_data, - "is_issue": True - } - ) + message_ref_mapping_data.append( + { + "platform_msg_id": message["id"], + "msg_ref_data": issue_message_ref_data, + "is_issue": True + } + ) - else: + else: - try: - query = session.query(PullRequest).filter(PullRequest.pr_issue_url == message["issue_url"]) - related_pr = execute_session_query(query, 'one') - related_pr_of_issue_found = True + try: + query = session.query(PullRequest).filter(PullRequest.pr_issue_url == message["issue_url"]) + related_pr = execute_session_query(query, 'one') + related_pr_of_issue_found = True - except s.orm.exc.NoResultFound: - logger.info(f"{task_name}: Could not find related pr") - logger.info(f"We were searching for: {message['issue_url']}") - logger.info(f"{task_name}: Skipping") - continue + except s.orm.exc.NoResultFound: + logger.info(f"{task_name}: Could not find related pr") + logger.info(f"We were searching for: {message['issue_url']}") + logger.info(f"{task_name}: Skipping") + continue - pull_request_id = related_pr.pull_request_id + pull_request_id = related_pr.pull_request_id - pr_message_ref_data = extract_needed_pr_message_ref_data(message, pull_request_id, repo_id, tool_source, tool_version, data_source) + pr_message_ref_data = extract_needed_pr_message_ref_data(message, pull_request_id, repo_id, tool_source, tool_version, data_source) - message_ref_mapping_data.append( - { - "platform_msg_id": message["id"], - "msg_ref_data": pr_message_ref_data, - "is_issue": False - } - ) - - if related_pr_of_issue_found: + message_ref_mapping_data.append( + { + "platform_msg_id": message["id"], + "msg_ref_data": pr_message_ref_data, + "is_issue": False + } + ) + + if related_pr_of_issue_found: - message_dicts.append( - extract_needed_message_data(message, platform_id, repo_id, tool_source, tool_version, data_source) - ) + message_dicts.append( + extract_needed_message_data(message, platform_id, repo_id, tool_source, tool_version, data_source) + ) - contributors.append(contributor) + contributors.append(contributor) - contributors = remove_duplicate_dicts(contributors) 
+ contributors = remove_duplicate_dicts(contributors) - logger.info(f"{task_name}: Inserting {len(contributors)} contributors") + logger.info(f"{task_name}: Inserting {len(contributors)} contributors") - session.insert_data(contributors, Contributor, ["cntrb_id"]) + session.insert_data(contributors, Contributor, ["cntrb_id"]) - logger.info(f"{task_name}: Inserting {len(message_dicts)} messages") - message_natural_keys = ["platform_msg_id"] - message_return_columns = ["msg_id", "platform_msg_id"] - message_string_fields = ["msg_text"] - message_return_data = session.insert_data(message_dicts, Message, message_natural_keys, - return_columns=message_return_columns, string_fields=message_string_fields) + logger.info(f"{task_name}: Inserting {len(message_dicts)} messages") + message_natural_keys = ["platform_msg_id"] + message_return_columns = ["msg_id", "platform_msg_id"] + message_string_fields = ["msg_text"] + message_return_data = session.insert_data(message_dicts, Message, message_natural_keys, + return_columns=message_return_columns, string_fields=message_string_fields) - pr_message_ref_dicts = [] - issue_message_ref_dicts = [] - for mapping_data in message_ref_mapping_data: + pr_message_ref_dicts = [] + issue_message_ref_dicts = [] + for mapping_data in message_ref_mapping_data: - value = mapping_data["platform_msg_id"] - key = "platform_msg_id" + value = mapping_data["platform_msg_id"] + key = "platform_msg_id" - issue_or_pr_message = find_dict_in_list_of_dicts(message_return_data, key, value) + issue_or_pr_message = find_dict_in_list_of_dicts(message_return_data, key, value) - if issue_or_pr_message: + if issue_or_pr_message: - msg_id = issue_or_pr_message["msg_id"] - else: - print("Count not find issue or pull request message to map to") - continue + msg_id = issue_or_pr_message["msg_id"] + else: + print("Count not find issue or pull request message to map to") + continue - message_ref_data = mapping_data["msg_ref_data"] - message_ref_data["msg_id"] = msg_id + message_ref_data = mapping_data["msg_ref_data"] + message_ref_data["msg_id"] = msg_id - if mapping_data["is_issue"] is True: - issue_message_ref_dicts.append(message_ref_data) - else: - pr_message_ref_dicts.append(message_ref_data) + if mapping_data["is_issue"] is True: + issue_message_ref_dicts.append(message_ref_data) + else: + pr_message_ref_dicts.append(message_ref_data) - pr_message_ref_natural_keys = ["pull_request_id", "pr_message_ref_src_comment_id"] - session.insert_data(pr_message_ref_dicts, PullRequestMessageRef, pr_message_ref_natural_keys) + pr_message_ref_natural_keys = ["pull_request_id", "pr_message_ref_src_comment_id"] + session.insert_data(pr_message_ref_dicts, PullRequestMessageRef, pr_message_ref_natural_keys) - issue_message_ref_natural_keys = ["issue_id", "issue_msg_ref_src_comment_id"] - session.insert_data(issue_message_ref_dicts, IssueMessageRef, issue_message_ref_natural_keys) + issue_message_ref_natural_keys = ["issue_id", "issue_msg_ref_src_comment_id"] + session.insert_data(issue_message_ref_dicts, IssueMessageRef, issue_message_ref_natural_keys) - logger.info(f"{task_name}: Inserted {len(message_dicts)} messages. {len(issue_message_ref_dicts)} from issues and {len(pr_message_ref_dicts)} from prs") + logger.info(f"{task_name}: Inserted {len(message_dicts)} messages. 
{len(issue_message_ref_dicts)} from issues and {len(pr_message_ref_dicts)} from prs") def is_issue_message(html_url): diff --git a/augur/tasks/github/pull_requests/commits_model/core.py b/augur/tasks/github/pull_requests/commits_model/core.py index ffa152f6cd..1a6a98b48e 100644 --- a/augur/tasks/github/pull_requests/commits_model/core.py +++ b/augur/tasks/github/pull_requests/commits_model/core.py @@ -11,9 +11,7 @@ from augur.application.db.util import execute_session_query -def pull_request_commits_model(repo_id,logger): - - from augur.tasks.init.celery_app import engine +def pull_request_commits_model(repo_id,logger, session): # query existing PRs and the respective url we will append the commits url to pr_url_sql = s.sql.text(""" @@ -24,48 +22,45 @@ def pull_request_commits_model(repo_id,logger): pr_urls = [] #pd.read_sql(pr_number_sql, self.db, params={}) - with DatabaseSession(logger, engine) as session: - pr_urls = session.fetchall_data_from_sql_text(pr_url_sql)#session.execute_sql(pr_number_sql).fetchall() - - query = session.query(Repo).filter(Repo.repo_id == repo_id) - repo = execute_session_query(query, 'one') + pr_urls = session.fetchall_data_from_sql_text(pr_url_sql)#session.execute_sql(pr_number_sql).fetchall() + + query = session.query(Repo).filter(Repo.repo_id == repo_id) + repo = execute_session_query(query, 'one') owner, name = get_owner_repo(repo.repo_git) logger.info(f"Getting pull request commits for repo: {repo.repo_git}") - - with GithubTaskSession(logger, engine) as session: - - for index,pr_info in enumerate(pr_urls): - logger.info(f'Querying commits for pull request #{index + 1} of {len(pr_urls)}') - - commits_url = pr_info['pr_url'] + '/commits?state=all' - - #Paginate through the pr commits - pr_commits = GithubPaginator(commits_url, session.oauths, logger) - - all_data = [] - for page_data in pr_commits: - logger.info(f"Processing pr commit with hash {page_data['sha']}") - pr_commit_row = { - 'pull_request_id': pr_info['pull_request_id'], - 'pr_cmt_sha': page_data['sha'], - 'pr_cmt_node_id': page_data['node_id'], - 'pr_cmt_message': page_data['commit']['message'], - # 'pr_cmt_comments_url': pr_commit['comments_url'], - 'tool_source': 'pull_request_commits_model', - 'tool_version': '0.41', - 'data_source': 'GitHub API', - 'repo_id': repo_id, - } - - all_data.append(pr_commit_row) - if len(all_data) > 0: - #Execute bulk upsert - pr_commits_natural_keys = [ "pull_request_id", "repo_id", "pr_cmt_sha"] - session.insert_data(all_data,PullRequestCommit,pr_commits_natural_keys) - + for index,pr_info in enumerate(pr_urls): + logger.info(f'Querying commits for pull request #{index + 1} of {len(pr_urls)}') + + commits_url = pr_info['pr_url'] + '/commits?state=all' + + #Paginate through the pr commits + pr_commits = GithubPaginator(commits_url, session.oauths, logger) + + all_data = [] + for page_data in pr_commits: + logger.info(f"Processing pr commit with hash {page_data['sha']}") + pr_commit_row = { + 'pull_request_id': pr_info['pull_request_id'], + 'pr_cmt_sha': page_data['sha'], + 'pr_cmt_node_id': page_data['node_id'], + 'pr_cmt_message': page_data['commit']['message'], + # 'pr_cmt_comments_url': pr_commit['comments_url'], + 'tool_source': 'pull_request_commits_model', + 'tool_version': '0.41', + 'data_source': 'GitHub API', + 'repo_id': repo_id, + } + + all_data.append(pr_commit_row) + + if len(all_data) > 0: + #Execute bulk upsert + pr_commits_natural_keys = [ "pull_request_id", "repo_id", "pr_cmt_sha"] + 
session.insert_data(all_data,PullRequestCommit,pr_commits_natural_keys) + diff --git a/augur/tasks/github/pull_requests/commits_model/tasks.py b/augur/tasks/github/pull_requests/commits_model/tasks.py index cd969ae236..d2dfbf3ae6 100644 --- a/augur/tasks/github/pull_requests/commits_model/tasks.py +++ b/augur/tasks/github/pull_requests/commits_model/tasks.py @@ -13,12 +13,12 @@ def process_pull_request_commits(repo_git: str) -> None: logger = logging.getLogger(process_pull_request_commits.__name__) - with DatabaseSession(logger, engine) as session: + with GithubTaskSession(logger, engine) as session: query = session.query(Repo).filter(Repo.repo_git == repo_git) repo = execute_session_query(query, 'one') try: - pull_request_commits_model(repo.repo_id, logger) + pull_request_commits_model(repo.repo_id, logger, session) except Exception as e: logger.error(f"Could not complete pull_request_commits_model!\n Reason: {e} \n Traceback: {''.join(traceback.format_exception(None, e, e.__traceback__))}") raise e diff --git a/augur/tasks/github/pull_requests/core.py b/augur/tasks/github/pull_requests/core.py index 03a6b80438..831b4fd9aa 100644 --- a/augur/tasks/github/pull_requests/core.py +++ b/augur/tasks/github/pull_requests/core.py @@ -131,7 +131,7 @@ def extract_data_from_pr_list(pull_requests: List[dict], return pr_dicts, pr_mapping_data, pr_numbers, contributors -def insert_pr_contributors(contributors: List[dict], session: GithubTaskSession, task_name: str) -> None: +def insert_pr_contributors(contributors: List[dict], session: DatabaseSession, task_name: str) -> None: """Insert pr contributors Args: @@ -148,7 +148,7 @@ def insert_pr_contributors(contributors: List[dict], session: GithubTaskSession, session.insert_data(contributors, Contributor, ["cntrb_id"]) -def insert_prs(pr_dicts: List[dict], session: GithubTaskSession, task_name: str) -> Optional[List[dict]]: +def insert_prs(pr_dicts: List[dict], session: DatabaseSession, task_name: str) -> Optional[List[dict]]: """Insert pull requests Args: @@ -213,7 +213,7 @@ def map_other_pr_data_to_pr( return pr_label_dicts, pr_assignee_dicts, pr_reviewer_dicts, pr_metadata_dicts -def insert_pr_labels(labels: List[dict], logger: logging.Logger) -> None: +def insert_pr_labels(labels: List[dict], logger: logging.Logger, session) -> None: """Insert pull request labels Note: @@ -223,16 +223,12 @@ def insert_pr_labels(labels: List[dict], logger: logging.Logger) -> None: labels: list of labels to insert logger: handles logging """ - from augur.tasks.init.celery_app import engine + # we are using pr_src_id and pull_request_id to determine if the label is already in the database. + pr_label_natural_keys = ['pr_src_id', 'pull_request_id'] + session.insert_data(labels, PullRequestLabel, pr_label_natural_keys) - with DatabaseSession(logger, engine) as session: - # we are using pr_src_id and pull_request_id to determine if the label is already in the database. 
- pr_label_natural_keys = ['pr_src_id', 'pull_request_id'] - session.insert_data(labels, PullRequestLabel, pr_label_natural_keys) - - -def insert_pr_assignees(assignees: List[dict], logger: logging.Logger) -> None: +def insert_pr_assignees(assignees: List[dict], logger: logging.Logger, session) -> None: """Insert pull request assignees Note: @@ -242,16 +238,12 @@ def insert_pr_assignees(assignees: List[dict], logger: logging.Logger) -> None: assignees: list of assignees to insert logger: handles logging """ - from augur.tasks.init.celery_app import engine - - with DatabaseSession(logger, engine) as session: - - # we are using pr_assignee_src_id and pull_request_id to determine if the label is already in the database. - pr_assignee_natural_keys = ['pr_assignee_src_id', 'pull_request_id'] - session.insert_data(assignees, PullRequestAssignee, pr_assignee_natural_keys) + # we are using pr_assignee_src_id and pull_request_id to determine if the label is already in the database. + pr_assignee_natural_keys = ['pr_assignee_src_id', 'pull_request_id'] + session.insert_data(assignees, PullRequestAssignee, pr_assignee_natural_keys) -def insert_pr_reviewers(reviewers: List[dict], logger: logging.Logger) -> None: +def insert_pr_reviewers(reviewers: List[dict], logger: logging.Logger, session) -> None: """Insert pull request reviewers Note: @@ -261,16 +253,12 @@ def insert_pr_reviewers(reviewers: List[dict], logger: logging.Logger) -> None: reviewers: list of reviewers to insert logger: handles logging """ - from augur.tasks.init.celery_app import engine + # we are using pr_src_id and pull_request_id to determine if the label is already in the database. + pr_reviewer_natural_keys = ["pull_request_id", "pr_reviewer_src_id"] + session.insert_data(reviewers, PullRequestReviewer, pr_reviewer_natural_keys) - with DatabaseSession(logger, engine) as session: - # we are using pr_src_id and pull_request_id to determine if the label is already in the database. - pr_reviewer_natural_keys = ["pull_request_id", "pr_reviewer_src_id"] - session.insert_data(reviewers, PullRequestReviewer, pr_reviewer_natural_keys) - - -def insert_pr_metadata(metadata: List[dict], logger: logging.Logger) -> None: +def insert_pr_metadata(metadata: List[dict], logger: logging.Logger, session) -> None: """Insert pull request metadata Note: @@ -280,14 +268,10 @@ def insert_pr_metadata(metadata: List[dict], logger: logging.Logger) -> None: metadata: list of metadata to insert logger: handles logging """ - from augur.tasks.init.celery_app import engine - - with DatabaseSession(logger, engine) as session: - - # inserting pr metadata - # we are using pull_request_id, pr_head_or_base, and pr_sha to determine if the label is already in the database. - pr_metadata_natural_keys = ['pull_request_id', 'pr_head_or_base', 'pr_sha'] - session.insert_data(metadata, PullRequestMeta, pr_metadata_natural_keys) + # inserting pr metadata + # we are using pull_request_id, pr_head_or_base, and pr_sha to determine if the label is already in the database. 
+ pr_metadata_natural_keys = ['pull_request_id', 'pr_head_or_base', 'pr_sha'] + session.insert_data(metadata, PullRequestMeta, pr_metadata_natural_keys) diff --git a/augur/tasks/github/pull_requests/files_model/core.py b/augur/tasks/github/pull_requests/files_model/core.py index 91d3b1aded..70d52a2ec6 100644 --- a/augur/tasks/github/pull_requests/files_model/core.py +++ b/augur/tasks/github/pull_requests/files_model/core.py @@ -11,7 +11,7 @@ from augur.tasks.github.util.util import get_owner_repo from augur.application.db.util import execute_session_query -def pull_request_files_model(repo_id,logger): +def pull_request_files_model(repo_id,logger, session): from augur.tasks.init.celery_app import engine @@ -24,69 +24,68 @@ def pull_request_files_model(repo_id,logger): pr_numbers = [] #pd.read_sql(pr_number_sql, self.db, params={}) - with GithubTaskSession(logger, engine) as session: - result = session.execute_sql(pr_number_sql).fetchall() - pr_numbers = [dict(zip(row.keys(), row)) for row in result] + result = session.execute_sql(pr_number_sql).fetchall() + pr_numbers = [dict(zip(row.keys(), row)) for row in result] - query = session.query(Repo).filter(Repo.repo_id == repo_id) - repo = execute_session_query(query, 'one') + query = session.query(Repo).filter(Repo.repo_id == repo_id) + repo = execute_session_query(query, 'one') - owner, name = get_owner_repo(repo.repo_git) + owner, name = get_owner_repo(repo.repo_git) - pr_file_rows = [] - logger.info(f"Getting pull request files for repo: {repo.repo_git}") - for index,pr_info in enumerate(pr_numbers): + pr_file_rows = [] + logger.info(f"Getting pull request files for repo: {repo.repo_git}") + for index,pr_info in enumerate(pr_numbers): - logger.info(f'Querying files for pull request #{index + 1} of {len(pr_numbers)}') - - query = """ + logger.info(f'Querying files for pull request #{index + 1} of {len(pr_numbers)}') + + query = """ - query($repo: String!, $owner: String!,$pr_number: Int!, $numRecords: Int!, $cursor: String) { - repository(name: $repo, owner: $owner) { - pullRequest(number: $pr_number) { - files ( first: $numRecords, after: $cursor) - { - edges { - node { - additions - deletions - path - } - } - totalCount - pageInfo { - hasNextPage - endCursor + query($repo: String!, $owner: String!,$pr_number: Int!, $numRecords: Int!, $cursor: String) { + repository(name: $repo, owner: $owner) { + pullRequest(number: $pr_number) { + files ( first: $numRecords, after: $cursor) + { + edges { + node { + additions + deletions + path } } + totalCount + pageInfo { + hasNextPage + endCursor + } } } } - """ - - values = ("repository","pullRequest","files") - params = { - 'owner' : owner, - 'repo' : name, - 'pr_number' : pr_info['pr_src_number'], - 'values' : values } + """ + + values = ("repository","pullRequest","files") + params = { + 'owner' : owner, + 'repo' : name, + 'pr_number' : pr_info['pr_src_number'], + 'values' : values + } - try: - file_collection = GraphQlPageCollection(query, session.oauths, session.logger,bind=params) + try: + file_collection = GraphQlPageCollection(query, session.oauths, session.logger,bind=params) - pr_file_rows += [{ - 'pull_request_id': pr_info['pull_request_id'], - 'pr_file_additions': pr_file['additions'] if 'additions' in pr_file else None, - 'pr_file_deletions': pr_file['deletions'] if 'deletions' in pr_file else None, - 'pr_file_path': pr_file['path'], - 'data_source': 'GitHub API', - 'repo_id': repo_id, - } for pr_file in file_collection if pr_file and 'path' in pr_file] - except Exception as e: - 
logger.error(f"Ran into error with pull request #{index + 1} in repo {repo_id}") - logger.error( - ''.join(traceback.format_exception(None, e, e.__traceback__))) + pr_file_rows += [{ + 'pull_request_id': pr_info['pull_request_id'], + 'pr_file_additions': pr_file['additions'] if 'additions' in pr_file else None, + 'pr_file_deletions': pr_file['deletions'] if 'deletions' in pr_file else None, + 'pr_file_path': pr_file['path'], + 'data_source': 'GitHub API', + 'repo_id': repo_id, + } for pr_file in file_collection if pr_file and 'path' in pr_file] + except Exception as e: + logger.error(f"Ran into error with pull request #{index + 1} in repo {repo_id}") + logger.error( + ''.join(traceback.format_exception(None, e, e.__traceback__))) diff --git a/augur/tasks/github/pull_requests/files_model/tasks.py b/augur/tasks/github/pull_requests/files_model/tasks.py index f0b0a7f142..0618a1a7b8 100644 --- a/augur/tasks/github/pull_requests/files_model/tasks.py +++ b/augur/tasks/github/pull_requests/files_model/tasks.py @@ -12,11 +12,11 @@ def process_pull_request_files(repo_git: str) -> None: logger = logging.getLogger(process_pull_request_files.__name__) - with DatabaseSession(logger, engine) as session: + with GithubTaskSession(logger, engine) as session: query = session.query(Repo).filter(Repo.repo_git == repo_git) repo = execute_session_query(query, 'one') try: - pull_request_files_model(repo.repo_id, logger) + pull_request_files_model(repo.repo_id, logger, session) except Exception as e: logger.error(f"Could not complete pull_request_files_model!\n Reason: {e} \n Traceback: {''.join(traceback.format_exception(None, e, e.__traceback__))}") #raise e \ No newline at end of file diff --git a/augur/tasks/github/pull_requests/tasks.py b/augur/tasks/github/pull_requests/tasks.py index b8d0bd8573..02184a2d03 100644 --- a/augur/tasks/github/pull_requests/tasks.py +++ b/augur/tasks/github/pull_requests/tasks.py @@ -32,19 +32,18 @@ def collect_pull_requests(repo_git: str) -> None: logger.info(f"Celery engine: {engine}") - with DatabaseSession(logger, engine) as session: + with GithubTaskSession(logger, engine) as session: - try: repo_id = session.query(Repo).filter( Repo.repo_git == repo_git).one().repo_id owner, repo = get_owner_repo(repo_git) - pr_data = retrieve_all_pr_data(repo_git, logger) + pr_data = retrieve_all_pr_data(repo_git, logger, session.oauths) if pr_data: - process_pull_requests(pr_data, f"{owner}/{repo}: Pr task", repo_id, logger) + process_pull_requests(pr_data, f"{owner}/{repo}: Pr task", repo_id, logger, session) else: logger.info(f"{owner}/{repo} has no pull requests") except Exception as e: @@ -53,24 +52,18 @@ def collect_pull_requests(repo_git: str) -> None: # TODO: Rename pull_request_reviewers table to pull_request_requested_reviewers # TODO: Fix column names in pull request labels table -def retrieve_all_pr_data(repo_git: str, logger) -> None: - - from augur.tasks.init.celery_app import engine - - print(f"Eventlet engine id: {id(engine)}") +def retrieve_all_pr_data(repo_git: str, logger, key_auth) -> None: owner, repo = get_owner_repo(repo_git) # define GithubTaskSession to handle insertions, and store oauth keys - with GithubTaskSession(logger, engine) as session: - - owner, repo = get_owner_repo(repo_git) + owner, repo = get_owner_repo(repo_git) - logger.info(f"Collecting pull requests for {owner}/{repo}") + logger.info(f"Collecting pull requests for {owner}/{repo}") - url = f"https://api.github.com/repos/{owner}/{repo}/pulls?state=all&direction=desc" - # returns an iterable of all 
prs at this url (this essentially means you can treat the prs variable as a list of the prs) - prs = GithubPaginator(url, session.oauths, logger) + url = f"https://api.github.com/repos/{owner}/{repo}/pulls?state=all&direction=desc" + # returns an iterable of all prs at this url (this essentially means you can treat the prs variable as a list of the prs) + prs = GithubPaginator(url, key_auth, logger) all_data = [] num_pages = prs.get_num_pages() @@ -92,7 +85,7 @@ def retrieve_all_pr_data(repo_git: str, logger) -> None: return all_data -def process_pull_requests(pull_requests, task_name, repo_id, logger): +def process_pull_requests(pull_requests, task_name, repo_id, logger, session): from augur.tasks.init.celery_app import engine diff --git a/augur/tasks/github/releases/core.py b/augur/tasks/github/releases/core.py index fa22dde271..38d527a610 100644 --- a/augur/tasks/github/releases/core.py +++ b/augur/tasks/github/releases/core.py @@ -14,7 +14,7 @@ from augur.application.db.util import execute_session_query -def get_release_inf(session, repo_id, release, tag_only): +def get_release_inf(repo_id, release, tag_only): if not tag_only: if release['author'] is None: @@ -81,7 +81,7 @@ def insert_release(session, repo_id, owner, release, tag_only = False): # Put all data together in format of the table session.logger.info(f'Inserting release for repo with id:{repo_id}, owner:{owner}, release name:{release["name"]}\n') - release_inf = get_release_inf(session, repo_id, release, tag_only) + release_inf = get_release_inf(repo_id, release, tag_only) #Do an upsert session.insert_data(release_inf,Release,['release_id']) diff --git a/augur/tasks/github/releases/tasks.py b/augur/tasks/github/releases/tasks.py index 5e86c6d69f..a6d0d81454 100644 --- a/augur/tasks/github/releases/tasks.py +++ b/augur/tasks/github/releases/tasks.py @@ -9,8 +9,6 @@ def collect_releases(repo_git): from augur.tasks.init.celery_app import engine - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(collect_releases.__name__) with GithubTaskSession(logger, engine) as session: diff --git a/augur/tasks/github/repo_info/core.py b/augur/tasks/github/repo_info/core.py index cb62ab1452..b4a49bfed4 100644 --- a/augur/tasks/github/repo_info/core.py +++ b/augur/tasks/github/repo_info/core.py @@ -100,8 +100,8 @@ def grab_repo_info_from_graphql_endpoint(session,query): return data -def repo_info_model(session, repo_orm_obj): - session.logger.info("Beginning filling the repo_info model for repo: " + repo_orm_obj.repo_git + "\n") +def repo_info_model(session, repo_orm_obj, logger): + logger.info("Beginning filling the repo_info model for repo: " + repo_orm_obj.repo_git + "\n") owner, repo = get_owner_repo(repo_orm_obj.repo_git) @@ -220,7 +220,7 @@ def repo_info_model(session, repo_orm_obj): try: data = grab_repo_info_from_graphql_endpoint(session, query) except Exception as e: - session.logger.error(f"Could not grab info for repo {repo_orm_obj.repo_id}") + logger.error(f"Could not grab info for repo {repo_orm_obj.repo_id}") raise e return @@ -235,7 +235,7 @@ def repo_info_model(session, repo_orm_obj): committers_count = query_committers_count(session, owner, repo) # Put all data together in format of the table - session.logger.info(f'Inserting repo info for repo with id:{repo_orm_obj.repo_id}, owner:{owner}, name:{repo}\n') + logger.info(f'Inserting repo info for repo with id:{repo_orm_obj.repo_id}, owner:{owner}, name:{repo}\n') rep_inf = { 'repo_id': repo_orm_obj.repo_id, 'last_updated': data['updatedAt'] if 
'updatedAt' in data else None, @@ -301,6 +301,6 @@ def repo_info_model(session, repo_orm_obj): update_repo_data = s.sql.text("""UPDATE repo SET forked_from=:forked, repo_archived=:archived, repo_archived_date_collected=:archived_date_collected WHERE repo_id=:repo_id""").bindparams(forked=forked, archived=archived, archived_date_collected=archived_date_collected, repo_id=repo_orm_obj.repo_id) session.execute_sql(update_repo_data) - session.logger.info(f"Inserted info for {owner}/{repo}\n") + logger.info(f"Inserted info for {owner}/{repo}\n") diff --git a/augur/tasks/github/repo_info/tasks.py b/augur/tasks/github/repo_info/tasks.py index b26ed2e09e..1c58c3dd6c 100644 --- a/augur/tasks/github/repo_info/tasks.py +++ b/augur/tasks/github/repo_info/tasks.py @@ -10,15 +10,13 @@ def collect_repo_info(repo_git: str): from augur.tasks.init.celery_app import engine - from augur.tasks.init.celery_app import engine - logger = logging.getLogger(collect_repo_info.__name__) with GithubTaskSession(logger, engine) as session: query = session.query(Repo).filter(Repo.repo_git == repo_git) repo = execute_session_query(query, 'one') try: - repo_info_model(session, repo) + repo_info_model(session, repo, logger) except Exception as e: session.logger.error(f"Could not add repo info for repo {repo.repo_id}\n Error: {e}") session.logger.error( diff --git a/augur/tasks/init/celery_app.py b/augur/tasks/init/celery_app.py index 3762054903..1a238ebbd0 100644 --- a/augur/tasks/init/celery_app.py +++ b/augur/tasks/init/celery_app.py @@ -40,7 +40,8 @@ data_analysis_tasks = ['augur.tasks.data_analysis.message_insights.tasks', 'augur.tasks.data_analysis.clustering_worker.tasks', 'augur.tasks.data_analysis.discourse_analysis.tasks', - 'augur.tasks.data_analysis.pull_request_analysis_worker.tasks'] + 'augur.tasks.data_analysis.pull_request_analysis_worker.tasks', + 'augur.tasks.data_analysis.insight_worker.tasks'] materialized_view_tasks = ['augur.tasks.db.refresh_materialized_views'] @@ -160,7 +161,7 @@ def init_worker(**kwargs): from augur.application.db.engine import DatabaseEngine - engine = DatabaseEngine(pool_size=5, max_overflow=10, pool_timeout=240).engine + engine = DatabaseEngine(pool_size=5, max_overflow=10).engine @worker_process_shutdown.connect diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 3404bc679b..f3ac6bcd2d 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -30,7 +30,6 @@ from augur.application.logs import AugurLogger from augur.application.config import AugurConfig from augur.application.db.session import DatabaseSession -from augur.application.db.engine import DatabaseEngine from augur.application.db.util import execute_session_query from logging import Logger from enum import Enum diff --git a/docker/augurface/Dockerfile b/docker/augurface/Dockerfile index d495808b8e..c758d6c253 100644 --- a/docker/augurface/Dockerfile +++ b/docker/augurface/Dockerfile @@ -2,7 +2,7 @@ FROM node:16 as build-stage LABEL maintainer="outdoors@acm.org" -LABEL version="0.42.0" +LABEL version="0.44.2" WORKDIR /augur/frontend/ COPY ./docker/frontend/docker.config.json frontend.config.json diff --git a/docker/backend/Dockerfile b/docker/backend/Dockerfile index b536873d0e..ca569339bc 100644 --- a/docker/backend/Dockerfile +++ b/docker/backend/Dockerfile @@ -2,7 +2,7 @@ FROM python:3.8.11-slim-buster LABEL maintainer="outdoors@acm.org" -LABEL version="0.42.0" +LABEL version="0.44.2" ENV DEBIAN_FRONTEND=noninteractive diff --git a/docker/database/Dockerfile 
b/docker/database/Dockerfile index a8a725a71a..6cf9517cad 100644 --- a/docker/database/Dockerfile +++ b/docker/database/Dockerfile @@ -2,7 +2,7 @@ FROM postgres:12 LABEL maintainer="outdoors@acm.org" -LABEL version="0.42.0" +LABEL version="0.44.2" ENV POSTGRES_DB "test" ENV POSTGRES_USER "augur" diff --git a/docker/frontend/Dockerfile b/docker/frontend/Dockerfile index eb29636940..317e3a0cd2 100644 --- a/docker/frontend/Dockerfile +++ b/docker/frontend/Dockerfile @@ -2,7 +2,7 @@ FROM node:16 as build-stage LABEL maintainer="outdoors@acm.org" -LABEL version="0.42.0" +LABEL version="0.44.2" WORKDIR /augur/frontend/ COPY ./docker/frontend/docker.config.json frontend.config.json diff --git a/metadata.py b/metadata.py index 525dffaad1..77f950c14b 100644 --- a/metadata.py +++ b/metadata.py @@ -5,8 +5,8 @@ __short_description__ = "Python 3 package for free/libre and open-source software community metrics, models & data collection" -__version__ = "0.44.0" -__release__ = "v0.44.0 (Brussel Sprouts)" +__version__ = "0.44.2" +__release__ = "v0.44.2 (Brussels Sprouts)" __license__ = "MIT" __copyright__ = "University of Missouri, University of Nebraska-Omaha, CHAOSS, Brian Warner & Augurlabs 2023" diff --git a/scripts/docker/install-workers-deps.sh b/scripts/docker/install-workers-deps.sh index cdd18a5c76..90d4d44e0e 100755 --- a/scripts/docker/install-workers-deps.sh +++ b/scripts/docker/install-workers-deps.sh @@ -10,4 +10,7 @@ do done # install nltk -/opt/venv/bin/python -m nltk.downloader all +# taken from ./scripts/install/nltk_dictionaries.sh +for i in stopwords punkt popular universal_tagset ; do + /opt/venv/bin/python -m nltk.downloader $i +done
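
The data-collection changes above share one pattern: the Celery task opens a single `GithubTaskSession`, then hands `session.oauths` (the key auth) and the task logger down to the `retrieve_all_*` helper, which pages the REST endpoint with `GithubPaginator` instead of opening a session of its own. A minimal sketch of that flow, using the events task as the example; the import paths and the simplified body are assumptions drawn from the surrounding code, not a verbatim copy of the patched modules:

```python
import logging

# import locations assumed to match the modules touched by this patch
from augur.tasks.github.util.github_task_session import GithubTaskSession
from augur.tasks.github.util.github_paginator import GithubPaginator
from augur.tasks.github.util.util import get_owner_repo


def retrieve_all_event_data(repo_git: str, logger, key_auth):
    """Page through /issues/events using the caller's key auth instead of a private session."""
    owner, repo = get_owner_repo(repo_git)
    url = f"https://api.github.com/repos/{owner}/{repo}/issues/events"

    # the paginator is iterable, so it can be treated as a flat list of events
    events = GithubPaginator(url, key_auth, logger)

    all_data = []
    for event in events:
        all_data.append(event)
    return all_data


def collect_events(repo_git: str):
    from augur.tasks.init.celery_app import engine

    logger = logging.getLogger(collect_events.__name__)

    # one GithubTaskSession per task; its oauths (key auth) and the logger travel down
    with GithubTaskSession(logger, engine) as session:
        owner, repo = get_owner_repo(repo_git)

        event_data = retrieve_all_event_data(repo_git, logger, session.oauths)
        if event_data:
            logger.info(f"{owner}/{repo}: collected {len(event_data)} events")
            # the real task forwards event_data plus the open session to process_events()
        else:
            logger.info(f"{owner}/{repo} has no events")
```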
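
On the write side, the `process_*` helpers now take the already-open session as an argument and lean on `insert_data()` natural keys for idempotent upserts rather than wrapping a `DatabaseSession` of their own. A condensed sketch of the event-insertion tail, using the natural keys named in the patch; the function name and import locations are assumptions:

```python
# model and utility import paths are assumptions based on the rest of the codebase
from augur.application.db.models import Contributor, PullRequestEvent, IssueEvent
from augur.tasks.util.worker_util import remove_duplicate_dicts


def insert_processed_events(session, logger, task_name,
                            contributors, pr_event_dicts, issue_event_dicts):
    # a contributor can appear on many events, so deduplicate before the upsert
    contributors = remove_duplicate_dicts(contributors)
    session.insert_data(contributors, Contributor, ["cntrb_id"])

    logger.info(f"{task_name}: Inserting {len(pr_event_dicts)} pr events "
                f"and {len(issue_event_dicts)} issue events")

    # pr events are deduplicated on the GitHub node_id
    session.insert_data(pr_event_dicts, PullRequestEvent, ["node_id"])

    # issue events are deduplicated on the (issue_id, issue_event_src_id) pair
    session.insert_data(issue_event_dicts, IssueEvent, ["issue_id", "issue_event_src_id"])
```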
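
In `detect_move`, `extract_owner_and_repo_from_endpoint()` now takes the key auth, URL, and logger directly, and `ping_github_for_repo_move()` receives the logger explicitly. The sketch below shows how a moved repository's new owner/name can be recovered from a redirect's `Location` header under those signatures. The wrapper name, the header check, and the import locations are assumptions; it also passes `session.oauths` to match the new `key_auth` parameter, where the patch's own call site passes the session object itself:

```python
# import locations are assumptions; the patched module imports these helpers at the top
from augur.tasks.github.util.github_paginator import hit_api
from augur.tasks.github.util.util import parse_json_response, get_owner_repo


def extract_owner_and_repo_from_endpoint(key_auth, url, logger):
    """Resolve an API URL (e.g. a redirect target) back to an owner/name pair."""
    response_from_gh = hit_api(key_auth, url, logger)
    page_data = parse_json_response(logger, response_from_gh)

    full_repo_name = page_data['full_name']   # e.g. "chaoss/augur"
    splits = full_repo_name.split('/')
    return splits[0], splits[-1]


def resolve_moved_repo(session, repo, logger):
    """Hypothetical wrapper illustrating how the redirect's Location header is followed."""
    owner, name = get_owner_repo(repo.repo_git)
    url = f"https://api.github.com/repos/{owner}/{name}"

    response_from_gh = hit_api(session.oauths, url, logger)

    # assumption: a Location header signals that the repository has moved
    if 'location' in response_from_gh.headers:
        return extract_owner_and_repo_from_endpoint(
            session.oauths, response_from_gh.headers['location'], logger)

    logger.info(f"Repo found at url: {url}")
    return owner, name
```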
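
Finally, the worker image no longer runs `nltk.downloader all`; it pulls only the four corpora listed in `scripts/install/nltk_dictionaries.sh`. The same fetch expressed in Python, for installs outside Docker (package list copied from the patch):

```python
import nltk

# only the corpora the data-analysis workers need, per the patched install script
for corpus in ("stopwords", "punkt", "popular", "universal_tagset"):
    nltk.download(corpus)
```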