feat: add slot_millis and add stats to session object (#725)
* feat: add slot_millis and add stats to session object

* fix none handling

* fix indent

* make incrementor internal

* update noxfile

* update comment

* add comment
milkshakeiii committed May 28, 2024
1 parent d850da6 commit 72e9583
Showing 3 changed files with 107 additions and 32 deletions.
26 changes: 24 additions & 2 deletions bigframes/session/__init__.py
@@ -289,6 +289,10 @@ def __init__(
             nodes.BigFrameNode, nodes.BigFrameNode
         ] = weakref.WeakKeyDictionary()
 
+        # performance logging
+        self._bytes_processed_sum = 0
+        self._slot_millis_sum = 0
+
     @property
     def bqclient(self):
         return self._clients_provider.bqclient
@@ -338,6 +342,24 @@ def objects(
     def _project(self):
         return self.bqclient.project
 
+    @property
+    def bytes_processed_sum(self):
+        """The sum of all bytes processed by bigquery jobs using this session."""
+        return self._bytes_processed_sum
+
+    @property
+    def slot_millis_sum(self):
+        """The sum of all slot time used by bigquery jobs in this session."""
+        return self._slot_millis_sum
+
+    def _add_bytes_processed(self, amount: int):
+        """Increment bytes_processed_sum by amount."""
+        self._bytes_processed_sum += amount
+
+    def _add_slot_millis(self, amount: int):
+        """Increment slot_millis_sum by amount."""
+        self._slot_millis_sum += amount
+
     def __hash__(self):
         # Stable hash needed to use in expression tree
         return hash(str(self._anonymous_dataset))
@@ -1825,7 +1847,7 @@ def _start_query(
         """
         job_config = self._prepare_query_job_config(job_config)
         return bigframes.session._io.bigquery.start_query_with_client(
-            self.bqclient,
+            self,
             sql,
             job_config,
            max_results,
@@ -1849,7 +1871,7 @@ def _start_query_ml_ddl(
         job_config.destination_encryption_configuration = None
 
         return bigframes.session._io.bigquery.start_query_with_client(
-            self.bqclient, sql, job_config
+            self, sql, job_config
         )
 
     def _cache_with_cluster_cols(
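With these counters in place, cumulative statistics can be read directly off a session after queries run. A minimal sketch of the intended usage (the query is hypothetical; assumes the global-session helpers in bigframes.pandas and that jobs run against live BigQuery rather than as dry runs):

import bigframes.pandas as bpd

# Any operation that starts a BigQuery job routes through
# Session._start_query, which now feeds the counters.
df = bpd.read_gbq("SELECT 1 AS x")  # hypothetical query

session = bpd.get_global_session()
print(f"bytes processed so far: {session.bytes_processed_sum}")
print(f"slot milliseconds so far: {session.slot_millis_sum}")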
51 changes: 40 additions & 11 deletions bigframes/session/_io/bigquery/__init__.py
@@ -219,7 +219,7 @@ def add_labels(job_config, api_name: Optional[str] = None):
 
 
 def start_query_with_client(
-    bq_client: bigquery.Client,
+    session: bigframes.session.Session,
     sql: str,
     job_config: bigquery.job.QueryJobConfig,
     max_results: Optional[int] = None,
@@ -229,6 +229,7 @@ def start_query_with_client(
     """
     Starts query job and waits for results.
     """
+    bq_client: bigquery.Client = session.bqclient
     add_labels(job_config, api_name=api_name)
 
     try:
@@ -246,14 +247,41 @@ def start_query_with_client(
     else:
         results_iterator = query_job.result(max_results=max_results)
 
-    if LOGGING_NAME_ENV_VAR in os.environ:
-        # when running notebooks via pytest nbmake
-        pytest_log_job(query_job)
+    stats = get_performance_stats(query_job)
+    if stats is not None:
+        bytes_processed, slot_millis = stats
+        session._add_bytes_processed(bytes_processed)
+        session._add_slot_millis(slot_millis)
+        if LOGGING_NAME_ENV_VAR in os.environ:
+            # when running notebooks via pytest nbmake
+            write_stats_to_disk(bytes_processed, slot_millis)
 
     return results_iterator, query_job
 
 
-def pytest_log_job(query_job: bigquery.QueryJob):
+def get_performance_stats(query_job: bigquery.QueryJob) -> Optional[Tuple[int, int]]:
+    """Parse the query job for performance stats.
+
+    Return None if the stats do not reflect real work done in bigquery.
+    """
+    bytes_processed = query_job.total_bytes_processed
+    if not isinstance(bytes_processed, int):
+        return None  # filter out mocks
+    if query_job.configuration.dry_run:
+        # dry run stats are just predictions of the real run
+        bytes_processed = 0
+
+    slot_millis = query_job.slot_millis
+    if not isinstance(slot_millis, int):
+        return None  # filter out mocks
+    if query_job.configuration.dry_run:
+        # dry run stats are just predictions of the real run
+        slot_millis = 0
+
+    return bytes_processed, slot_millis
+
+
+def write_stats_to_disk(bytes_processed: int, slot_millis: int):
     """For pytest runs only, log information about the query job
     to a file in order to create a performance report.
     """
@@ -265,16 +293,17 @@ def pytest_log_job(query_job: bigquery.QueryJob):
     )
     test_name = os.environ[LOGGING_NAME_ENV_VAR]
     current_directory = os.getcwd()
-    bytes_processed = query_job.total_bytes_processed
-    if not isinstance(bytes_processed, int):
-        return  # filter out mocks
-    if query_job.configuration.dry_run:
-        # dry runs don't process their total_bytes_processed
-        bytes_processed = 0
 
+    # store bytes processed
     bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
     with open(bytes_file, "a") as f:
         f.write(str(bytes_processed) + "\n")
 
+    # store slot milliseconds
+    bytes_file = os.path.join(current_directory, test_name + ".slotmillis")
+    with open(bytes_file, "a") as f:
+        f.write(str(slot_millis) + "\n")
 
 
 def delete_tables_matching_session_id(
     client: bigquery.Client, dataset: bigquery.DatasetReference, session_id: str
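Because get_performance_stats only reads three attributes off the job, its mock- and dry-run filtering can be exercised without a live client. A rough sketch (SimpleNamespace stands in for bigquery.QueryJob purely for demonstration, ignoring the type annotation; the import path follows the file shown above):

from types import SimpleNamespace

from bigframes.session._io.bigquery import get_performance_stats

# Stand-ins exposing the three attributes the helper inspects.
real_job = SimpleNamespace(
    total_bytes_processed=1024,
    slot_millis=250,
    configuration=SimpleNamespace(dry_run=False),
)
dry_run_job = SimpleNamespace(
    total_bytes_processed=1024,
    slot_millis=250,
    configuration=SimpleNamespace(dry_run=True),
)
mock_job = SimpleNamespace(
    total_bytes_processed=None,  # mocks surface non-int values
    slot_millis=None,
    configuration=SimpleNamespace(dry_run=False),
)

print(get_performance_stats(real_job))     # (1024, 250)
print(get_performance_stats(dry_run_job))  # (0, 0): predictions are zeroed
print(get_performance_stats(mock_job))     # None: filtered out entirely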
62 changes: 43 additions & 19 deletions noxfile.py
@@ -800,33 +800,57 @@ def notebook(session: nox.Session):
     for process in processes:
         process.join()
 
-    # when run via pytest, notebooks output a .bytesprocessed report
+    # when the environment variable is set as it is above,
+    # notebooks output a .bytesprocessed and .slotmillis report
     # collect those reports and print a summary
-    _print_bytes_processed_report()
+    _print_performance_report()
 
 
-def _print_bytes_processed_report():
-    """Add an informational report about http queries and bytes
-    processed to the testlog output for purposes of measuring
-    bigquery-related performance changes.
+def _print_performance_report():
+    """Add an informational report about http queries, bytes
+    processed, and slot time to the testlog output for purposes
+    of measuring bigquery-related performance changes.
     """
     print("---BIGQUERY USAGE REPORT---")
-    cumulative_queries = 0
-    cumulative_bytes = 0
-    for report in Path("notebooks/").glob("*/*.bytesprocessed"):
-        with open(report, "r") as f:
-            filename = report.stem
-            lines = f.read().splitlines()
+    results_dict = {}
+    for bytes_report in Path("notebooks/").glob("*/*.bytesprocessed"):
+        with open(bytes_report, "r") as bytes_file:
+            filename = bytes_report.stem
+            lines = bytes_file.read().splitlines()
             query_count = len(lines)
             total_bytes = sum([int(line) for line in lines])
-            format_string = f"{filename} - query count: {query_count}, bytes processed sum: {total_bytes}"
-            print(format_string)
-            cumulative_bytes += total_bytes
-            cumulative_queries += query_count
-    print(
-        "---total queries: {total_queries}, total bytes: {total_bytes}---".format(
-            total_queries=cumulative_queries, total_bytes=cumulative_bytes
-        )
-    )
+            results_dict[filename] = [query_count, total_bytes]
+    for millis_report in Path("notebooks/").glob("*/*.slotmillis"):
+        with open(millis_report, "r") as millis_file:
+            filename = millis_report.stem
+            lines = millis_file.read().splitlines()
+            total_slot_millis = sum([int(line) for line in lines])
+            results_dict[filename] += [total_slot_millis]
+
+    cumulative_queries = 0
+    cumulative_bytes = 0
+    cumulative_slot_millis = 0
+    for filename, results in results_dict.items():
+        if len(results) != 3:
+            raise IOError(
+                "Mismatch in performance logging output. "
+                "Expected one .bytesprocessed and one .slotmillis "
+                "file for each notebook."
+            )
+        query_count, total_bytes, total_slot_millis = results
+        cumulative_queries += query_count
+        cumulative_bytes += total_bytes
+        cumulative_slot_millis += total_slot_millis
+        print(
+            f"{filename} - query count: {query_count},"
+            f" bytes processed sum: {total_bytes},"
+            f" slot millis sum: {total_slot_millis}"
+        )
+
+    print(
+        f"---total queries: {cumulative_queries}, "
+        f"total bytes: {cumulative_bytes}, "
+        f"total slot millis: {cumulative_slot_millis}---"
+    )
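Each query appends one line to the running notebook's .bytesprocessed and .slotmillis files, and the nox session then sums them per notebook. A toy reproduction of that aggregation, with a made-up file name and values:

from pathlib import Path

# Simulate what two queries in one notebook would append.
Path("demo.bytesprocessed").write_text("1048576\n2097152\n")
Path("demo.slotmillis").write_text("500\n1500\n")

lines = Path("demo.bytesprocessed").read_text().splitlines()
query_count = len(lines)                        # 2 queries
total_bytes = sum(int(line) for line in lines)  # 3145728
total_slot_millis = sum(
    int(line) for line in Path("demo.slotmillis").read_text().splitlines()
)  # 2000

print(f"demo - query count: {query_count}, "
      f"bytes processed sum: {total_bytes}, "
      f"slot millis sum: {total_slot_millis}")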
