diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 83481a3ae9..07bb6ddce0 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -289,6 +289,10 @@ def __init__(
             nodes.BigFrameNode, nodes.BigFrameNode
         ] = weakref.WeakKeyDictionary()
 
+        # performance logging
+        self._bytes_processed_sum = 0
+        self._slot_millis_sum = 0
+
     @property
     def bqclient(self):
         return self._clients_provider.bqclient
@@ -338,6 +342,24 @@ def objects(
     def _project(self):
         return self.bqclient.project
 
+    @property
+    def bytes_processed_sum(self) -> int:
+        """The sum of all bytes processed by bigquery jobs using this session."""
+        return self._bytes_processed_sum
+
+    @property
+    def slot_millis_sum(self) -> int:
+        """The sum of all slot time used by bigquery jobs in this session."""
+        return self._slot_millis_sum
+
+    def _add_bytes_processed(self, amount: int):
+        """Increment bytes_processed_sum by amount."""
+        self._bytes_processed_sum += amount
+
+    def _add_slot_millis(self, amount: int):
+        """Increment slot_millis_sum by amount."""
+        self._slot_millis_sum += amount
+
     def __hash__(self):
         # Stable hash needed to use in expression tree
         return hash(str(self._anonymous_dataset))
@@ -1825,7 +1847,7 @@ def _start_query(
         """
         job_config = self._prepare_query_job_config(job_config)
         return bigframes.session._io.bigquery.start_query_with_client(
-            self.bqclient,
+            self,
             sql,
             job_config,
             max_results,
@@ -1849,7 +1871,7 @@ def _start_query_ml_ddl(
         job_config.destination_encryption_configuration = None
 
         return bigframes.session._io.bigquery.start_query_with_client(
-            self.bqclient, sql, job_config
+            self, sql, job_config
         )
 
     def _cache_with_cluster_cols(
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index cd6847c312..6afa86aa2d 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -219,7 +219,7 @@ def add_labels(job_config, api_name: Optional[str] = None):
 
 
 def start_query_with_client(
-    bq_client: bigquery.Client,
+    session: bigframes.session.Session,
     sql: str,
     job_config: bigquery.job.QueryJobConfig,
     max_results: Optional[int] = None,
@@ -229,6 +229,7 @@ def start_query_with_client(
     """
     Starts query job and waits for results.
     """
+    bq_client: bigquery.Client = session.bqclient
     add_labels(job_config, api_name=api_name)
 
     try:
@@ -246,14 +247,41 @@ def start_query_with_client(
     else:
         results_iterator = query_job.result(max_results=max_results)
 
-    if LOGGING_NAME_ENV_VAR in os.environ:
-        # when running notebooks via pytest nbmake
-        pytest_log_job(query_job)
+    stats = get_performance_stats(query_job)
+    if stats is not None:
+        bytes_processed, slot_millis = stats
+        session._add_bytes_processed(bytes_processed)
+        session._add_slot_millis(slot_millis)
+        if LOGGING_NAME_ENV_VAR in os.environ:
+            # when running notebooks via pytest nbmake
+            write_stats_to_disk(bytes_processed, slot_millis)
 
     return results_iterator, query_job
 
 
-def pytest_log_job(query_job: bigquery.QueryJob):
+def get_performance_stats(query_job: bigquery.QueryJob) -> Optional[Tuple[int, int]]:
+    """Parse the query job for performance stats.
+
+    Return None if the stats do not reflect real work done in bigquery.
+    """
+    bytes_processed = query_job.total_bytes_processed
+    if not isinstance(bytes_processed, int):
+        return None  # filter out mocks
+    if query_job.configuration.dry_run:
+        # dry run stats are just predictions of the real run
+        bytes_processed = 0
+
+    slot_millis = query_job.slot_millis
+    if not isinstance(slot_millis, int):
+        return None  # filter out mocks
+    if query_job.configuration.dry_run:
+        # dry run stats are just predictions of the real run
+        slot_millis = 0
+
+    return bytes_processed, slot_millis
+
+
+def write_stats_to_disk(bytes_processed: int, slot_millis: int):
     """For pytest runs only, log information about the query job
     to a file in order to create a performance report.
     """
@@ -265,16 +293,17 @@ def pytest_log_job(query_job: bigquery.QueryJob):
         )
     test_name = os.environ[LOGGING_NAME_ENV_VAR]
     current_directory = os.getcwd()
-    bytes_processed = query_job.total_bytes_processed
-    if not isinstance(bytes_processed, int):
-        return  # filter out mocks
-    if query_job.configuration.dry_run:
-        # dry runs don't process their total_bytes_processed
-        bytes_processed = 0
+
+    # store bytes processed
     bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
     with open(bytes_file, "a") as f:
         f.write(str(bytes_processed) + "\n")
 
+    # store slot milliseconds
+    millis_file = os.path.join(current_directory, test_name + ".slotmillis")
+    with open(millis_file, "a") as f:
+        f.write(str(slot_millis) + "\n")
+
 
 def delete_tables_matching_session_id(
     client: bigquery.Client, dataset: bigquery.DatasetReference, session_id: str
diff --git a/noxfile.py b/noxfile.py
index c6e8da8c81..52583bbf1a 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -800,33 +800,64 @@ def notebook(session: nox.Session):
     for process in processes:
         process.join()
 
-    # when run via pytest, notebooks output a .bytesprocessed report
+    # when the environment variable is set as it is above,
+    # notebooks output a .bytesprocessed and .slotmillis report
     # collect those reports and print a summary
-    _print_bytes_processed_report()
+    _print_performance_report()
 
 
-def _print_bytes_processed_report():
-    """Add an informational report about http queries and bytes
-    processed to the testlog output for purposes of measuring
-    bigquery-related performance changes.
+def _print_performance_report():
+    """Add an informational report about http queries, bytes
+    processed, and slot time to the testlog output for purposes
+    of measuring bigquery-related performance changes.
+
+    Raises:
+        IOError: If the .bytesprocessed and .slotmillis reports do not
+            pair up one-to-one by notebook name.
     """
     print("---BIGQUERY USAGE REPORT---")
-    cumulative_queries = 0
-    cumulative_bytes = 0
-    for report in Path("notebooks/").glob("*/*.bytesprocessed"):
-        with open(report, "r") as f:
-            filename = report.stem
-            lines = f.read().splitlines()
+    results_dict = {}
+    for bytes_report in Path("notebooks/").glob("*/*.bytesprocessed"):
+        with open(bytes_report, "r") as bytes_file:
+            filename = bytes_report.stem
+            lines = bytes_file.read().splitlines()
             query_count = len(lines)
             total_bytes = sum([int(line) for line in lines])
-            format_string = f"{filename} - query count: {query_count}, bytes processed sum: {total_bytes}"
-            print(format_string)
-            cumulative_bytes += total_bytes
-            cumulative_queries += query_count
-    print(
-        "---total queries: {total_queries}, total bytes: {total_bytes}---".format(
-            total_queries=cumulative_queries, total_bytes=cumulative_bytes
+            results_dict[filename] = [query_count, total_bytes]
+    for millis_report in Path("notebooks/").glob("*/*.slotmillis"):
+        with open(millis_report, "r") as millis_file:
+            filename = millis_report.stem
+            lines = millis_file.read().splitlines()
+            total_slot_millis = sum([int(line) for line in lines])
+            # setdefault: an unpaired .slotmillis report must reach the
+            # length check below instead of raising KeyError here
+            results_dict.setdefault(filename, []).append(total_slot_millis)
+
+    cumulative_queries = 0
+    cumulative_bytes = 0
+    cumulative_slot_millis = 0
+    # iterate items() so each row is labelled with its own notebook name
+    for filename, results in results_dict.items():
+        if len(results) != 3:
+            raise IOError(
+                "Mismatch in performance logging output. "
+                "Expected one .bytesprocessed and one .slotmillis "
+                "file for each notebook."
+            )
+        query_count, total_bytes, total_slot_millis = results
+        cumulative_queries += query_count
+        cumulative_bytes += total_bytes
+        cumulative_slot_millis += total_slot_millis
+        print(
+            f"{filename} - query count: {query_count},"
+            f" bytes processed sum: {total_bytes},"
+            f" slot millis sum: {total_slot_millis}"
         )
+
+    print(
+        f"---total queries: {cumulative_queries}, "
+        f"total bytes: {cumulative_bytes}, "
+        f"total slot millis: {cumulative_slot_millis}---"
     )